RocketMQ & Spring Cloud Stream
一、RocketMQ简介
1. 概述
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
- 削峰填谷: 主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
- 系统解耦: 解决不同重要程度、不同能力级别系统之间依赖导致一死全死
- 提升性能: 当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- 蓄流压测: 线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
2. RocketMQ
Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。
3. RocketMQ特点
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 支持严格的消息顺序
- 支持 Topic 与 Queue 两种模式
- 亿级消息堆积能力
- 比较友好的分布式特性
- 同时支持 Push 与 Pull 方式消费消息
- 历经多次天猫双十一海量消息考验
4. RocketMQ 优势
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
5. 消息队列对比参照表
6. 搭建RocketMQ
7. 搭建RocketMQ控制台
8. 基于Docker搭建RocketMQ
-
docker-compose.yml
注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 8080:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge
-
borker.conf
RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在
./data/brokerconf/
目录下创建一个名为broker.conf
的配置文件,内容如下:# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 所属集群名字 brokerClusterName=DefaultCluster # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a, # 在 broker-b.properties 使用: broker-b brokerName=broker-a # 0 表示 Master,> 0 表示 Slave brokerId=0 # nameServer地址,分号分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP # brokerIP1=192.168.0.253 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨4点 deleteWhen=04 # 文件保留时间,默认48小时 fileReservedTime=120 # commitLog 每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store # commitLog 存储路径 # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog # 消费队列存储 # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue # 消息索引存储路径 # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index # checkpoint 文件存储路径 # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint # abort 文件存储路径 # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
-
RocketMQ控制台
9. RocketMQ进阶
二、Spring 消息编程模型
使用Spring消息编程模型整合RocketMQ。
1. 编写生产者
-
导入坐标
如果不指定版本的话,默认用的spring-boot-starter版本是2.0.2,对应的rocketmq版本是4.4.0,和我们下载的rockermq 4.5.1 版本 冲突,所以手动指定spring-boot-starter版本为 2.0.3
<!--rocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
-
添加配置
rocketmq: name-server: 127.0.0.1:9876 producer: #小坑:必须指定group group: test-group
-
编写代码,发送消息
@Autowired private RocketMQTemplate rocketMQTemplate;
//实体类 @Data @Builder @NoArgsConstructor @AllArgsConstructor public class UserAddBonusMsgDTO { /** * 为谁加积分 */ private Integer userId; /** * 加多少积分 */ private Integer bonus; }
//发送消息,让消费者去消费 this.rocketMQTemplate.convertSend("add-bonus", UserAddBonusMsgDTO.builder() .userId(share.getUserId()) .bonus(50).build());
2. 编写消费者
-
导入坐标
如果不指定版本的话,默认用的spring-boot-starter版本是2.0.2,对应的rocketmq版本是4.4.0,和我们下载的rockermq 4.5.1 版本 冲突,所以手动指定spring-boot-starter版本为 2.0.3
<!--rocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
-
添加配置
rocketmq: name-server: 127.0.0.1:9876
-
编写代码
@Service @Slf4j @RocketMQMessageListener(consumerGroup = "consumer-group",topic = "add-bonus") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> { private final UserMapper userMapper; private final BonusEventLogMapper bonusEventLogMapper; /** * 当收到消息的时候执行的业务 * @param message */ @Override public void onMessage(UserAddBonusMsgDTO message) { //1. 为用户加积分 Integer userId = message.getUserId(); Integer bonus = message.getBonus(); User user = this.userMapper.selectByPrimaryKey(userId); user.setBonus(user.getBonus()+bonus); this.userMapper.updateByPrimaryKey(user); //2. 记录日志到bonus_event_log 里面 this.bonusEventLogMapper.insert( BonusEventLog.builder() .userId(userId) .value(bonus) .event("CONTRIBUTE") .createTime(new Date()) .description("投稿加积分") .build()); log.info("积分添加完毕"); } }
3. 事务消息
我们在异步发送消息给消费者,让消费者操作其数据库时,由于发送消息是异步请求的,需要我们保持事务的一致性,即在生产者操作自己的数据库和消费者操作自己的数据库要么同时成功,要么同时失败,不可能让生产者在发生异常后停止了操作自己的数据库,但进行异步发送消息后,消费者取得消息却能操作自己的数据库。所以我们需要实现分布式事务。
- 流程图
3. 通过Spring 编程模型编码实现分布式事务(只需改动生产者的业务代码即可)
-
Service层
package com.banmingi.nodeapp.contentcenter.service; import com.alibaba.fastjson.JSON; import com.banmingi.nodeapp.contentcenter.dao.RocketMQTransactionLogMapper; import com.banmingi.nodeapp.contentcenter.dao.ShareMapper; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareAuditDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.UserDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.banmingi.nodeapp.contentcenter.domain.entity.RocketMQTransactionLog; import com.banmingi.nodeapp.contentcenter.domain.entity.Share; import com.banmingi.nodeapp.contentcenter.domain.enums.AuditStatusEnum; import com.banmingi.nodeapp.contentcenter.feignclient.UserCenterFeignClient; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Objects; import java.util.UUID; /** * @auther 半命i 2020/3/22 * @description */ @Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareService { private final ShareMapper shareMapper; private final RocketMQTemplate rocketMQTemplate; private final RocketMQTransactionLogMapper rocketMQTransactionLogMapper; /** * 审核指定内容 * @param id * @param auditDTO * @return */ public Share auditById(Integer id, ShareAuditDTO auditDTO) { //1. 查询 share 是否存在,不存在或者当前的audit_status != NOT_YET,那么就抛异常 Share share = this.shareMapper.selectByPrimaryKey(id); if (share == null) { throw new IllegalArgumentException("参数非法!该分享不存在!"); } if (!Objects.equals("NOT_YET",share.getAuditStatus())) { throw new IllegalArgumentException("参数非法!该分享已审核通过或未通过!"); } //3. 如果是PASS,那么发送消息给rockerMQ,让用户中心去消费,为发布人添加积分 //异步执行 //3.1 发送半消息 if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) { String transactionId = UUID.randomUUID().toString(); this.rocketMQTemplate.sendMessageInTransaction( "tx-add-bonus-group", //group "add-bonus", //topic MessageBuilder.withPayload( //消息体 UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()) //消息头 有妙用 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("share_id",id) .build(), //arg 有大用处 auditDTO ); } else { //如果是REJECT(审核不通过),直接更新数据库中分享的状态即可 this.auditByIdInDB(id,auditDTO); } return share; } /** * 审核资源,将状态设为PASS/REJECT * @param id * @param auditDTO */ public void auditByIdInDB(Integer id,ShareAuditDTO auditDTO) { Share share = Share.builder() .id(id) .auditStatus(auditDTO.getAuditStatusEnum().toString()) .reason(auditDTO.getReason()) .build(); this.shareMapper.updateByPrimaryKeySelective(share); } /** * 审核资源,将状态设为PASS/REJECT,并记录事务日志 * @param id * @param auditDTO * @param transactionId */ @Transactional(rollbackFor = Exception.class) public void auditByIdWithRocketMQLog(Integer id,ShareAuditDTO auditDTO,String transactionId) { this.auditByIdInDB(id,auditDTO); this.rocketMQTransactionLogMapper.insertSelective( RocketMQTransactionLog.builder() .transactionId(transactionId) .log("审核分享...") .build() ); } }
-
执行本地事务、检测事务状态的类
package com.banmingi.nodeapp.contentcenter.rocketmq; import com.alibaba.fastjson.JSON; import com.banmingi.nodeapp.contentcenter.dao.RocketMQTransactionLogMapper; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareAuditDTO; import com.banmingi.nodeapp.contentcenter.domain.entity.RocketMQTransactionLog; import com.banmingi.nodeapp.contentcenter.service.ShareService; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * @auther 半命i 2020/4/19 * @description */ //写成和发布半消息一致的group @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusTransactionListener implements RocketMQLocalTransactionListener { private final ShareService shareService; private final RocketMQTransactionLogMapper rocketMQTransactionLogMapper; /** * 执行本地事务. (流程图中第三步) * @param message 发送半消息时构建的 message * @param o //发送半消息的参数 arg * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { //消息头 MessageHeaders headers = message.getHeaders(); //获取 transactionId String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); //获取 share_id //get的时候获取到的是字符串 Integer share_id = Integer.valueOf((String) headers.get("share_id")); //执行本地事务 try { this.shareService.auditByIdWithRocketMQLog(share_id, (ShareAuditDTO)o, transactionId); //本地事务执行成功 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } /** * 本地事务的检查接口.(流程图中第四步) * 检查本地事务的状态. * 这里通过查询事务日志表实现 * @param message 送半消息时构建的 message * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { //消息头 MessageHeaders headers = message.getHeaders(); //获取 transactionId String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); //查询RocketMQ事务日志表是否有记录 RocketMQTransactionLog rocketMQTransactionLog = this.rocketMQTransactionLogMapper.selectOne( RocketMQTransactionLog.builder() .transactionId(transactionId).build()); if (rocketMQTransactionLog != null) return RocketMQLocalTransactionState.COMMIT; return RocketMQLocalTransactionState.ROLLBACK; } }
-
消费者代码不变
@Service @Slf4j @RocketMQMessageListener(consumerGroup = "consumer-group",topic = "add-bonus") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> { private final UserMapper userMapper; private final BonusEventLogMapper bonusEventLogMapper; /** * 当收到消息的时候执行的业务 * @param message */ @Override public void onMessage(UserAddBonusMsgDTO message) { //1. 为用户加积分 Integer userId = message.getUserId(); Integer bonus = message.getBonus(); User user = this.userMapper.selectByPrimaryKey(userId); user.setBonus(user.getBonus()+bonus); this.userMapper.updateByPrimaryKey(user); //2. 记录日志到bonus_event_log 里面 this.bonusEventLogMapper.insert( BonusEventLog.builder() .userId(userId) .value(bonus) .event("CONTRIBUTE") .createTime(new Date()) .description("投稿加积分") .build()); log.info("积分添加完毕"); } }
三、Spring Cloud Stream
1. 概念
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 Spring Boot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe
、consumer groups
、partition
这些统一的概念。
Spring Cloud Stream 内部有两个概念:
- Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
- Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
2. Spring Cloud Stream整合RocketMQ的坐标
<!-- stream-rocketmq -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3. 编写生产者
-
加注解
在主启动类上加上
@EnableBinding(Source.class)
-
写配置
-
编写代码发送消息
4. 编写消费者
-
加注解
在主启动类上加上
@EnableBinding(Silk.class)
-
写配置
-
编写代码接收消息
5. 自定义接口-发送消息
- 定义接口
2. 主启动类上@Enableinding
注解注册接口
-
加配置
-
编写代码发送消息
-
测试结果
分析原因:
主启动类上Mybatis的
@MapperScan
注解把参数填写的包下面的所有接口扫描到,包括我们自定义的MySource
接口,但MySource
接口并不是Mybatis的配置文件,而我们也没有其xml文件与MySource
接口相对应,所以Mybatis报异常了。解决方法:把
@MapperScan
扫描的包范围缩小,让其扫不到MySource接口即可。
6. 自定义接口-消费消息
-
定义接口
-
主启动类上
@Enableinding
注解注册接口 -
@MapperScan
包扫描的范围也修改一下,避免扫描到自定义接口 -
写配置
-
编写代码消费消息
7. 自定义接口的本质
默认Source和Silk接口是这样的,Source用来发送消息,Silk用来接收消息。
其实还有一个默认接口继承了Source接口和Silk接口,它既可以用来发送消息,也可以用来接收消息。
不难发现,我们自定义的接口和默认的接口大同小异,只是@Input / @Output 注解里面的参数名称不同。(这个参数术语叫channel)
当我们定义好接口之后,然后在@EnableBinding指定了接口之后,就会创建一个名字和@Input / @Output 注解自定义的参数一样的代理,所以在配置文件里面配置的名字也必须以其自定义的参数一样,这样就能通过Spring IOC 通过这个名词去注入。
8. 消息过滤
Spring Cloud Stream 让RocketMQ的消费者对生产者产生的消息进行过滤
9. Spring Cloud Stream的监控
访问 /actuator 路径,可以发现多出了以下一个bindings端点,点进去可以查看bindings的详情信息。
多出了一个channels端点,其实就是我们在@Input / @Output 注解自定义的参数,术语就叫channels。
健康检查:
添加以下配置:
访问/ actuator/health 路径,展示了binder的健康状况。
10. Spring Cloud Stream异常处理
11. Spring Cloud Stream + RocketMQ实现分布式事务01——重构生产者
-
注释掉Spring编程模型整合RocketMQ的配置
-
添加配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: # 开启支持本地事务 transactional: true # @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") 相一致 group: tx-add-bonus-group bindings: output: #用来指定 topic destination: add-bonus # stream-test-topic
-
生产者Service层代码
package com.banmingi.nodeapp.contentcenter.service; import com.alibaba.fastjson.JSON; import com.banmingi.nodeapp.contentcenter.dao.RocketMQTransactionLogMapper; import com.banmingi.nodeapp.contentcenter.dao.ShareMapper; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareAuditDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.UserDTO; import com.banmingi.nodeapp.contentcenter.domain.dto.messaging.UserAddBonusMsgDTO; import com.banmingi.nodeapp.contentcenter.domain.entity.RocketMQTransactionLog; import com.banmingi.nodeapp.contentcenter.domain.entity.Share; import com.banmingi.nodeapp.contentcenter.domain.enums.AuditStatusEnum; import com.banmingi.nodeapp.contentcenter.feignclient.UserCenterFeignClient; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Objects; import java.util.UUID; /** * @auther 半命i 2020/3/22 * @description */ @Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareService { private final ShareMapper shareMapper; private final RocketMQTransactionLogMapper rocketMQTransactionLogMapper; private final Source source; /** * 审核指定内容 * @param id * @param auditDTO * @return */ public Share auditById(Integer id, ShareAuditDTO auditDTO) { //1. 查询 share 是否存在,不存在或者当前的audit_status != NOT_YET,那么就抛异常 Share share = this.shareMapper.selectByPrimaryKey(id); if (share == null) { throw new IllegalArgumentException("参数非法!该分享不存在!"); } if (!Objects.equals("NOT_YET",share.getAuditStatus())) { throw new IllegalArgumentException("参数非法!该分享已审核通过或未通过!"); } //3. 如果是PASS,那么发送消息给rockerMQ,让用户中心去消费,为发布人添加积分 //异步执行 //3.1 发送半消息 if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) { String transactionId = UUID.randomUUID().toString(); /* this.rocketMQTemplate.sendMessageInTransaction( "tx-add-bonus-group", //group "add-bonus", //topic MessageBuilder.withPayload( //消息体 UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()) //消息头 有妙用 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("share_id",id) .build(), //arg 有大用处 auditDTO );*/ this.source.output().send( MessageBuilder.withPayload( //消息体 UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()) //消息头 有妙用 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("share_id",id) //send方法不是传 arg, 但是必须用到,所以我们把arg放到Message的Header中即可 //header中传的对象,在get的时候拿到的是字符串,所以我们把对象转换成Json字符串 .setHeader("dto", JSON.toJSONString(auditDTO)) .build()); } else { //如果是REJECT(审核不通过),直接更新数据库中分享的状态即可 this.auditByIdInDB(id,auditDTO); } return share; } /** * 审核资源,将状态设为PASS/REJECT * @param id * @param auditDTO */ public void auditByIdInDB(Integer id,ShareAuditDTO auditDTO) { Share share = Share.builder() .id(id) .auditStatus(auditDTO.getAuditStatusEnum().toString()) .reason(auditDTO.getReason()) .build(); this.shareMapper.updateByPrimaryKeySelective(share); } /** * 审核资源,将状态设为PASS/REJECT,并记录事务日志 * @param id * @param auditDTO * @param transactionId */ @Transactional(rollbackFor = Exception.class) public void auditByIdWithRocketMQLog(Integer id,ShareAuditDTO auditDTO,String transactionId) { this.auditByIdInDB(id,auditDTO); this.rocketMQTransactionLogMapper.insertSelective( RocketMQTransactionLog.builder() .transactionId(transactionId) .log("审核分享...") .build() ); } }
-
执行本地事务、检测事务状态的类
package com.banmingi.nodeapp.contentcenter.rocketmq; import com.alibaba.fastjson.JSON; import com.banmingi.nodeapp.contentcenter.dao.RocketMQTransactionLogMapper; import com.banmingi.nodeapp.contentcenter.domain.dto.ShareAuditDTO; import com.banmingi.nodeapp.contentcenter.domain.entity.RocketMQTransactionLog; import com.banmingi.nodeapp.contentcenter.service.ShareService; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * @auther 半命i 2020/4/19 * @description */ //写成和发布半消息一致的group @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusTransactionListener implements RocketMQLocalTransactionListener { private final ShareService shareService; private final RocketMQTransactionLogMapper rocketMQTransactionLogMapper; /** * 执行本地事务. (流程图中第三步) * @param message 发送半消息时构建的 message * @param o //发送半消息的参数 arg * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { //消息头 MessageHeaders headers = message.getHeaders(); //获取 transactionId String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); //获取 share_id //get的时候获取到的是字符串 Integer share_id = Integer.valueOf((String) headers.get("share_id")); //获取auditDTO String dtoString = (String) headers.get("dto"); ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class); //执行本地事务 try { this.shareService.auditByIdWithRocketMQLog(share_id, auditDTO,transactionId); //本地事务执行成功 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } /** * 本地事务的检查接口.(流程图中第四步) * 检查本地事务的状态. * 这里通过查询事务日志表实现 * @param message 送半消息时构建的 message * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { //消息头 MessageHeaders headers = message.getHeaders(); //获取 transactionId String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); //查询RocketMQ事务日志表是否有记录 RocketMQTransactionLog rocketMQTransactionLog = this.rocketMQTransactionLogMapper.selectOne( RocketMQTransactionLog.builder() .transactionId(transactionId).build()); if (rocketMQTransactionLog != null) return RocketMQLocalTransactionState.COMMIT; return RocketMQLocalTransactionState.ROLLBACK; } }
12. Spring Cloud Stream + RocketMQ实现分布式事务02——重构消费者
-
注释掉Spring编程模型整合RocketMQ的配置
-
更改配置文件topic
-
删除掉Spring 消息模型 消费消息的那个类
-
重写类消费消息
package com.banmingi.nodeapp.usercenter.rocketmq; import com.banmingi.nodeapp.usercenter.domain.dto.UserAddBonusMsgDTO; import com.banmingi.nodeapp.usercenter.service.UserService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; /** * @auther 半命i 2020/4/19 * @description */ @Service @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusStreamConsumer { private final UserService userService; @StreamListener(Sink.INPUT) public void receive(UserAddBonusMsgDTO message) { this.userService.addBonus(message); } }
//这个方法在消费者Service里面,因为操作了两个表,需要添加@Transactional注解 /** * 加积分、记录日志 * @param message */ @Transactional(rollbackFor = Exception.class) public void addBonus(UserAddBonusMsgDTO message) { //1. 为用户加积分 Integer userId = message.getUserId(); Integer bonus = message.getBonus(); User user = this.userMapper.selectByPrimaryKey(userId); user.setBonus(user.getBonus()+bonus); this.userMapper.updateByPrimaryKey(user); //2. 记录日志到bonus_event_log 里面 this.bonusEventLogMapper.insert( BonusEventLog.builder() .userId(userId) .value(bonus) .event("CONTRIBUTE") .createTime(new Date()) .description("投稿加积分") .build()); log.info("积分添加完毕"); }
13. Spring Cloud Stream知识盘点
推荐阅读
-
spring cloud 之 Feign 使用HTTP请求远程服务的实现方法
-
Spring Cloud 配置中心内容加密的配置方法
-
Spring Cloud 动态刷新配置信息教程详解
-
详解使用spring boot admin监控spring cloud应用程序
-
使用Spirng Boot Admin监控Spring Cloud应用项目
-
Spring Cloud中Eureka开启密码认证的实例
-
使用IntelliJ IDEA 2017.2.5 x64中的Spring Initializr插件快速创建Spring Boot/Cloud工程(图解)
-
Spring cloud restTemplate 传递复杂参数的方式(多个对象)
-
详解spring cloud hystrix请求缓存(request cache)
-
详解Spring Cloud Zuul重试机制探秘