欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式事务-RocketMq

程序员文章站 2022-07-14 23:41:13
...

1.rocketmq版本:4.4.0

2.rocketmq集群部署图

分布式事务-RocketMq

rocketmq消息事务流程图:

分布式事务-RocketMq

4.rocketmq通过docker快速搭建,可参考:https://blog.csdn.net/leadseczgw01/article/details/106939085

5.项目代码:https://github.com/kickTec/springCloudDemo/tree/transaction-rocketmq

项目结构:

eureka-server: 服务注册与发现。
hello: 生产者,添加用户信息。
feign-consumer: 消费者,修改用户信息。
rockertmq: 需要namesrc+broker。

6.数据库:

用户表:

CREATE TABLE `user` (
  `user_id` varchar(30) NOT NULL,
  `name` varchar(50) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

消息表:

CREATE TABLE `message_tx` (
  `message_id` varchar(40) NOT NULL COMMENT '消息ID',
  `mq_msg_id` varchar(50) DEFAULT NULL COMMENT 'mq消息ID',
  `topics` varchar(30) DEFAULT NULL COMMENT '主题',
  `tags` varchar(30) DEFAULT NULL COMMENT '标签',
  `keys` varchar(30) DEFAULT NULL COMMENT '消息关键词',
  `message_body` varchar(200) DEFAULT NULL COMMENT '消息主体',
  `service_type` int(11) DEFAULT NULL COMMENT '业务类型',
  `service_id` varchar(40) DEFAULT NULL COMMENT '业务ID',
  `relate_id` varchar(40) DEFAULT NULL COMMENT '关联ID',
  `message_status` int(11) DEFAULT NULL COMMENT '消息状态(1、初始化 2、发送成功 3、本地异常(发送失败) 4、消费成功 5、消费失败 6、已死亡 7、已删除)',
  `remark` varchar(50) DEFAULT NULL COMMENT '备注',
  `modify_date` datetime DEFAULT NULL COMMENT '修改时间',
  `create_date` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

7.关键代码:

hello服务保存用户信息方法:先将保存用户信息的消息持久化到本地数据库(在第二个方法中持久化的),防止由于网络或其它原因导致不能发送,而造成业务失败(真实场景可补充处理这部分数据);再向rocketmq发送半消息(需要实现监听器方法,一个是执行本地事务的方法,一个是回查本地事务执行情况的方法),类似于2PC,只有当收到mq的确认消息后,才会执行本地事务,成功直接提交半消息,mq后续会投递该消息;本地执行事务失败后,rollback该半消息;若是本地事务完成后迟迟未返回commit或rollback,mq会通过回查接口,查询本地事务情况。

    public int saveUserByMsgTx(String userId, String name, int age) throws Exception{
        // 封装消息数据
        JSONObject userJson = new JSONObject();
        userJson.put("userId", userId);
        userJson.put("name", name);
        userJson.put("age", age);
        userJson.put("messageId", System.currentTimeMillis()+"user");
        userJson.put("serviceType", 1); // 业务类型 1 添加用户
        userJson.put("serviceId", userId); // 业务id
        Message message = new Message("kenick", "2020", "KEY2020", userJson.toJSONString().getBytes());

        // 发送半消息,在回调方法中收到broker的确认信息后,再执行业务操作
        rocketMqService.sendTransactionMessage(message, new TransactionListener() {
            // rocketmq broker回调方法,标识该半消息已收到,开始执行业务操作
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                logger.debug("rocketmq确认的半消息,信息{}", msg.toString());
                JSONObject msgJson = JSON.parseObject(new String(msg.getBody()));
                String messageId = msgJson.getString("messageId");
                try{
                    // 通过spring代理调用真正的业务操作方法,这样会触发方法上的本地事务注解
                    User saveUser = JSONObject.parseObject(msgJson.toJSONString(), User.class);
                    IUserService userService = applicationContext.getBean(IUserService.class);
                    userService.saveUser(saveUser.getUserId(), saveUser.getName(), saveUser.getAge()); // 保存用户业务方法
                    logger.debug("本地事务执行完毕!");

                    unusualMapper.updateMessageTxStatus(messageId, 2); // 修改消息状态为已发送

                    if(saveUser.getAge() == 28){ // 返回未知,触发后续回查本地事务状态
                        logger.debug("触发本地事务返回未知状态条件!");
                        return LocalTransactionState.UNKNOW;
                    }

                    return LocalTransactionState.COMMIT_MESSAGE; // 本地执行无误后,提交半消息
                }catch (Exception e){
                    logger.debug("本地事务执行异常,消息:{},异常:{}", msg.toString(), e.getMessage());
                    unusualMapper.updateMessageTxStatus(messageId, 3); // 修改消息状态为发送失败,本地事务异常,可后续查看及处理
                    return LocalTransactionState.ROLLBACK_MESSAGE; // 出现异常,回滚半消息
                }
            }

            // rocketmq回查本地事务  当rocketmq收到并确认半消息后,由于本地业务操作异常等情况,造成未commit/rollback该半消息
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                logger.debug("开始回查本地事务! 消息:{}", msg);
                JSONObject msgJson = JSON.parseObject(new String(msg.getBody()));
                String messageId = msgJson.getString("messageId");
                Map<String, Object> objectMap = unusualMapper.selectMessageTxById(messageId);
                logger.debug("查询本地消息表结果为:{}", objectMap);

                String messageStatus = objectMap.get("message_status").toString();
                if("2".equals(messageStatus)){
                    return LocalTransactionState.COMMIT_MESSAGE; // 本地事务正常完成,commit
                }else if("3".equals(messageStatus)){
                    return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务异常,rollback,消息会被撤销
                }else{
                    return LocalTransactionState.UNKNOW; // 本地执行失败等情况,rocketmq暂不处理消息,等待本地事务执行
                }
            }
        });
        return 1;
    }
    @Override
    public TransactionSendResult sendTransactionMessage(Message message, TransactionListener transactionListener) {
        TransactionSendResult transactionSendResult = null;
        try{
            // 发送半消息前,持久化消息数据,若发送失败,可尝试重新发送
            String messageBody = new String(message.getBody());
            JSONObject msgBody = JSON.parseObject(messageBody);
            String messageId = msgBody.getString("messageId");
            if(StringUtils.isNotBlank(messageId)){
                unusualMapper.insertMessageTx(messageId, message.getTopic(), message.getTags(),
                        message.getKeys(), messageBody, msgBody.getInteger("serviceType"), msgBody.getString("serviceId"),
                        msgBody.getString("relateId"), 1);
            }

            // 向rocketmq发送消息
            TransactionMQProducer transactionMQProducer = rocketMqProducer.getTransactionMqProducer(transactionListener);
            transactionSendResult = transactionMQProducer.sendMessageInTransaction(message, transactionMQProducer.getTransactionListener());
            logger.debug("发送消息结果:{}", transactionSendResult);

            // 发送完消息后,将本地消息关联mq消息id
            if(transactionSendResult != null && StringUtils.isNotBlank(transactionSendResult.getMsgId())){
                unusualMapper.updateMessageTxMqId(messageId, transactionSendResult.getMsgId());
            }
        }catch (Exception e){
            logger.debug("rocketmq发送事务消息异常:{}", e);
        }
        return transactionSendResult;
    }

