RocketMQ事务消息实现分布式事务
程序员文章站
2022-07-14 23:34:52
...
一、前言
分布式事务详解:https://hucheng.blog.csdn.net/article/details/102975855
分布式事务,我们一般都是强调的最终一致性,而不是强一致性。
RocketMQ
分布式事务执行流程图:
RocketMQ事务消息中概念
Half(Prepare) Message
指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
消息回查
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
事务消息状态
-
TransactionStatus.CommitTransaction
:提交事务,它允许消费者消费消息。 -
TransactionStatus.RollbackTransaction
:回滚事务,它代表消息将被删除,不允许被消费。 -
TransactionStatus.Unknown
:中间状态,它代表需要检查消息队列来确认状态。
二 、RoctetMQ事务消息实现分布式事务
我们举一个扣款的例子,在单体项目中,我们很容易的去控制以下流程的事务:
而后面变成分布式架构后,数据库也可能分库分表,此时我们控制分布式事务很大,那么岂不是单体应用的响应速度大于分布式系统?
这里我们可以将大事务转换为本地事务+异步消息
这里会存在一个问题,如何保证本地事务和异步消息的同时执行或者回滚?
RocketMQ
事务消息即可保证这点:
事务消息producer
的设置:
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tag = msg.getTags();
//本地事务执行
if (StringUtils.equals("TransactionA", msg.getTags())) {
try{
//执行本地业务代码
}catch(Exception e){
//本地业务代码异常,则也不发送消息
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 对状态为LocalTransactionState.UNKNOW会进行回查
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//这里进行回查处理
return LocalTransactionState.COMMIT_MESSAGE;
}
});
//发送事务消息
SendResult result = producer.sendMessageInTransaction(msg, null);
三 、RoctetMQ事务消息的限制
- 事务消息不支持延时消息和批量消息
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过
Broker
配置文件的transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过N次的话(N= transactioncheckmax
)则Broker
将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在
Broker
配置文件中的参数transactionMsgTimeout
这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,该参数优先于transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过
RocketMQ
本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 - 事务消息的生产者
ID
不能与其他类型消息的生产者ID
共享。与其他类型的消息不同,事务消息允许反向查询、MQ
服务器能通过它们的生产者ID
查询到消费者。