欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ学习记录(二)

程序员文章站 2022-03-23 16:24:33
...

    经过上一篇简单了解ActiveMQ后,我们继续了解ActiveMQ,再次说明,本系列主要参考一些好的博客后进行整理,自己尝试后编写的。好了,那我们开始继续学习ActiveMQ。

    消息的顺序消费

    在上一篇文章中,我们已经明确知道了ActiveMQ并不能保证消费的顺序性,即便我们使用了消息优先级。而在实际开发中,有些场景又是需要对消息进行顺序消费的,比如:用户从下单、到支付、再到发货等。如果使用ActiveMQ该如何保证消费的顺序性呢?

    ActiveMQ学习记录(二)

    首先来说,在实际中,我们并不需要的是对全部消息的全局有序消费,我们仅仅需要的是局部业务有序性消费。比如说,我们仅仅需要的是一个用户的下订单、支付、发货这个过程的3条消息有序消费。

    比如,我们可以根据用户ID简单做一个HASH,将消息定位到不同的队列上,也就意味着同一个用户的消息将发往同一个队列。这样做的好处在于,多个队列之间可以并行处理。

    然后,在队列上可以对一段时间上的消息按照用户分组进行排序,这只是一个少量消息的局部排序而已,比如Queue-A上有一个用户的3条消息(订单消息msg1、支付消息msg2、发货消息msg3),那么,msg1将交给订单业务系统,处理完成后,msg2交给支付系统,处理完成后,msg3交给发货系统。虽然这个处理过程是同步的(一条消息处理完,在接着处理),但是它的并发性,系统的处理能力并没有下降!为什么这么说呢?

    假设,msg1/msg2/msg3处理各需要0.1S,如果订单业务系统、支付系统、发货系统并没有分开,而是一个“大系统”,那么显然订单业务在0.1S完成后,需要等待后面的支付、发货逻辑处理完才能继续工作,意味着订单业务干了0.1S的活,等了0.2S,导致在0.3秒内订单业务只处理了1条消息。而现在这3个系统是分开的,那么在0.3S内,订单业务系统可以处理3条消息,而且没有业务系统闲着!

   JMS Selectors

    JMS Selectors,即消息选择器。之前介绍过消息的组成部分是消息头+消息属性+消息体,其中谈到消息对象有消息属性,用于消息选择器。下面是个完整的例子,看完这个例子应该就理解了,代码中都有相关注释说明。

    Producer端代码:

 public static void main(String[] args) throws JMSException {
        Destination destination = MQUtils.createDestination();
        //通过session(MQUtils.createSession())创建 发送消息的生产者
        MessageProducer producer = MQUtils.createSession2().createProducer(destination);

        MapMessage mapMessage1 = MQUtils.createSession2().createMapMessage();
        mapMessage1.setString("name","zhangsan");
        //设置消息属性,用于消息选择器
        mapMessage1.setIntProperty("age",23);

        MapMessage mapMessage2 = MQUtils.createSession2().createMapMessage();
        mapMessage2.setString("name","lisi");
        mapMessage2.setIntProperty("age",25);

        MapMessage mapMessage3 = MQUtils.createSession2().createMapMessage();
        mapMessage3.setString("name","wangwu");
        mapMessage3.setIntProperty("age",27);

        //参数说明:
        // 第一个参数destination其实是连接到那个队列
        //第二个参数mapMessage1是发送的消息(消息头+消息属性+消息体)
        //第三个参数是否序列化DeliveryMode.NON_PERSISTENT表示当activemq关闭的时候,队列里面的数据将会被清空
        //第四个参数优先的等级,0-9
        //第五个参数,失效时间
        producer.send(destination,mapMessage1,DeliveryMode.NON_PERSISTENT,4,1000*60*10);
        producer.send(destination,mapMessage2,DeliveryMode.NON_PERSISTENT,4,1000*60*10);
        producer.send(destination,mapMessage3,DeliveryMode.NON_PERSISTENT,4,1000*60*10);

        //关闭连接
        MQUtils.close();
    }

    Consumer端代码:

 public static void main(String[] args) throws JMSException {
        Destination destination = MQUtils.createDestination();
        //设置获取什么样的消息属性,语法是SQL92
        String condition = " age >= 25";

        //通过session(MQUtils.createSession2(),创建的是手动签收的session对象)创建 发送消息的消费者
        //其中消费者从队列获取消息的条件是condition
        MessageConsumer consumer = MQUtils.createSession2().createConsumer(destination,condition);

        //实现一个消息的监听器
        //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if(message instanceof  MapMessage){
                        //获取到接收的数据
                        String name = ((MapMessage)message).getString("name");
                        System.out.println("Consumer get : "  + name + "---");
                        //手动签收必须给出应答
                        message.acknowledge();
                    }

                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    其中MQUtils和第一节中的一样,只是多加一个方法如下:

