rocketmq-producer之发送事物消息
rocketmq支持普通消息、顺序消息,此外,还支持事物消息。实现方式是将一个大事务拆分成多个小事物异步执行,事物消息在其中起着桥梁作用。
rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。
事物消息demo:
TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
producer.setTransactionCheckListener(transactionCheckListener);
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start();
Message msg =new Message(......);
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
当确认消息发送失败时,rocketmq会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送者确认,如果此时事物执行成功了是回滚还是继续发送确认消息?rocketmq会调用TransactionCheckListener 的实现类来做出相应的操作。
TransactionExecuterImpl –>本地事物的处理逻辑
发送事物消息源码(伪代码)
public TransactionSendResult sendMessageInTransaction(.....) {
// 1.发送消息
SendResult sendResult = this.send(msg);
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
// 2.如果消息发送成功,处理与消息关联的本地事务单元
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
}
// 3.结束事务
this.endTransaction(sendResult, localTransactionState, localException);
}
本地事物状态LocalTransactionState:
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW
public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, ......){
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);
break;
default:
break;
}
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, ......);
}
endTransaction从sendResult中获取消息地址,将请求发送至broker,根据localTransactionState更新事物消息的最终状态。
如果endTransaction方法执行失败,导致数据没有发送到broker,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是TransactionNotType状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法,最后调用endTransactionOneway让broker来更新消息的最终状态。
发送事物消息时,groupname必须设置,回查时会根据group随机选择一台producer。
转载于:https://blog.csdn.net/qq_36569036/article/details/53448142
推荐阅读
-
RocketMq系列之Producer顺序消息发送源码分析(四)
-
rocketmq-producer之发送事物消息
-
RocketMq系列之Producer普通消息发送(三)
-
《Python核心编程》练习题之2-8:更新上一个练习的解决方案,修改它以使你的聊天服务现在成为全双工模式,意味着通信两端都可以发送并接收消息,并且二者相互独立
-
关于php微信订阅号开发之token验证后自动发送消息给订阅号但是没有消息返回的问题
-
关于php微信订阅号开发之token验证后自动发送消息给订阅号但是没有消息返回的问题,
-
微信订阅号开发之token验证后,自动回复消息功能做好,发送消息
-
微信订阅号开发之token验证后,自动回复消息效能做好,发送消息没有返回
-
微信订阅号开发之token验证后,自动回复消息功能做好,发送消息
-
微信订阅号开发之token验证后,自动回复消息功能做好,发送消息没有返回,token自动回复