8.消费者代码:

    // spring创建该类后,立刻开始订阅消息
    @PostConstruct
    public void initMethod(){
        try {
            DefaultMQPushConsumer defaultMQPushConsumer = rocketMqProducer.getDefaultMQPushConsumer();
            defaultMQPushConsumer.setConsumerGroup("hello-consumer"); // 设置消费组
            defaultMQPushConsumer.subscribe("kenick","2020"); // 订阅指定topic tag的消息
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10); // 批量消费最大数量

            // 注册消息消费监听器
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
                logger.debug("本次共收到{}条消息", msgList.size());
                for (MessageExt messageExt: msgList) { // 批量消费
                    String jsonStr = new String(messageExt.getBody());
                    logger.debug("接收到消息: " + jsonStr);
                    JSONObject msgJson = JSON.parseObject(jsonStr);
                    String messageId = msgJson.getString("messageId");
                    try{
                        // 重复消息处理 或 幂等处理
                        Map<String, Object> msgMap = unusualMapper.selectMessageTxById(messageId);
                        String messageStatus = msgMap.get("message_status").toString();
                        if("4".equals(messageStatus)){
                            continue; // 已消费成功,跳过
                        }

                        // 消费业务 修改名称
                        User user = JSON.parseObject(jsonStr, User.class);
                        user.setName(user.getName()+"-consumer");
                        if(user.getAge() == 30){
                            throw new RuntimeException("age 30消费发生异常!");
                        }
                        userService.updateUser(user);

                        // 消费完成后,修改消费记录状态
                        unusualMapper.updateMessageTxStatus(messageId, 4); // 修改消费记录状态 成功
                    }catch (Exception e){
                        logger.debug("消费发生异常!", e);
                        unusualMapper.updateMessageTxStatus(messageId, 5); // 修改消费记录状态 失败
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 只要有一个失败了就停止
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 全部成功,才算成功
            });
            defaultMQPushConsumer.start();
        } catch (Exception e) {
            logger.debug("rocketmq消费者订阅发生异常!");
        }
    }