欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Active MQ 消息队列

程序员文章站 2022-05-18 10:21:51
...

有什么用?

对于一个系统来说,少不了通信。如果要有通信的和返回就涉及到了同步和异步的问题了。

讲个小故事:

​ 现在有AB两个人一起做生意,A主要负责收钱,B负责发货。一开始人不多的时候,A收完钱要去给B说发货,两个人合作愉快;可是有时候B不在,A收了钱以后,就要等着B回来才能给B说发货的事情。久而久之,影响到了效率。

​ 有一天,A给B说,“老哥,咱们这样:我收钱写订单,让后我把订单放在一个盒子里。你呢发货的时候也别来找我了,直接去盒子里看有没有订单就行。”

一开始的两个人通信就是同步通信,只有A得到了B的回应才会继续记性;后来A提出的“放入盒子”就是一种异步操作。订单信息我们可以看作是数据,这个盒子就是我们的消息中间件。

当然一个这个盒子只是一个简单的模型,我们使用的时候还要考虑别的事情,像是:订单会不会丢了、订单会不会被修改等等。

小结:消息中间件键的作用小结一下集中:

  1. 削峰:面对大流量的时,率先开始就收消息,这样将压力从io移走,使得系统可用。
  2. 填谷:面对“来就来的多,要不没什么人来”的环境,中间件可以让数据处理在小流量时候依然执行。削峰填谷,可以通过消息异步的方式增强了系统面对大流量时候的适应性,将流量分摊开,更好的使用系统的资源。
  3. 结构改变:消息请求在消息队列中,处理的只要不停的取就可以了,不用在乎现在现在是不是大流量。使得产生请求和处理请求分离。编程时候只要面向中间件编写就行,消息提供方或者接收方的改变不会影响到另一个模块。

activeMQ的安装

解压安装就OK了

默认账号admin 默认密码admin

Active MQ 消息队列

用java实现连接

添加xml依赖

 <!--本地的版本对应-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.3</version>
</dependency>

<!--spring的activemq的支持-->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.17</version>
</dependency>

JMS规范

什么是jms

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。------来自百度百科

jms的编码架构

图片来自百度

Active MQ 消息队列
消息发送分两类,一类是点对点的消息队列queue,另一个是点对多的订阅 主题topic。

基本使用(非持久化)

队列queue

成员:发送方、接收方、发送队列、消息

特点:发送方和接收方互相独立,各自运行各自的。同时消息被消费后不会就消失(有别于订阅)。

模式:这种模式更像是生产者消费者模式。------生产者往“中介”放入数据,消费者消费“中介”的数据。

所以java实现就存在两个身份即 生产者(provider)和消费者(customer)。

  1. 配置地址
  2. 连接工厂 & 创建连接 (端口61616)
  3. 创建一个会话(session)
  4. 创建目的地(queue/topic)
  5. 确定身份(provide还是customer)
  6. 确定消息的类型(它们分别携带:
    1. 简单文本(TextMessage)、
    2. 可序列化的对象 (ObjectMessage)、
    3. 属性集合 (MapMessage)、
    4. 字节流 (BytesMessage)、
    5. 原始值流 (StreamMessage),
    6. 还有无有效负载的消息 (Message)。)
  7. 放入消息
  8. 发送消息
  9. 消息关闭

代码如下:

public class JMSProvider {

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
        Queue ubw = session.createQueue("ubw");
//        session.createTopic("ubw");
        // 确定生产者
        MessageProducer producer = session.createProducer(ubw);
//        MessageConsumer consumer = session.createConsumer(ubw);
        // 创建类型
        TextMessage textMessage = session.createTextMessage();
        // 输入值
//        注意不能写成这样 一个textMessage算一个消息
//        for(int i = 0; i < TEST_TIME ; i++){
//            textMessage.setText(i+"");
//        }
        for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            // 提交
            producer.send(textMessage);
        }
        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

消费发送成功能看到下面的这个图:

Active MQ 消息队列

因为只有生产者没有消费者,我们再写一个消费者。

只是session创建的时候创建一个消费者就行

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
        Queue ubw = session.createQueue("ubw");
//        session.createTopic("ubw");
        // 确定生产者
