分布式事务-RocketMq
1.rocketmq版本:4.4.0
2.rocketmq集群部署图
rocketmq消息事务流程图:
4.rocketmq通过docker快速搭建,可参考:https://blog.csdn.net/leadseczgw01/article/details/106939085。
5.项目代码:https://github.com/kickTec/springCloudDemo/tree/transaction-rocketmq
项目结构:
eureka-server: 服务注册与发现。
hello: 生产者,添加用户信息。
feign-consumer: 消费者,修改用户信息。
rockertmq: 需要namesrc+broker。
6.数据库:
用户表:
CREATE TABLE `user` (
`user_id` varchar(30) NOT NULL,
`name` varchar(50) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
消息表:
CREATE TABLE `message_tx` (
`message_id` varchar(40) NOT NULL COMMENT '消息ID',
`mq_msg_id` varchar(50) DEFAULT NULL COMMENT 'mq消息ID',
`topics` varchar(30) DEFAULT NULL COMMENT '主题',
`tags` varchar(30) DEFAULT NULL COMMENT '标签',
`keys` varchar(30) DEFAULT NULL COMMENT '消息关键词',
`message_body` varchar(200) DEFAULT NULL COMMENT '消息主体',
`service_type` int(11) DEFAULT NULL COMMENT '业务类型',
`service_id` varchar(40) DEFAULT NULL COMMENT '业务ID',
`relate_id` varchar(40) DEFAULT NULL COMMENT '关联ID',
`message_status` int(11) DEFAULT NULL COMMENT '消息状态(1、初始化 2、发送成功 3、本地异常(发送失败) 4、消费成功 5、消费失败 6、已死亡 7、已删除)',
`remark` varchar(50) DEFAULT NULL COMMENT '备注',
`modify_date` datetime DEFAULT NULL COMMENT '修改时间',
`create_date` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
7.关键代码:
hello服务保存用户信息方法:先将保存用户信息的消息持久化到本地数据库(在第二个方法中持久化的),防止由于网络或其它原因导致不能发送,而造成业务失败(真实场景可补充处理这部分数据);再向rocketmq发送半消息(需要实现监听器方法,一个是执行本地事务的方法,一个是回查本地事务执行情况的方法),类似于2PC,只有当收到mq的确认消息后,才会执行本地事务,成功直接提交半消息,mq后续会投递该消息;本地执行事务失败后,rollback该半消息;若是本地事务完成后迟迟未返回commit或rollback,mq会通过回查接口,查询本地事务情况。
public int saveUserByMsgTx(String userId, String name, int age) throws Exception{
// 封装消息数据
JSONObject userJson = new JSONObject();
userJson.put("userId", userId);
userJson.put("name", name);
userJson.put("age", age);
userJson.put("messageId", System.currentTimeMillis()+"user");
userJson.put("serviceType", 1); // 业务类型 1 添加用户
userJson.put("serviceId", userId); // 业务id
Message message = new Message("kenick", "2020", "KEY2020", userJson.toJSONString().getBytes());
// 发送半消息,在回调方法中收到broker的确认信息后,再执行业务操作
rocketMqService.sendTransactionMessage(message, new TransactionListener() {
// rocketmq broker回调方法,标识该半消息已收到,开始执行业务操作
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
logger.debug("rocketmq确认的半消息,信息{}", msg.toString());
JSONObject msgJson = JSON.parseObject(new String(msg.getBody()));
String messageId = msgJson.getString("messageId");
try{
// 通过spring代理调用真正的业务操作方法,这样会触发方法上的本地事务注解
User saveUser = JSONObject.parseObject(msgJson.toJSONString(), User.class);
IUserService userService = applicationContext.getBean(IUserService.class);
userService.saveUser(saveUser.getUserId(), saveUser.getName(), saveUser.getAge()); // 保存用户业务方法
logger.debug("本地事务执行完毕!");
unusualMapper.updateMessageTxStatus(messageId, 2); // 修改消息状态为已发送
if(saveUser.getAge() == 28){ // 返回未知,触发后续回查本地事务状态
logger.debug("触发本地事务返回未知状态条件!");
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE; // 本地执行无误后,提交半消息
}catch (Exception e){
logger.debug("本地事务执行异常,消息:{},异常:{}", msg.toString(), e.getMessage());
unusualMapper.updateMessageTxStatus(messageId, 3); // 修改消息状态为发送失败,本地事务异常,可后续查看及处理
return LocalTransactionState.ROLLBACK_MESSAGE; // 出现异常,回滚半消息
}
}
// rocketmq回查本地事务 当rocketmq收到并确认半消息后,由于本地业务操作异常等情况,造成未commit/rollback该半消息
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
logger.debug("开始回查本地事务! 消息:{}", msg);
JSONObject msgJson = JSON.parseObject(new String(msg.getBody()));
String messageId = msgJson.getString("messageId");
Map<String, Object> objectMap = unusualMapper.selectMessageTxById(messageId);
logger.debug("查询本地消息表结果为:{}", objectMap);
String messageStatus = objectMap.get("message_status").toString();
if("2".equals(messageStatus)){
return LocalTransactionState.COMMIT_MESSAGE; // 本地事务正常完成,commit
}else if("3".equals(messageStatus)){
return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务异常,rollback,消息会被撤销
}else{
return LocalTransactionState.UNKNOW; // 本地执行失败等情况,rocketmq暂不处理消息,等待本地事务执行
}
}
});
return 1;
}
@Override
public TransactionSendResult sendTransactionMessage(Message message, TransactionListener transactionListener) {
TransactionSendResult transactionSendResult = null;
try{
// 发送半消息前,持久化消息数据,若发送失败,可尝试重新发送
String messageBody = new String(message.getBody());
JSONObject msgBody = JSON.parseObject(messageBody);
String messageId = msgBody.getString("messageId");
if(StringUtils.isNotBlank(messageId)){
unusualMapper.insertMessageTx(messageId, message.getTopic(), message.getTags(),
message.getKeys(), messageBody, msgBody.getInteger("serviceType"), msgBody.getString("serviceId"),
msgBody.getString("relateId"), 1);
}
// 向rocketmq发送消息
TransactionMQProducer transactionMQProducer = rocketMqProducer.getTransactionMqProducer(transactionListener);
transactionSendResult = transactionMQProducer.sendMessageInTransaction(message, transactionMQProducer.getTransactionListener());
logger.debug("发送消息结果:{}", transactionSendResult);
// 发送完消息后,将本地消息关联mq消息id
if(transactionSendResult != null && StringUtils.isNotBlank(transactionSendResult.getMsgId())){
unusualMapper.updateMessageTxMqId(messageId, transactionSendResult.getMsgId());
}
}catch (Exception e){
logger.debug("rocketmq发送事务消息异常:{}", e);
}
return transactionSendResult;
}
8.消费者代码:
// spring创建该类后,立刻开始订阅消息
@PostConstruct
public void initMethod(){
try {
DefaultMQPushConsumer defaultMQPushConsumer = rocketMqProducer.getDefaultMQPushConsumer();
defaultMQPushConsumer.setConsumerGroup("hello-consumer"); // 设置消费组
defaultMQPushConsumer.subscribe("kenick","2020"); // 订阅指定topic tag的消息
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10); // 批量消费最大数量
// 注册消息消费监听器
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
logger.debug("本次共收到{}条消息", msgList.size());
for (MessageExt messageExt: msgList) { // 批量消费
String jsonStr = new String(messageExt.getBody());
logger.debug("接收到消息: " + jsonStr);
JSONObject msgJson = JSON.parseObject(jsonStr);
String messageId = msgJson.getString("messageId");
try{
// 重复消息处理 或 幂等处理
Map<String, Object> msgMap = unusualMapper.selectMessageTxById(messageId);
String messageStatus = msgMap.get("message_status").toString();
if("4".equals(messageStatus)){
continue; // 已消费成功,跳过
}
// 消费业务 修改名称
User user = JSON.parseObject(jsonStr, User.class);
user.setName(user.getName()+"-consumer");
if(user.getAge() == 30){
throw new RuntimeException("age 30消费发生异常!");
}
userService.updateUser(user);
// 消费完成后,修改消费记录状态
unusualMapper.updateMessageTxStatus(messageId, 4); // 修改消费记录状态 成功
}catch (Exception e){
logger.debug("消费发生异常!", e);
unusualMapper.updateMessageTxStatus(messageId, 5); // 修改消费记录状态 失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 只要有一个失败了就停止
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 全部成功,才算成功
});
defaultMQPushConsumer.start();
} catch (Exception e) {
logger.debug("rocketmq消费者订阅发生异常!");
}
}
下一篇: 19/01/24 22:39:30 WARN ipc.Client: Failed to connect to server: cluster-6/192.168.79.136:8485: 的解决办法
推荐阅读
-
Python多进程入门、分布式进程数据共享实例详解
-
spring整合atomikos实现分布式事务的方法示例
-
mongo分布式锁Java实现方法(推荐)
-
mongodb分布式部署(mongodb三种部署方式)
-
SQL Server误区30日谈 第28天 有关大容量事务日志恢复模式的误区
-
Android 用SQLite实现事务的方法
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
分布式幂等问题解决方案三部曲
-
springcloud学习之路: (五) springcloud集成SpringCloudConfig分布式配置中心
-
MySQL 系列(三)你不知道的 视图、触发器、存储过程、函数、事务、索引、语句