欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式

程序员文章站 2022-05-18 14:58:10
...

之前有篇文件介绍了生产消费者模式(http://blog.csdn.net/canot/article/details/51541920),当时是通过BlockingQueue阻塞队列来实现,以及在Redis中使用pub/sub模式(http://blog.csdn.net/canot/article/details/51938955)。而实际项目中往往是通过JMS使用消息队列来实现这两种模式的。

JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS类似与JDBC,sun提供接口,由各个厂商(provider)来进行具体的实现。市面上众多成熟的JMS规范实现的框架Kafk,RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ等。

JMS的队列消息(Queue)传递过程如下图:

消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式

对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。

JMS的主题消息传递过程如下图:

消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式

对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。

我们从ActiveMQ来实践:(安装部署省掉)

Queue模式实践:

消息生产者:

public class Sender {  

    public static void main(String[] args) throws JMSException, InterruptedException {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        //61616是ActiveMQ默认端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
                                                  ActiveMQConnection.DEFAULT_USER,  
                                                  ActiveMQConnection.DEFAULT_PASSWORD,  
                                                  "tcp://localhost:61616");  

        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection =  connectionFactory.createConnection();  

        connection.start();  
        // Session: 一个发送或接收消息的线程  
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  

        // Destination :消息的目的地;消息发送给谁.  
        Destination destination =  session.createQueue("my-queue");  

        // MessageProducer:消息发送者  
        MessageProducer producer =  session.createProducer(destination);  

        // 设置不持久化,可以更改  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  

        for(int i=0;i<10;i++){  
            //创建文本消息  
            TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);  

            Thread.sleep(1000);  
            //发送消息  
            producer.send(message);  
        }  

        session.commit();  
        session.close();  
        connection.close();  
    }  

}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

消息接收者

    // ConnectionFactory :连接工厂,JMS 用它创建连接  
   private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
           ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  

   public static void main(String[] args) throws JMSException {  
        // Connection :JMS 客户端到JMS Provider 的连接  
        final Connection connection =  connectionFactory.createConnection();  

        connection.start();  
        // Session: 一个发送或接收消息的线程  
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
        // Destination :消息的目的地;消息送谁那获取.  
        Destination destination =  session.createQueue("my-queue");  
        // 消费者,消息接收者  
        MessageConsumer consumer1 =  session.createConsumer(destination);  

        consumer1.setMessageListener(new MessageListener() {  
                @Override  
                public void onMessage(Message msg) {  

                    try {  

                        TextMessage message = (TextMessage)msg ;  
                        System.out.println("consumerOne收到消息: "+message.getText());  
                        session.commit();  
                    } catch (JMSException e) {                
                        e.printStackTrace();  
                    }  

                }  
            });  
}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

运行之后控制台不会退出一直监听消息库,对于消息发送者的十条信息,控制输出:

consumerOne收到消息: hello.I am producer, this is a test message0
consumerOne收到消息: hello.I am producer, this is a test message1
consumerOne收到消息: hello.I am producer, this is a test message2
consumerOne收到消息: hello.I am producer, this is a test message3
consumerOne收到消息: hello.I am producer, this is a test message4
consumerOne收到消息: hello.I am producer, this is a test message5
consumerOne收到消息: hello.I am producer, this is a test message6
consumerOne收到消息: hello.I am producer, this is a test message7
consumerOne收到消息: hello.I am producer, this is a test message8
consumerOne收到消息: hello.I am producer, this is a test message9
如果此时另外一个线程也存在消费者监听该Queue,则两者交换输出,共输出10条

Topic模式实现

消息发布者

public static void main(String[] args) throws JMSException, InterruptedException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        //61616是ActiveMQ默认端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
                                                  ActiveMQConnection.DEFAULT_USER,  
                                                  ActiveMQConnection.DEFAULT_PASSWORD,  
                                                  "tcp://localhost:61616");  

        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection =  connectionFactory.createConnection();  

        connection.start();  
        // Session: 一个发送或接收消息的线程  
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  

        // Destination :消息的目的地;消息发送给谁.  
        //Destination destination =  session.createQueue("my-queue");  
        Destination destination = session.createTopic("STOCKS.myTopic"); //创建topic   myTopic
        // MessageProducer:消息发送者  
        MessageProducer producer =  session.createProducer(destination);  

        // 设置不持久化,可以更改  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  

        for(int i=0;i<10;i++){  
            //创建文本消息  
            TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);  
            //发送消息  
            producer.send(message);  
        }  

        session.commit();  
        session.close();  
        connection.close();
    }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

消息订阅者

private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");

    public void run() {
        // Connection :JMS 客户端到JMS Provider 的连接
        try {
            final Connection connection = connectionFactory.createConnection();

            connection.start();
            // Session: 一个发送或接收消息的线程
            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Destination :消息的目的地;消息送谁那获取.
            // Destination destination = session.createQueue("my-queue");
            Destination destination = session.createTopic("STOCKS.myTopic"); // 创建topic
                                                                                // myTopic
            // 消费者,消息接收者
            MessageConsumer consumer1 = session.createConsumer(destination);

            consumer1.setMessageListener(new MessageListener() {

                public void onMessage(Message msg) {

                    try {

                        TextMessage message = (TextMessage) msg;
                        System.out.println("consumerOne收到消息: " + message.getText());
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                }
            });

            // 再来一个消费者,消息接收者
            MessageConsumer consumer2 = session.createConsumer(destination);

            consumer2.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {

                    try {

                        TextMessage message = (TextMessage) msg;
                        System.out.println("consumerTwo收到消息: " + message.getText());
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                }
            });
        } catch (Exception e) {
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

最后消息会重复输出:
consumerOne收到消息: hello.I am producer, this is a test message0
consumerTwo收到消息: hello.I am producer, this is a test message0
consumerOne收到消息: hello.I am producer, this is a test message1
consumerTwo收到消息: hello.I am producer, this is a test message1
consumerOne收到消息: hello.I am producer, this is a test message2
consumerTwo收到消息: hello.I am producer, this is a test message2
consumerOne收到消息: hello.I am producer, this is a test message3
consumerTwo收到消息: hello.I am producer, this is a test message3
consumerOne收到消息: hello.I am producer, this is a test message4
consumerTwo收到消息: hello.I am producer, this is a test message4
consumerOne收到消息: hello.I am producer, this is a test message5
consumerTwo收到消息: hello.I am producer, this is a test message5
consumerOne收到消息: hello.I am producer, this is a test message6
consumerTwo收到消息: hello.I am producer, this is a test message6
consumerOne收到消息: hello.I am producer, this is a test message7
consumerTwo收到消息: hello.I am producer, this is a test message7
consumerOne收到消息: hello.I am producer, this is a test message8
consumerTwo收到消息: hello.I am producer, this is a test message8
consumerOne收到消息: hello.I am producer, this is a test message9

我们简单总结一下使用MQ的过程:

  • 1.创建与MQ的链接
  • 2.创建消息的目的地或者来原地即Destination
  • 3.发送消息或者制定对应的MessageListener

上述就是关于MQ两种消息模型的简单应用,至于具体的细节。如在消费者监听消息时有哪些Listener类型,生产者发送消息时有哪些Message类型。生成Session时参数1表示是否开启事务,至于事务的处理,消息的持久化等等。后面慢慢介绍。

相关标签: 消息中间件