public  static Session createSession2() throws JMSException {
        if(session == null ){
            //这里我们使用手动签收,前面讲过了签收,这里回顾下
            //AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收
            //CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收
            //DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销
            session = createConnection().createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
        }
        return session;
    }

    运行Consumer后在运行Producer,可以在控制台看到如下结果:

    ActiveMQ学习记录(二)

    同时我们可以从管控台看出,ActiveMQ中生产者生产了3条消息,但消费者只消费了2条消息,剩下1条消息未被消费

ActiveMQ学习记录(二)    好了,这里梳理下几个注意的事项点:

    第一:生产者需要设置消息的属性,一定要注意是setXxxProperty(filed,value)

    第二:生产者要给出条件, 条件的语法是SQL92,可以自己去了解下

    第三:创建消费者的时候要给出条件,即下面的condition参数

MessageConsumer consumer = MQUtils.createSession2().createConsumer(destination,condition);

    第四:消费者在实现监听器的时候,最好通过instanceof来判断类型后在进行处理,即下面

if(message instanceof  MapMessage)

    第五:session对象是手动签收的话,一定要在获取消息并处理完后进行手动签收,给ActiveMQ答复,即下面

message.acknowledge();

   P2P和Pub/Sub

    上一节也简绍了P2P和Pub/Sub分别是什么以及他们的区别,这里通过两张图让我们更直白了解他们的区别。

    P2P:生产者发送一条消息,消费端只有一个消费者消费这个消息,好像打电话,一对一通信。

      ActiveMQ学习记录(二)

    Pub/Sub:  一对多通信,生产者发送一条消息,消费端只要订阅了该目标的消费者都能收到消息。

     ActiveMQ学习记录(二)

    P2P、Pub/Sub在代码上的区别点仅仅在于,目标类型的创建是createQueue or createTopic,其他一切照旧!

    对于订阅模式,对订阅者提出了特殊的要求,要想收到消息,必须先订阅,而且订阅进程必须一直处于运行状态!实际上,有时候消费者重启了下,那么这个消费者将丢失掉一些消息,那么能否避免这样的情况呢?ActiveMQ已经替我们想好了,就是持久化订阅!

   持久化订阅

    所谓持久化订阅,打个比方,就是说跟MQ打声招呼,即便我不在,那么给我发送的消息暂存在MQ,等我来了,再给我发过来。说白了,持久化订阅,需要给MQ备个案(你是谁,想在哪个Topic上搞特殊化)!下面看一个实例:
    Producer:
 public static void main(String[] args) throws JMSException {
        String myTopic = "myTopic";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL
        );
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createTopic(myTopic);

        MessageProducer producer = session.createProducer(destination);

        MapMessage mapMessage1 =session.createMapMessage();
        mapMessage1.setString("name","zhangsan");
        //设置消息属性,用于消息选择器
        mapMessage1.setIntProperty("age",23);

        MapMessage mapMessage2 =session.createMapMessage();
        mapMessage2.setString("name","lisi");
        mapMessage2.setIntProperty("age",25);

        MapMessage mapMessage3 = session.createMapMessage();
        mapMessage3.setString("name","wangwu");
        mapMessage3.setIntProperty("age",27);

        producer.send(destination,mapMessage1,DeliveryMode.PERSISTENT,4,1000*60*10);
        producer.send(destination,mapMessage2,DeliveryMode.PERSISTENT,4,1000*60*10);
        producer.send(destination,mapMessage3,DeliveryMode.PERSISTENT,4,1000*60*10);

        session.close();
    }

    ConsumerOne(普通的topic):

  public static void main(String[] args) throws JMSException {
        String myTopic = "myTopic";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL
        );
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createTopic(myTopic);

        String condition = " age >= 25";

        MessageConsumer consumer = session.createConsumer(destination,condition);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if(message instanceof MapMessage){
                        //获取到接收的数据
                        String name = ((MapMessage)message).getString("name");
                        System.out.println("ConsumerOne get : "  + name + "---");
                        //手动签收必须给出应答
                        message.acknowledge();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    ConsumerTwo:

    public static void main(String[] args) throws JMSException {
        String myTopic = "myTopic";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL
        );
        Connection connection = connectionFactory.createConnection();
        //将ConsumerTwo消费者的ID注册到JMS服务器上,注意,这个clientId是不能重复的
        connection.setClientID("consumer-1");
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createTopic(myTopic);

        String condition = " age >= 25";

        //参数1:发送主题目的地,参数2:持久订阅者名字,
        //参数3:消息过滤条件,参数4:是否只接收同一clientID的信息,默认false
        MessageConsumer consumer = session.createDurableSubscriber((Topic)destination,"consumer-1",condition,false);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if(message instanceof MapMessage){
                        //获取到接收的数据
                        String name = ((MapMessage)message).getString("name");
                        System.out.println("ConsumerTwo get : "  + name + "---");
                        //手动签收必须给出应答
                        message.acknowledge();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    我们运行实例看看,注意运行顺序,首先运行ConsumserOne、ConsumerTwo,最后在运行Producer,我们可以看到ConsumerOne和ConsumerTwo接收到了生产者的消息,接着我们将ConsumerTwo和Producer关掉,在重启producer后再重启ConsumerTwo,发现ConsumerTwo还能接收到producer消息。这就是持久化订阅的效果。

    每一个持久化订阅者都应该有一个唯一的ID作为标示以及要在哪个Topic上进行持久化订阅,一旦这些信息告知MQ之后,那么以后不论持久化订阅者在不在线,那么他的消息会暂存在MQ,以后都会发给他!

    LZ在写的时候发现了几个异常,这里总结梳理下:

    一:创建持久订阅的时候,必须要设置client,否则会报错:  

        javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a                    Connection  

    二:如果clientID重复(已经存在相同id的活动连接),会报错  

        javax.jms.InvalidClientIDException: Broker: localhost - Client: XXX   already connected from tcp://127.0.0.1:2758  

    三 在同一个连接的ClientID下,持久订阅者的名称必须唯一  

    javax.jms.JMSException: Durable consumer is in use for client: 1 and  subscriptionName: XX

   持久化消息到MySQL

    在前文中已经提及默认情况下,ActiveMQ是开启持久化消息机制的,并且是持久化到kahadb的,但是"很可惜"kahadb对我们不是很友好的可视化,其实ActiveMQ提供了配置的方式让我们来选择持久化消息到哪里,这里我以到MySQL为例来说明。(实际上ActiveMQ已经在conf配置文件中提供了相应的例子,我这里就简单说明下)

    第一步,需要将mysql的jar包放入ActiveMQ的lib下,如图

    ActiveMQ学习记录(二)

    第二步:配置activemq.xml,添加一个bean内容(加粗为注意点)和修改persistenceAdapter标签的内容:

<bean id="mysqlTest" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver" />
	<property name="url" value="jdbc:mysql://localhost:3306/activemqtest?relaxAutoCommit=true" />
	<property name="username" value="root" />
	<property name="password" value="123456" />
	<property name="maxTotal" value="200" />
	<property name="poolPreparedStatements" value="true" />
</bean>
       persistenceAdapter修改如下:
<persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysqlTest" />
</persistenceAdapter>
    第三步,创建数据库表activemqtest,并设置字符集为latin1,

    第四步,启动ActiveMQ,然后会发现mysql中activemqtest中多了三张表,即说明持久化成功

    ActiveMQ学习记录(二)

    第五步,运行之前持久化订阅的代码,然后可以观察到表里数据的变化,例如:   

ActiveMQ学习记录(二)

ActiveMQ学习记录(二)

    好了,这篇就到这里,下一篇将讲下ActiveMQ和spring的整合。谢谢!


    主要参考博文:https://www.jianshu.com/p/f7a7105b3c27

相关标签: MQ