欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activemq消息队列-发布/订阅

程序员文章站 2022-03-07 19:17:49
...

 

  发布-订阅消息:一个消息多次消费(群发)

 

  阻塞的方式接收消息

 

 

 public class TopicProducter {
   
    // 发送次数
   public static final int SEND_NUM = 10;
   // tcp 地址 服务器器端地址
   //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
   public static final String BROKER_URL = "tcp://192.168.191.12:61616";
   // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
   public static final String DESTINATION = "jd.mq.topic";
   //测试连接使用默认的用户名
   public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
   //测试连接使用默认的密码
   public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
   /**
    * 消息发送端
    * @param session
    * @param publisher
    * @throws Exception
    */
   public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
       for (int i = 0; i < SEND_NUM; i++) {
           String message = "发送消息第" + (i + 1) + "条";
           
           TextMessage textMessage = session.createTextMessage(message);
           System.out.println(textMessage.getText());
           //发送 Topic消息
           publisher.send(textMessage);
       }
   }
   
   public  void run() throws Exception {
       //Topic连接
       TopicConnection connection = null;
       //Topic会话
       TopicSession session = null;
       try {
           // 1、创建链接工厂
           TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducter.DEFAULT_USER, TopicProducter.DEFAULT_PASSWORD, TopicProducter.BROKER_URL);
           // 2、通过工厂创建一个连接
           connection = factory.createTopicConnection();
           // 3、启动连接
           connection.start();
           // 4、创建一个session会话
           session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
           // 5、创建一个消息队列
           Topic topic = session.createTopic(DESTINATION);
           // 6、创建消息发送者
           TopicPublisher publisher = session.createPublisher(topic);
           // 设置持久化模式
           publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
           sendMessage(session, publisher);
           // 提交会话
           session.commit();
           
           
       } catch (Exception e) {
           throw e;
       } finally {
           // 关闭释放资源
           if (session != null) {
               session.close();
           }
           if (connection != null) {
               connection.close();
           }
       }
   }
   
   public static void main(String[] args) throws Exception {
     new  TopicProducter().run();
   }
}

 

public class TopicReceiver {
  
 
    // tcp 地址 服务器器端地址
   // public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "jd.mq.topic";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
    
    public static void run() throws Exception {
        
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver.DEFAULT_USER, TopicReceiver.DEFAULT_PASSWORD, TopicReceiver.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 6、创建消息制作者
            final TopicSubscriber subscriber = session.createSubscriber(topic);
            
            //接收Topic生产者发送过来的消息
            //需要注意的是此处需要启动一个新的线程来处理问题
            new Thread(){
                public void run(){
                TextMessage textMessage = null;
                try {
                    while(true){//持续接收消息
                    textMessage = (TextMessage) subscriber.receive();
                    if(textMessage==null)
                     break;
                    System.out.println("接收#" + textMessage.getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                }
            }.start();
            
            // 休眠100s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是TopicReceiver_Receive这个类进入休眠状态了,而接收者.start方法刚刚启动的新线程会继续执行的哦。
            Thread.sleep(1000 *100);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}

相关标签: 消息队列