欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMq-顺序消息

程序员文章站 2022-07-14 23:42:11
...

顺序消息

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生
成、付款、发货,这3个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息:

  1. 全局顺序消息指某个Topic下的所有消息都要保证顺序;
  2. 部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一
    个订单ID的三个消息能按顺序消费即可。
    在多数的业务场景中实际上只需要局部有序就可以了

RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时
候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个
Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺
序是否一致是不确定的

要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发
设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都
设置成单线程处理。
RocketMq-顺序消息
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题

Consumer使用MessageListenerOrderly的时候,下面四个Consumer的设置依旧可以使用:

  1. setConsumeThreadMin
  2. setConsumeThreadMax
  3. setPullBatchSize
  4. setConsumeMessageBatchMaxSize。
    前两个参数设置Consumer的线程数;
    PullBatchSize指的是一次从Broker的一个Message Queue获取消息的最大数量,默认值是32;

ConsumeMessageBatchMaxSize指的是这个Consumer的Executor(也就是调用
MessageListener处理的地方)一次传入的消息数(Listmsgs这个链表的最大长度),
默认值是1。
上述四个参数可以使用,说明MessageListenerOrderly并不是简单地禁止并发处理。在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得
这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消
息不被并发消费,但不同Consumer Queue的消息可以并发处理

部分有序:
顺序消息的生产和消费

# 创建主题,88[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 8 -t
tp_demo_07 -w 8
# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n
localhost:9876 -t tp_demo_07
# 主题描述
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07

OrderProducer

public class OrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
List<MessageQueue> queues =
producer.fetchPublishMessageQueues("tp_demo_07");
System.err.println(queues.size());
MessageQueue queue = null;
for (int i = 0; i < 100; i++) {
queue = queues.get(i % 8);
message = new Message("tp_demo_07", ("hello lagou - order
create" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order
payed" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order ship"
+ i).getBytes());
producer.send(message, queue);
} p
roducer.shutdown();
}
}

OrderConsumer

public class OrderConsumer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_grp_07_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues("tp_demo_07");
System.err.println(messageQueues.size());
for (MessageQueue messageQueue : messageQueues) {
long nextBeginOffset = 0;
System.out.println("===============================");
do {
PullResult pullResult = consumer.pull(messageQueue, "*",
nextBeginOffset, 1);
if (pullResult == null || pullResult.getMsgFoundList() ==
null) break;
nextBeginOffset = pullResult.getNextBeginOffset();
List<MessageExt> msgFoundList =
pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "\t" +
msgFoundList.size());
for (MessageExt messageExt : msgFoundList) {
System.out.println(
messageExt.getTopic() + "\t" +
messageExt.getQueueId() + "\t" +
messageExt.getMsgId() + "\t" +
new String(messageExt.getBody())
);
}
} while (true);
} 
consumer.shutdown();
}
}

全局有序:

顺序消息的生产和消费:

# 创建主题,8写8读
[aaa@qq.com ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t
tp_demo_07_01 -w 1
# 删除主题的操作:
[aaa@qq.com ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n
localhost:9876 -t tp_demo_07_01
# 主题描述
[aaa@qq.com ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07_01
  • GlobalOrderProduer
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_02");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_07_01", ("hello lagou" +i).getBytes());
producer.send(message);
} 
producer.shutdown();
}
}
  • GlobalOrderConsumer
public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_07_03");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_07_01", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
} 
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
相关标签: MQ