SpringBoot 整合 RocketMQ 实现消息生产消费(RocketMQTemplate实现)
有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而RocketMQ是可以支持消息的顺序消费的。
RocketMQ在发送消息的时候,是将消息发送到不同的队列中,然后消费端从多个队列中读取消息进行消费,很明显,在这种全局模式下,是无法实现顺序消费的。
为了实现顺序消费,我们需要把有顺序的消息按照他的顺序,将他们发送到同一个队列中,这样消费端在消费的时候,就保证了其顺序。
但是顺序消费的性能肯定也相对差一些,因为只能使用一个队列。
一、在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
二、在application.yml中配置RocketMQ地址:
server:
port: 8888
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: ${spring.application.name}
sendMessageTimeout: 300000
备注:官方下载RocketMQ,本地启动RocketMQ。
三、 一个简单的生产消费案例:
生产者:向 stringTopic 的主题中发送一个 Hello RecketMQ 的字符串。
@RestController
@RequestMapping("/mq")
public class ProducerController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sync/send1")
public String syncSendString() {
//发送一个同步消息,会返回值 ---发送到 stringTopic 主题
SendResult sendResult = rocketMQTemplate.syncSend("stringTopic", "Hello RocketMQ");
return sendResult.toString();
}
}
消费者:监听 stringTopic 主题。
@Service
@RocketMQMessageListener(topic = "stringTopic", consumerGroup = "string_consumer")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者接收消息:" + message);
}
}
1、启动当前服务。
2、用浏览器或者HTTP Client工具访问:http://localhost:8888/mq/sync/send1
3、查看控制台输出:【消费者接收消息:Hello RocketMQ】即表示消息消费成功。
四、实现顺序消费
生产者: 生产多条消息,方便观察顺序。向 orderTopic 主题发送5条消息,内容分别是 no1 no2 no3 no4 no5。第三个参数是order ,他的作用是会根据他的hash值计算发送到哪一个队列。用同一个值order,那么他们的hash一样。可以保证发送到同一个队列里。
@RestController
@RequestMapping("/mq")
public class ProducerController {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**************验证RocketMQ顺序消费***************/
@RequestMapping("/send/ordered")
public String sendOrderedMsg(){
/**
* hashKey: 为了保证报到同一个队列中,将消息发送到orderTopic主题上
*/
rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no5","order");
return "success";
}
}
消费者:消费者在消费的时候,默认是异步多线程消费的,所以无法保证顺序,需要指定同步消费。指定 consumeMode = ConsumeMode.ORDERLY。默认值是 consumeMode = ConsumeMode.CONCURRENT。
@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer", consumeMode = ConsumeMode.ORDERLY)
public class OrderedConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("顺序消费,收到消息:" + message);
}
}
1、启动当前服务。
2、用浏览器或者HTTP Client工具访问:http://localhost:8888/mq/send/ordered
3、查看控制台输出:【顺序打印:no1 no2 no3 no4 no5】即表示消息消费成功。
本文地址:https://blog.csdn.net/netuser1937/article/details/107692371
下一篇: Android 迎来全新品牌设计
推荐阅读
-
源码分析RocketMQ顺序消息消费实现原理
-
源码分析之RocketMQ Producer生产消息的实现及其设计模式分析
-
SpringBoot 整合 RocketMQ 实现消息生产消费(RocketMQTemplate实现)
-
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
-
springboot 整合 RabbitMQ实现消息队列
-
springboot整合rocketmq实现分布式事务
-
springboot整合rabbitmq实现生产者消息确认、死信交换器、未路由到队列的消息处理
-
springboot整合rabbitmq实现生产者消息确认、死信交换器、未路由到队列的消息处理
-
SpringBoot 整合 RocketMQ 实现消息生产消费(RocketMQTemplate实现)
-
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理