【SpringBoot】【分布式事务】【RocketMQ】整合消息队列,从单机到集群
程序员文章站
2022-06-13 19:04:36
...
一、使用:
一、引入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
二、举例:生产者创建订单---->生产者发送消息----->MQ服务接受消息----->消费者监听消息并减库存
【生产者】:
application.yml
rocketmq: name-server: 192.168.85.128:9876 # rocketMQ地址 producer: group: producer-group-test # 生产者的组名需要和消费者监听consumerGroup一致
业务代码:
@Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, TbOrder> implements OrderService { @Resource private RocketMQTemplate rocketMQTemplate; @Override public void create() { //创建订单--> 发送消息 --> 消息发送成功后调用本地事务提交 --> TbOrder order = new TbOrder(); order.setCount(10); order.setMoney(BigDecimal.valueOf(10)); order.setProductId(1L); order.setStatus(1); order.setUserId(1L); sendMsg(order); } @Override public void sendMsg(TbOrder order){ /** * String txProducerGroup, 生产者分组 * String destination, topic * Message<?> message, 消息 * Object arg 消息参数 */ Message<String> build = MessageBuilder.withPayload(JSONObject.toJSONString(order)).build(); rocketMQTemplate.sendMessageInTransaction("tx-producer-group","txmsg-topic",build , null); } }
创建 ProducerTxmsgListener 并实现 RocketMQLocalTransactionListener:
@Component // txProducerGroup 的值和发送事务消息指定的 txProducerGroup 相同 @RocketMQTransactionListener(txProducerGroup = "txmsg-producer-group") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener { @Resource private OrderService orderService; /** * @Description: 执行本地事务提交 */ @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class); System.out.println(tbOrder); orderService.save(tbOrder); return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费 }catch (Exception e){ return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行异常,将消息遗弃 } } /** * @Description: 检查本地事务是否执行成功 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class); TbOrder order = orderService.getById(tbOrder.getId()); // 不为null 则表示执行成功 if (order != null){ return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费 } // 执行本地事务发生问题或还没执行完成, UNKNOWN 表示会继续回查 return RocketMQLocalTransactionState.UNKNOWN; } }
【消费者】:
application.yml
rocketmq: name-server: 127.0.0.1:9876 # rocketMQ地址 producer: group: producer-test-group # 生产者的组名需要和消费者监听consumerGroup一致
创建MyListener 并实现 RocketMQListener 接口:
// topic 对应生产者发消息是的topic @RocketMQMessageListener(topic = "test-topic" , consumerGroup = "consumer-group") public class MyListener implements RocketMQListener<String> { @Override public void onMessage(String message) { //执行 减库存业务 如果发生异常,则消息会隔段时间再次消费 System.out.println(message); } }
原理图:
上一篇: 学校教务网验证码识别