欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring-boot 2.3.x 整合rocketmq

程序员文章站 2022-07-15 10:24:46
...

spring-boot 2.3.x 整合rocketmq


本地项目的基础环境

环境 版本
jdk 1.8.0_201
maven 3.6.0
Spring-boot 2.3.3.RELEASE

1、rocketMq的安装(docker形式)

这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料或者官网地址

https://github.com/apache/rocketmq-docker

《docker环境下安装rockermq以及rockermq-console》

1.1、docker-compose.yml

version: '3'
services:
  namesrv:
    image: apacherocketmq/rocketmq
    container_name: namesrv
    ports:
    - 9876:9876
    volumes:
    - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
  broker:
    image: apacherocketmq/rocketmq
    container_name: rmqbroker
    ports:
    - 10909:10909
    - 10911:10911
    - 10912:10912
    volumes:
    - ./data/broker/logs:/home/rocketmq/logs
    - ./data/broker/store:/home/rocketmq/store
    - ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
    depends_on:
    - namesrv
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
    - 8080:8080
    environment:
      JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    depends_on:
    - namesrv

2、构建一个rocketMq的项目

主要是导入rocketmq-spring-boot-starter的包,以及spring-boot-starter-web的包;

导入web的包,是再消费的时候,没有守护线程,程序启动后,就会自动退出,导入web包后,tomcat容器启动,消费的线程就不退出了;

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.badger</groupId>
    <artifactId>badger-spring-boot-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>badger-spring-boot-rocketmq</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.1、定义yml配置文件

rocketmq:
  name-server: localhost:9876
  producer:
    group: test-producer-group

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.class具体更可以参看配置类

3.2、生产者测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { RocketmqApplicaltion.class })
public class TestApp {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void syncSend() {
        for (int i = 0; i < 100; i++) {
            // 同步消息
            Message<String> bashMessage = new GenericMessage<String>("test_producer" + i);
            SendResult syncSend = rocketMQTemplate.syncSend("test_producer", bashMessage);
            System.out.println(syncSend);
        }
    }

    @Test
    public void asyncSend() {
        for (int i = 0; i < 100; i++) {
            // 异步消息
            Message<String> message = new GenericMessage<String>("test_producer" + i);
            rocketMQTemplate.asyncSend("test_producer", message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("发送失败");
                }
            });
        }
    }

    @Test
    public void sendOneWay() {
        for (int i = 0; i < 100; i++) {
            // 单向发送消息
            Message<String> message = new GenericMessage<String>("test_producer" + i);
            rocketMQTemplate.sendOneWay("test_producer", message);
            System.out.println("只发送一次");
        }
    }

    @Test
    public void syncSendOrder() {
        // 发送有序消息
        String[] tags = new String[] { "TagA", "TagC", "TagD" };
        for (int i = 0; i < 10; i++) {
            // 加个时间前缀
            Message<String> message = new GenericMessage<String>("我是顺序消息" + i);
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("test_producer:" + tags[i % tags.length], message,
                    i + "");
            System.out.println(sendResult);
        }
    }
}

注意:

1、跟原生rocketmq Api对比,rocketMQ start 的做了二次的封装,把同步异步发送消息,用方法名称做了区别;相同的是,无论是原生的api还是二次封装的api,异步调用的时候,回调是在参数体里的,毕竟异步发送需要等待回调,而同步发送可以只有回调。

2、顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的

​ 发送消息的时候,消息被存储在MessageQueue队列里的,默认的时候,是4个队列;为了保证消息的顺序,是需要把相同业务的数据按照顺序写入对应的队列中,单个队列下,数据是严格有序的;

rocketMQ start 对原生api做了二次封装,提供了默认的MessageQueue选择器,用的字符串的hash算法实现的,如果不满足实际需求,需要重写选择器。

3.3、定义消费端代码

@Component
@RocketMQMessageListener(topic = "test_producer", consumerGroup = "test_consumer-group")
public class DemoConsumer implements RocketMQListener<String> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(String message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

注意:

​ 1、消费端的注解@RocketMQMessageListener属性中consumerGroup,多个消费端(消费集群)消费同一个topic的时候,需要定义成一致;

2、消费端消费的时候,是会多线程的形式消费topic里的4个MessageQueue的,如果要消费顺序消息,需要指定属性consumeModeConsumeMode.ORDERLY,表示同步消费;

3.4、主启动类

@SpringBootApplication
public class RocketmqApplicaltion {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(RocketmqApplicaltion.class, args);
    }
}

《官方文档github》

《官方文档》

详细代码也可以参看《码云》