欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Cloud Stream + RocketMq实现事务性消息

程序员文章站 2022-07-14 22:54:19
...

上一篇Spring Cloud消息中间件抽象Stream介绍了Spring Cloud Stream的rabbitmq例子,本文介绍Spring Cloud Stream实现事务性消息的例子。

什么是事务性消息

通过场景来看:
生成订单记录 -> MQ -> 增加积分,我们需要保证消息的发送与订单数据的插入要么都成功,要么都失败。
Spring Cloud Stream + RocketMq实现事务性消息
我们是应该先创建订单,还是先发送MQ消息?
1、先发送MQ消息:如果消息发送成功,而订单创建失败,没办法把消息收回来。
2、先创建订单:如果订单创建成功后MQ消息发送失败,抛出异常,因为两个操作在一个事务代码块中,所以订单数据会回滚。
但是网络是不稳定的,如果MQ端确实收到了这条消息,只是返回给客户端的响应丢失了,就出现跟1一样的问题。

这就是事务性消息的需求:本地事务 和 消息的发送 需要具有原子性。

RocketMQ事务性消息原理

RocketMQ支持这种事务性消息,它的主要逻辑分为两个流程:
Spring Cloud Stream + RocketMq实现事务性消息

  • 事务消息发送及提交
    1、发送 half消息
    2、MQ服务端 响应消息写入发送结果
    3、根据发送结果执行 本地事务 (如果写入失败,此时half消息 不可见, 本地逻辑不执行)
    4、根据本地事务状态执行 Commit 或者 Rollback (Commit操作生成消息索引,消息对消费者 可见 )

  • 回查流程:
    1、对于长时间没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次 回查
    2、Producer收到回查消息,检查回查消息对应的 本地事务状态
    3、根据本地事务状态,重新 Commit 或者 Rollback

逻辑时序图:
Spring Cloud Stream + RocketMq实现事务性消息

编码实践

安装并运行rocketmq,可以参考:https://zhuanlan.zhihu.com/p/85500306

github源码地址: https://github.com/guzhangyu/learn-spring-cloud/tree/master/spring-cloud-stream/spring-cloud-stream-transaction-sender

pom依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <!--            <version>2.2.1.RELEASE</version>-->
        </dependency>

application.yaml配置

rocketmq:
  name-server: 192.168.2.174:9876
  producer:
    enable-msg-trace: true
    group: DefaultCluster

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test?serverTimeZone=UTC&useSSL=false&allowPublicKeyRetrieval=true
    username: root
    password: root

mybatis:
  type-aliases-package: com.learn.springcloud.dao
  mapper-locations: classpath:mybatis/mapper/*.xml

消息发送代码

发送消息的类

import com.learn.springcloud.dao.WebsitesMapper;
import com.learn.springcloud.entity.Websites;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @Author zhangyugu
 * @Date 2020/8/13 3:45 下午
 * @Version 1.0
 */
@Service
public class RocketMqTestService {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Autowired
    WebsitesMapper websitesMapper;

    @Transactional
    public void testTransaction() {
        Websites websites = new Websites();
        websites.setAlexa(2);
        websites.setCountry("CN");
        websites.setName("谷章雨");
        websites.setUrl("http://www.baidu.com");
        websitesMapper.insert(websites);

        String transactionId = "trans-1"; // 事务id
        rocketMQTemplate.sendMessageInTransaction("test-group",
                "test1",
                MessageBuilder.withPayload(new MessageDTO(3, "first message"))
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("share_id", 3).build(),
                websites
        );
        System.out.println(" prepare 消息发送成功");
        // 这里消息发送只是prepare发送,
        // 后面消息队列中prepare成功后,在TestTransactionListener中的executeLocalTransaction的方法中决定是否要提交本地事务
    }
}

发送之后用于控制原子性的类

import com.alibaba.fastjson.JSON;
import com.learn.springcloud.entity.Websites;
import com.learn.springcloud.service.MessageDTO;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

/**
 * @Author zhangyugu
 * @Date 2020/8/13 3:55 下午
 * @Version 1.0
 */
@RocketMQTransactionListener(txProducerGroup = "test-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * rocketmq 消息发送成功之后,提交本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        MessageDTO messageDTO = JSON.parseObject(new String((byte[])message.getPayload()), MessageDTO.class);
        Websites websites = (Websites)o;
        System.out.println(String.format("half message\npayload:%s, arg:%s, transactionId:%s", messageDTO, websites, message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID)));
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * rocketmq 回查时,告诉它要提交,还是回滚
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 根据message去查询本地事务是否执行成功,如果成功,则commit
        return RocketMQLocalTransactionState.COMMIT;
    }
}

运行结果

half message
payload:aaa@qq.com, arg:Websites [Hash = 2050529121, id=35, name=谷章雨, url=http://www.baidu.com, alexa=2, country=CN, serialVersionUID=1], transactionId:trans-1
 prepare 消息发送成功

从这个运行结果可以看出,在消息发送之后,收到rocketmq的发送结果通知后才提交的本地事务。