RocketMQ
此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!
一、RocketMQ简介
1.1、介绍
RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套自定义的机制,简单来说就是使用订阅主题的方式去发送和接收任务,但是支持集群和广播两种消息模式。开源项目地址:https://github.com/apache/rocketmq
具有以下特点:
1、能够保证严格的消息顺序
2、提供丰富的消息拉取模式
3、高效的订阅者水平扩展能力
4、实时的消息订阅机制
5、亿级消息堆积能力
选用理由:
1、强调集群无单点,可扩展,任意一点高可用,水平可扩展。
2、海量消息堆积能力,消息堆积后,写入低延迟。
3、支持上万个队列。
4、消息失败重试机制。
5、消息可查询。
6、开源社区活跃。
7、成熟度(历经多次天猫双十一海量消息考验)
1.2、专业术语
1、Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
2、Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
3、Push Consumer
Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
4、Pull Consumer
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
5、Producer Group
一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
6、Consumer Group
一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
7、Broker
消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
8、广播消费
一条消息被多个 Consumer 消费,即使返些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
在 CORBA Notification 规范中,消费方式都属于广播消费。
在 JMS 规范中,相当于 JMS publish/subscribe model
9、集群消费
一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
在 CORBA Notification 规范中,无此消费方式。
在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等于 PTP 模型。
因为 RocketMQ 单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer Group 消费。
10、顺序消息
消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要挃的是尿部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序収送,丏収送到同一个队列,返样 Consumer 就可以挄照 Producer 发送的顺序去消费消息。
11、普通顺序消息
顺序消息的一种,正常情冴下可以保证完全的顺序消息,但是一旦収生通信异常,Broker 重启,由亍队列总数収生发化,哈希叏模后定位的队列会发化,产生短暂的消息顺序丌一致。如果业务能容忍在集群异常情冴(如某个 Broker 宕机戒者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
12、严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器丌可用,则整个集群都丌可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自劢切换为主避免,丌过仍然会存在几分钟的服务丌可用。(依赖同步双写,主备自劢切换,自劢切换功能目前迓未实现)
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
13、Message Queue
在 RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
1.3、关键概念
1.3.1、主题与标签
主题Topic:第一级消息类型,书的标题;
标签Tags:第二级消息类型,书的目录,可以基于Tag做简单的消息过滤,通常这已经可以满足90%的需求了,如果有更复杂的过滤场景,就需要使用rocketmq-filtersrv组件了。
例如,主题是订单交易,那么标签可以是订单交易-创建、订单交易-付款、订单交易-完成。
通过查看源码就可以发现:一个主题在MQ上默认会有4个Queue队列来存储该主题上的消息,Queue的数量也可以在创建主题时指定。这也是为什么,当MQ采用双Master集群方式时,如果向MQ发送100条消息,其中52条在BrokerA上,48条在BrokerB上。因为4条发给A,4条发给B…依次循环下去,最后4条是发给了A,所以A比B多存储了4条消息。
1.3.2、群组
生产组:用于消息的发送的群组,官方推荐:一个生产组理应发送的是同一主题的消息,消息子类型再使用Tags来区分;
消费组:用于消息的订阅处理的群组,官方推荐:一个消费组理应消费的是同一主题的消息,再使用Tags在Broker做消息过滤。
生产组和消费组极大地方便了扩缩机器、增减处理能力等,同时只有群组名相同才会被认为是一个集群组的,RocketMQ默认情况下采用集群消费模式,所以消息每次只会随机的发给每个消费群组中的一员,这也体现了RocketMQ集群无单点、水平可扩展、任意一点高可用、支持负载均衡等特点。
1.4、RocketMQ核心模块
rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。
rocketmq-client:提供发送、接受消息的客户端API。
rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
rocketmq-common:通用的一些类,方法,数据结构等。
rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
rocketmq-store:消息、索引存储等。
rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】。
rocketmq-tools:命令行工具。
二、RocketMQ示例
2.1、RocketMQ部署–双master方式
可参考我的博文:“RocketMQ部署–双master方式”。
2.2、HelloWorld示例
2.2.1、生产者
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//实例化生产者,实例化时需要指定生产组名
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
//设置namesrc地址,有多个的话用";"隔开
producer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
//启动生产者
producer.start();
for(int i=1;i<=100;i++){
//创建一条消息,指定了消息的主题topic、标签tag、消息的内容
Message msg = new Message("TopicQuickStart", "TagA", ("Hello RocketMQ "+i).getBytes());
//发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
//关闭生产者,main方法主线程结束,程序终止
producer.shutdown();
}
}
2.2.2、消费者
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//实例化消费者,实例化时需要指定消费组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
//设置namesrc地址,有多个的话用";"隔开
consumer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置每次消费的消息最大数量,默认是1,即一条条拉取
consumer.setConsumeMessageBatchMaxSize(10);
//设置订阅的消息主题topic和标签tags,这里订阅TopicQuickStart主题下的所有消息,所以会收到上面生产者发送的该主题下标签为TagA的消息
consumer.subscribe("TopicQuickStart", "*");
//注册消费监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//如果不设置每次消费的消息最大数量,这里的msgs里只会有一条
System.out.println("消息条数:"+msgs.size());
for(MessageExt msg : msgs){
System.out.println(Thread.currentThread().getName()+"收到消息:topic:"+msg.getTopic()+",tags:"+msg.getTags()+",msg:"+new String(msg.getBody()));
}
//回复RocketMQ,这条消息消费成功,如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER,即表明消息消费失败,那RocketMQ会对这条消息进行重发操作
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者,main方法主线程结束后,程序不会停止,进入阻塞状态,来一条消息就触发一次监听事件
consumer.start();
System.out.println("Consumer Started.");
}
}
可以执行多次上面消费者的main方法,也就是启动多个这样的消费者,因为在一个群组里,消息每次只会发送给群组里的一个成员,所以假设有100条消息,启动了两个同一群组的消费者,那么每个消费者各消费50条消息。可见,RocketMQ自动完成了相同群组下的消费者的负载均衡操作,而且如果想增减消费者,只需启动或者关闭消费者即可,无需任何配置,水平可扩展性好!
如果要切换成广播消费模式,每个消费端都需进行下面的设置:
consumer.setMessageModel(MessageModel.BROADCASTING);//设置为广播消费模式
这样即使是同一个消费组的消费者,也都会收到订阅的所有消息,不会进行均衡消费。
2.3、两类Consumer
在RocketMQ里,Consumer分为两类:MQPullConsumer和MQPushConsumer。其实两种都是拉模式(pull),即Consumer轮询从broker拉取消息。
push方式就是上面例子里的消费者,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumerMessage()来消费,对用户而言,感觉消息是被推送过来的。
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
三、消息重试
3.1、生产端消息重试
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败,这种消息失败重试我们可以手动设置发送失败重试的次数。
producer.setRetryTimesWhenSendFailed(3); //设置重试次数
producer.send(msg, 1000); //发送消息,并设置消息发送超时时间
上面的代码表示消息在1S内没有发送成功就会触发重试,重试最多3次。
3.2、消费端消息重试
消费端在收到消息并处理完成会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,如果返回了失败或者没返回就会触发重试,即MQ会把消息再发一遍。所以,发生消费端的消息重试有两种情况:1、返回了ConsumeConcurrentlyStatus.RECONSUME_LATER直接表明消费失败;2、长时间没有返回消息处理状态给MQ导致超时。
消息重复消费
值得注意的是,当一个消费组有多个消费者时,其中一个消费者处理消息后长时间没返回,那么MQ就会把这条消息进行重试,会发送给同一消费组的另外一个消费者进行消费。要是这时候之前的消费者又把消息处理结果返回了,那就出现了消息重复消费的问题。
RocketMQ无法避免消息重复,如果业务对消息重复非常敏感,务必要在业务层面去重,这就要求我们一定要做好消费端幂等处理。比如每条消息都有一个唯一编号,每处理完一条消息就记录日志,当消息再来的时候判断一下本条消息是否处理过。需要注意的是,如果消费端处理消息后的结果保存在DB中,那记录日志的操作也一定要保存在这个DB中,这样才能保证事务,其中有一步失败了就会一起回滚。倘若把消息处理后的结果存在mysql里,日志却记录在redis中,然后每次消息再来的时候去redis中查看是否已经处理过,这样是错误的做法,本以为放redis里再去查询的时候速度快,可以提升性能,但是却导致事务的一致性无法保证(比如mysql操作成功了而redis操作失败了那怎么回滚呢),至少目前为止单靠spring的事务管理无法回滚两个数据源的操作,需要增加其他的组件,所以建议都在一个DB中操作。
四、集群
推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式。当主节点挂了,就可以访问从节点来获取之前未消费的数据。但是因为Slave是只读的,所以不会接收生产者生产的新数据,新数据只会存储到其他的Broker主备节点上,直到宕机的主节点重新启动了才会接收新数据。至少截止到v3.2.4版本,RocketMQ还未能支持主备自动切换功能。
4.1、单个 Master
返种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
4.2、多 Master 模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
4.3、多 Master 多 Slave 模式,异步复制
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机、磁盘损坏等情况,会丢失少量消息。
4.4、多 Master 多 Slave 模式,同步双写
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写成功,才会向应用返回成功。
优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
五、顺序消费
普通模式下,使用传统的send发送消息即可,比如2.2里的示例代码,但是这种模式下不能保证消息消费顺序的一致性。假如我们在网购的时候,需要下单,那么下单需要有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成,也就是这个三个环节要有顺序,这个订单才有意义,这种场景下就需要顺序消费。
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
那通过RocketMQ怎么实现顺序消费的呢?
答:需要顺序消费的消息在生成端必须发送到同一个主题的同一个队列中(一个主题默认4个队列),比如创建订单1、订单1付款,订单1完成这三条消息就需要在同一个队列中,创建订单2、订单2付款,订单2完成这三条消息也需要在同一队列中,但订单1和订单2的队列可以不是同一个队列。然后消费端消费时必须实现MessageListenerOrderly接口以保证一个队列只会被同一个消费端的一个线程所消费,因为队列先进先出的原则,就可以保证顺序消费了。
比如有1个生产端和2个消费端,要保证顺序消费,示例代码如下:
5.1、生产者
public class Producer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
// 主题:TopicOrderTest,标签:order_1,KEY:"KEY" + i,消息内容:"order_1 " + i
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg; //arg就是producer.send方法的最后一个参数,这里是0
int index = id % mqs.size(); //队列数量没有事先设置那就是4,0%4=0
return mqs.get(index); //返回下标为0的队列,即这5条消息存放在0号队列中
}
}, 0);
System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index); //返回下标为1的队列,即这5条消息存放在1号队列中
}
}, 1);
System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index); //返回下标为2的队列,即这5条消息存放在2号队列中
}
}, 2);
System.out.println(sendResult);
}
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.2、消费者1
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
/**
* 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列
* 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
5.3、消费者2
public class Consumer2 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
/**
* 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列
* 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer2 Started.");
}
}
先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息。
Consumer1消费情况如图,都按照顺序执行了
Consumer2消费情况如图,也都按照顺序执行了
六、事务消费
考虑生活中的场景:我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)
生活已经告诉我们处理分布式事务,保证数据最终一致性的思路!这个Ticket(凭证)其实就是消息!
通过RocketMQ可以实现分布式事务,比如银行A向银行B转账,银行A扣款1000,那银行B一定要加1000才行,通过RocketMQ的执行逻辑如下:
如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。发送转账消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息。
到这里,我们先来看看基于RocketMQ的代码:
6.1、消费者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTransactionTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("transaction_Consumer Started.");
}
}
6.2、生产者
6.2.1、生产者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
// 事务回查最小并发数
producer.setCheckThreadPoolMinSize(2);
// 事务回查最大并发数
producer.setCheckThreadPoolMaxSize(2);
// 队列数
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 1; i <= 2; i++) {
try {
Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,
("Hello RocketMQ " + i).getBytes());
//发送消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.println(sendResult);
Thread.sleep(10);
} catch (MQClientException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
6.2.2、执行本地事务
TransactionExecuterImpl类用于执行本地事务如下:
public class TransactionExecuterImpl implements LocalTransactionExecuter {
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
System.out.println("执行本地事务msg = " + new String(msg.getBody()));
System.out.println("执行本地事务arg = " + arg);
String tags = msg.getTags();
if (tags.equals("transaction2")) {
System.out.println("======我的操作============,失败了 -进行ROLLBACK");
return LocalTransactionState.ROLLBACK_MESSAGE; //返回失败并发送回滚消息
}
return LocalTransactionState.COMMIT_MESSAGE; //返回成功并发送确认消息
// return LocalTransactionState.UNKNOW;
}
}
6.2.3、针对未决事务,MQ服务器回查客户端
如果因网络问题最后发送确认消息给MQ失败了或者发送了LocalTransactionState.UNKNOW,那事务就一直没能完成,一直处于prepared状态,针对未决事务,MQ服务器会回查客户端看看到底有没有完成(目前已经被阉割啦),这时会调用TransactionCheckListener接口,所以TransactionCheckListenerImpl类实现了这个接口用于回查,代码如下:
public class TransactionCheckListenerImpl implements TransactionCheckListener {
//在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));
// return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.UNKNOW;
}
}
producer端发送数据到MQ,并且处理本地事物,这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据,第二个数据失败了,不会被消费。
因为MQ回查客户端的功能被阿里去除了,导致即使返回了LocalTransactionState.UNKNOW,TransactionCheckListenerImpl里的代码也不会被触发,所以目前事务回查这部分需要自己设计实现。
七、参考文章
RocketMQ重点原理讲解:https://www.jianshu.com/p/453c6e7ff81c
此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!