Spring Cloud Stream + RocketMq实现事务性消息
上一篇Spring Cloud消息中间件抽象Stream介绍了Spring Cloud Stream的rabbitmq例子,本文介绍Spring Cloud Stream实现事务性消息的例子。
什么是事务性消息
通过场景来看:
生成订单记录 -> MQ -> 增加积分,我们需要保证消息的发送与订单数据的插入要么都成功,要么都失败。
我们是应该先创建订单,还是先发送MQ消息?
1、先发送MQ消息:如果消息发送成功,而订单创建失败,没办法把消息收回来。
2、先创建订单:如果订单创建成功后MQ消息发送失败,抛出异常,因为两个操作在一个事务代码块中,所以订单数据会回滚。
但是网络是不稳定的,如果MQ端确实收到了这条消息,只是返回给客户端的响应丢失了,就出现跟1一样的问题。
这就是事务性消息的需求:本地事务 和 消息的发送 需要具有原子性。
RocketMQ事务性消息原理
RocketMQ支持这种事务性消息,它的主要逻辑分为两个流程:
-
事务消息发送及提交
1、发送 half消息
2、MQ服务端 响应消息写入发送结果
3、根据发送结果执行 本地事务 (如果写入失败,此时half消息 不可见, 本地逻辑不执行)
4、根据本地事务状态执行 Commit 或者 Rollback (Commit操作生成消息索引,消息对消费者 可见 ) -
回查流程:
1、对于长时间没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次 回查
2、Producer收到回查消息,检查回查消息对应的 本地事务状态
3、根据本地事务状态,重新 Commit 或者 Rollback
逻辑时序图:
编码实践
安装并运行rocketmq,可以参考:https://zhuanlan.zhihu.com/p/85500306
github源码地址: https://github.com/guzhangyu/learn-spring-cloud/tree/master/spring-cloud-stream/spring-cloud-stream-transaction-sender
pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<!-- <version>2.2.1.RELEASE</version>-->
</dependency>
application.yaml配置
rocketmq:
name-server: 192.168.2.174:9876
producer:
enable-msg-trace: true
group: DefaultCluster
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test?serverTimeZone=UTC&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: root
mybatis:
type-aliases-package: com.learn.springcloud.dao
mapper-locations: classpath:mybatis/mapper/*.xml
消息发送代码
发送消息的类
import com.learn.springcloud.dao.WebsitesMapper;
import com.learn.springcloud.entity.Websites;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @Author zhangyugu
* @Date 2020/8/13 3:45 下午
* @Version 1.0
*/
@Service
public class RocketMqTestService {
@Autowired
RocketMQTemplate rocketMQTemplate;
@Autowired
WebsitesMapper websitesMapper;
@Transactional
public void testTransaction() {
Websites websites = new Websites();
websites.setAlexa(2);
websites.setCountry("CN");
websites.setName("谷章雨");
websites.setUrl("http://www.baidu.com");
websitesMapper.insert(websites);
String transactionId = "trans-1"; // 事务id
rocketMQTemplate.sendMessageInTransaction("test-group",
"test1",
MessageBuilder.withPayload(new MessageDTO(3, "first message"))
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", 3).build(),
websites
);
System.out.println(" prepare 消息发送成功");
// 这里消息发送只是prepare发送,
// 后面消息队列中prepare成功后,在TestTransactionListener中的executeLocalTransaction的方法中决定是否要提交本地事务
}
}
发送之后用于控制原子性的类
import com.alibaba.fastjson.JSON;
import com.learn.springcloud.entity.Websites;
import com.learn.springcloud.service.MessageDTO;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
/**
* @Author zhangyugu
* @Date 2020/8/13 3:55 下午
* @Version 1.0
*/
@RocketMQTransactionListener(txProducerGroup = "test-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {
/**
* rocketmq 消息发送成功之后,提交本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
MessageDTO messageDTO = JSON.parseObject(new String((byte[])message.getPayload()), MessageDTO.class);
Websites websites = (Websites)o;
System.out.println(String.format("half message\npayload:%s, arg:%s, transactionId:%s", messageDTO, websites, message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID)));
return RocketMQLocalTransactionState.COMMIT;
}
/**
* rocketmq 回查时,告诉它要提交,还是回滚
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
// 根据message去查询本地事务是否执行成功,如果成功,则commit
return RocketMQLocalTransactionState.COMMIT;
}
}
运行结果
half message
payload:aaa@qq.com, arg:Websites [Hash = 2050529121, id=35, name=谷章雨, url=http://www.baidu.com, alexa=2, country=CN, serialVersionUID=1], transactionId:trans-1
prepare 消息发送成功
从这个运行结果可以看出,在消息发送之后,收到rocketmq的发送结果通知后才提交的本地事务。
推荐阅读
-
SpringCloud之Spring Cloud Stream:消息驱动
-
Spring Cloud Stream微服务消息框架原理及实例解析
-
Spring Cloud Stream + RocketMq实现事务性消息
-
使用Spring Cloud Stream玩转RabbitMQ,RocketMQ和Kafka
-
Spring Cloud Alibaba - 消息队列(一)(RocketMQ)(介绍(这篇不重要,下一篇很重要,主要是下一篇坑太多了))
-
Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
-
Spring Cloud Alibaba(五)RocketMQ 异步通信实现
-
Spring Cloud 终结篇之消息驱动--stream大集合
-
SpringCloud核心组件之 Spring Cloud Stream消息驱动组件
-
Spring Cloud(6)消息总线 Spring Cloud Bus + Spring Cloud Config 实现全自动刷新集群配置