spring-boot 2.3.x 整合rocketmq
spring-boot 2.3.x 整合rocketmq
文章目录
本地项目的基础环境
环境 | 版本 |
---|---|
jdk | 1.8.0_201 |
maven | 3.6.0 |
Spring-boot | 2.3.3.RELEASE |
1、rocketMq的安装(docker形式)
这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料或者官网地址
https://github.com/apache/rocketmq-docker
《docker环境下安装rockermq以及rockermq-console》
1.1、docker-compose.yml
version: '3'
services:
namesrv:
image: apacherocketmq/rocketmq
container_name: namesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
broker:
image: apacherocketmq/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
depends_on:
- namesrv
2、构建一个rocketMq的项目
主要是导入rocketmq-spring-boot-starter
的包,以及spring-boot-starter-web
的包;
导入web的包,是再消费的时候,没有守护线程,程序启动后,就会自动退出,导入web包后,tomcat容器启动,消费的线程就不退出了;
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.badger</groupId>
<artifactId>badger-spring-boot-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>badger-spring-boot-rocketmq</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.1、定义yml配置文件
rocketmq:
name-server: localhost:9876
producer:
group: test-producer-group
org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.class
具体更可以参看配置类
3.2、生产者测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { RocketmqApplicaltion.class })
public class TestApp {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void syncSend() {
for (int i = 0; i < 100; i++) {
// 同步消息
Message<String> bashMessage = new GenericMessage<String>("test_producer" + i);
SendResult syncSend = rocketMQTemplate.syncSend("test_producer", bashMessage);
System.out.println(syncSend);
}
}
@Test
public void asyncSend() {
for (int i = 0; i < 100; i++) {
// 异步消息
Message<String> message = new GenericMessage<String>("test_producer" + i);
rocketMQTemplate.asyncSend("test_producer", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败");
}
});
}
}
@Test
public void sendOneWay() {
for (int i = 0; i < 100; i++) {
// 单向发送消息
Message<String> message = new GenericMessage<String>("test_producer" + i);
rocketMQTemplate.sendOneWay("test_producer", message);
System.out.println("只发送一次");
}
}
@Test
public void syncSendOrder() {
// 发送有序消息
String[] tags = new String[] { "TagA", "TagC", "TagD" };
for (int i = 0; i < 10; i++) {
// 加个时间前缀
Message<String> message = new GenericMessage<String>("我是顺序消息" + i);
SendResult sendResult = rocketMQTemplate.syncSendOrderly("test_producer:" + tags[i % tags.length], message,
i + "");
System.out.println(sendResult);
}
}
}
注意:
1、跟原生rocketmq Api对比,rocketMQ start 的做了二次的封装,把同步异步发送消息,用方法名称做了区别;相同的是,无论是原生的api还是二次封装的api,异步调用的时候,回调是在参数体里的,毕竟异步发送需要等待回调,而同步发送可以只有回调。
2、顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的
发送消息的时候,消息被存储在MessageQueue
队列里的,默认的时候,是4个队列;为了保证消息的顺序,是需要把相同业务的数据按照顺序写入对应的队列中,单个队列下,数据是严格有序的;
rocketMQ start 对原生api做了二次封装,提供了默认的MessageQueue
选择器,用的字符串的hash算法实现的,如果不满足实际需求,需要重写选择器。
3.3、定义消费端代码
@Component
@RocketMQMessageListener(topic = "test_producer", consumerGroup = "test_consumer-group")
public class DemoConsumer implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(String message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
注意:
1、消费端的注解@RocketMQMessageListener
属性中consumerGroup
,多个消费端(消费集群)消费同一个topic的时候,需要定义成一致;
2、消费端消费的时候,是会多线程的形式消费topic里的4个MessageQueue的,如果要消费顺序消息,需要指定属性consumeMode
为ConsumeMode.ORDERLY
,表示同步消费;
3.4、主启动类
@SpringBootApplication
public class RocketmqApplicaltion {
public static void main(String[] args) throws Exception {
SpringApplication.run(RocketmqApplicaltion.class, args);
}
}
详细代码也可以参看《码云》
下一篇: springboo2开启http2
推荐阅读
-
spring-boot 2.3.x 整合rocketmq
-
SpringBoot RocketMQ docker整合使用
-
springboot 整合 RocketMQ
-
SpringBoot整合阿里RocketMQ
-
springboot 整合rocketmq
-
SpringBoot RocketMQ 整合使用和监控
-
SpringBoot RocketMQ 整合使用和监控
-
rocketmq(2): springboot 整合rocketmq
-
整合RocketMq提示RemotingTooMuchRequestException: sendDefaultImpl call timeout
-
SpringBoot 整合 RocketMQ 实现消息生产消费(RocketMQTemplate实现)