//        MessageProducer producer = session.createProducer(ubw);
        MessageConsumer consumer = session.createConsumer(ubw);
        // 第一种方式 循环阻塞的等待
         while (true){
            TextMessage receive = (TextMessage)consumer.receive();
            String text = receive.getText();
            if(StringUtils.isEmpty(text)){
                break;
            }
            System.out.println("消息是: " + text);
        }
        // 第二种方式 监听状态 这种多用于主题订阅 订阅里用的是这个方法
        consumer.setMessageListener((message)->{
            if (message!=null && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收到主题消息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

结果为:

Active MQ 消息队列
Active MQ 消息队列
Active MQ 消息队列
Active MQ 消息队列

第一种是循环阻塞的等待

consumer.receive(1000)这里等待1s后就关闭

主题topic

订阅模式

生产者

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
//        Queue ubw = session.createQueue("ubw-topic");
        Topic ubw = session.createTopic("ubw-topic");
        // 确定生产者
        MessageProducer producer = session.createProducer(ubw);

        // 创建类型
        TextMessage textMessage = session.createTextMessage();
        
        for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            // 提交
            producer.send(textMessage);
        }

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }

消费者

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
//        Queue ubw = session.createQueue("ubw-topic");
        Topic ubw = session.createTopic("ubw-topic");
        // 确定生产者
//        MessageProducer producer = session.createProducer(ubw);
        MessageConsumer consumer = session.createConsumer(ubw);

//

        consumer.setMessageListener((message)->{

            if (message!=null && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收到主题消息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 卡住等待
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();




    }

注意:这里一定要先监听,后发送,如果顺序反了就不会接收到消息。

持久化

持久化,顾名思义就是将消息队列里面的数据进行持久化,也就是当消息队列挂了重启以后队列中的数据不会消失。

我们先看看java代码里面怎么实现:

queue持久化

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue ubw = session.createQueue("ubw-persistence");
        MessageProducer producer = session.createProducer(ubw);

        // 创建类型
        TextMessage textMessage = session.createTextMessage();
        
        for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            // 在这里持久化
            textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
            // 提交
            producer.send(textMessage);
        }

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }

Active MQ 消息队列

重启activeMQ以后启动消费者依然可以接收到消息。

topic持久化

主题的持久化有别的与队列,并不是发送的消息持久化,客户端随时上来就能取到。

主题(topic)客户端的持久化是需要订阅的。

Active MQ 消息队列

消费者

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 注意这个id要写在创建session会话的前面
        connection.setClientID("clientID");

        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
        Topic ubw = session.createTopic("ubw-topic-p");
        // 消费者的topic模式下,只能用订阅的方式进行持久化,
        // 即才能实现即便是离线了,上线以后还是能看到消息

        // 设置完持久化在连接


        // 确定生产者
        MessageConsumer consumer = session.createConsumer(ubw);
        // Subscriber 设置name 注意这里是用session创建订阅
        TopicSubscriber sessionID = session.createDurableSubscriber(ubw, "sessionID");
        // 进行连接
        connection.start();

        while (true){
            // 接受消息
            Message receive = sessionID.receive();
            if (receive!=null) {
                TextMessage message = (TextMessage)receive;
                System.out.println(message.getText());
            }else {
                break;
            }
        }
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

这样就是订阅上了 后面provider向这个里面发送消息就可以同步接收了。

Active MQ 消息队列
这时候关闭客户端这边就显示下线了
Active MQ 消息队列

生产者:

需要注意生产者的连接需要放在持久化配置之后,不然不能生效:

    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
//    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private final static String ACTIVE_URL = "tcp://127.0.0.1:61616";
    private final static int TEST_TIME = 5;

    public static void main(String[] args) throws JMSException {
        // 建立一个连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 建立连接
        Connection connection = activeMQConnectionFactory.createConnection();


        // 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建 主题还是队列
//        Queue ubw = session.createQueue("ubw-topic");
        Topic ubw = session.createTopic("ubw-topic-p");
        // 确定生产者
        MessageProducer producer = session.createProducer(ubw);
//        producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 这里也可以持久化


        // 创建类型
        TextMessage textMessage = session.createTextMessage();
        TextMessage textMessage2 = session.createTextMessage();
        textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // 这里持久化 这两个持久化其实只是作用域不用而已
        connection.start();
        for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            // 提交
            producer.send(textMessage);
        }
        for (int i = 5; i < TEST_TIME + 5 ; i++) {
            textMessage2.setText(i+"");
            // 提交
            producer.send(textMessage2);
        }

        // 关闭资源
        producer.close();
        session.close();
        connection.close()}

执行生产者,发送消息可以看到订阅的客户端多了几条消息。重启activemq以后可以获得数据

Active MQ 消息队列
Active MQ 消息队列

分析:这里都进行了持久化,也就说持久化设置在MessageProducerTextMessage区别不大。同时create的生产者不是单例。也就是说,订阅模式本身就实现了持久化,并不需要加入setJMSDeliveryMode去设置持久化。

消息的队列的事务

对于生产者而言

影响:用send并不能发送消息,需要用commit才能提交事务。

for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
            // 提交
            producer.send(textMessage);
        }
//        session.commit(); 这里没有启动commit

Active MQ 消息队列

 for (int i = 0; i < TEST_TIME; i++) {
            textMessage.setText(i+"");
            textMessage.setJMSDeliveryMode(Message.DEFAULT_PRIORITY);
            // 提交
            producer.send(textMessage);
        }
        session.commit();

启动commit以后

Active MQ 消息队列

对于消费者而言

影响:如果开启了事务而接收到以后不提交就会重复消费。

// 创建session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Active MQ 消息队列
Active MQ 消息队列

//加入commit
System.out.println("commit");
session.commit();

提交了事务以后就没有重复消费了。

Active MQ 消息队列