欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketmq API顺序消息的使用

程序员文章站 2022-03-23 12:54:37
...

前言

       在使用rocketmq的时候如果要保证严格的顺序,那么就需要将消息发送到rocketmq的一个消息队列中,由于一个消息队列只能在一个broker上,可能处出现短暂的不可用性(当节点的一个master发生主备切换时)。

1. producer demo

使用rocketmq实现的SelectMessageQueueByHash

package com.feng.rocketmq.base;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class OrderedProducerDefaultMode {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        //instance
        DefaultMQProducer producer = new DefaultMQProducer();
        //group, for producer load balance
        producer.setProducerGroup("demo-producer-group");
        //namesrvAddr,cluster nameserver with ; spit
        producer.setNamesrvAddr("127.0.0.1:9876");
        //start
        producer.start();
        // send msg
        int num = 20;
        for (int i = 0; i < num; i++) {
            //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体
            Message message = new Message("demoTopic","tags-1", "instanceKeys", ("I`m a " + i + " rocket mq msg!").getBytes(RemotingHelper.DEFAULT_CHARSET));
            Integer id = i/5;
            SendResult result = producer.send(message, new SelectMessageQueueByHash(), id);
            System.out.println("send result is\t" + result);
        }
        //close, can use for rocket mq switch
        producer.shutdown();
    }
}

通过对参数hash取模队列总数,分发到某一条消息队列

默认3种实现

rocketmq API顺序消息的使用

当然也可以自己实现逻辑,要保证

producer --> messagequeue --> consumer 完整的一对一关系,才能保证消息的顺序发送;当然我们只需要保证消息的局部一致性就可以了,全局一致性不需要保证,否则性能会大幅度下降,也是不需要这种实际场景的。

我们可以根据自己的业务自己实现

SendResult result = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    return mqs.get((Integer) arg % mqs.size());
                }
            }, 1);

2. consumer

public class OrderedPushConsumer {
    public static void main(String[] args) throws MQClientException {
        //instance
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        //group
        consumer.setConsumerGroup("demo-consumer-group");
        //setNamesrvAddr,cluster with ; spit
        consumer.setNamesrvAddr("127.0.0.1:9876");

        //consumer offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //subscribe, the subExpression is tags in message send
        //subscribe topic store in map
        consumer.subscribe("demoTopic", "tags-1");
        //can subscribe more
        //consumer.subscribe("demoTopic2", "*");
        //or use setSubscription, method is deprecated
        //consumer.setSubscription();

        //batch consumer max message limit
        consumer.setConsumeMessageBatchMaxSize(1000);
        //min thread
        consumer.setConsumeThreadMin(10);

        //listener, MessageListenerOrderly for one messagequeue can only consumed by one thread
        consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
            try {
                for (MessageExt messageExt : list) {
                    if (messageExt.getReconsumeTimes() > 1){
                        continue;
                    }
                    String topic = messageExt.getTopic();
                    int queueId = messageExt.getQueueId();
                    String message = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("the topic: " + topic + "\tqueueId:" + queueId + "\t body:" + message);
                }
            } catch (Exception e) {
                //retry
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();

        //consumer.shutdown();
    }
}

多个队列之间不保证顺序,单个队列保证顺序。如果consumer集群的某一个节点宕机或者故障导致消费消息未提交,可能导致重复消费

先看看MessageListenerOrderly,使用线程池处理推送消息,使用lock锁保证消息队列被一个线程处理

rocketmq API顺序消息的使用

3. 多个consumer的重复消费需要自行处理

再起一个consumer端,同一group

rocketmq API顺序消息的使用

 消费时,退出进程

rocketmq API顺序消息的使用

rocketmq API顺序消息的使用

确实会重复消费,原因很明显了,状态未提交,rocketmq不知道consumer是否消费了,需要自行去重或者幂等设计

 

总结

        rocketmq原生提供通过Hash取模算法的顺序一致性,保证局部一致性。性能较强,满足业务需求,还可以定制算法,提供灵活扩展性。消费者不保证唯一消费,需要自行处理。rocketmq还可以支持分布式事务,下一章说明。

 

 

相关标签: rocketmq