欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activemq消息队列-使用监听器来接收消息(常用)

程序员文章站 2022-07-13 16:44:35
...

 

//点对点-使用监听器接收消息
public class ConsumerListener {
  
    // tcp 地址 服务器器端地址
    //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "Jaycekon-MQ";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
    
    public static void run() throws Exception {
        
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 1、创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ConsumerListener.DEFAULT_USER, ConsumerListener.DEFAULT_PASSWORD, ConsumerListener.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Queue queue = session.createQueue(DESTINATION);
            // 创建消息接收者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            
            //使用内部类为消息接收者加载相应的Listener监听
            receiver.setMessageListener(new MessageListener() {
                //重写onMessage方法
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠10s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是QueueReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
            Thread.sleep(1000 * 10);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        ConsumerListener.run();
    }
}

//使用监听器的方式订阅消息
public class TopicListener {
  
    // tcp 地址 服务器器端地址
    //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "jd.mq.topic";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
    
    public static void run() throws Exception {
        
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 6、创建消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);
            
            //使用监听器的方式订阅消息
            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠100s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
            Thread.sleep(1000 *100);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicListener.run();
    }
}

相关标签: 消息队列