
Integrating RocketMQ with Spring Boot

程序员文章站 2022-07-15 08:03:34

1. Setting up the RocketMQ environment (Windows)

1.1 Download RocketMQ

Download the binary release from: http://rocketmq.apache.org/release_notes/release-notes-4.3.2/

1.2 Configure environment variables

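Add a ROCKETMQ_HOME environment variable that points to the directory where RocketMQ was unpacked; the startup scripts in bin rely on it.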

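1.3 Start the NameServer

In the bin directory of the RocketMQ installation, run start mqnamesrv.cmd. The NameServer has started successfully when its console prints "The Name Server boot success".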

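1.4 Start the broker

Go to the bin directory of the RocketMQ installation. Edit runbroker.cmd and change the second-to-last line to set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"" (the extra quotes around %CLASSPATH% avoid a startup failure when the classpath contains spaces). Then run start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true. The broker has started successfully when its console prints a line containing "boot success".
In production, autoCreateTopicEnable is usually left off so that clients cannot create topics on their own; topics should be created by an administrator.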

2. Create the project

Create a new Spring Boot project and add the following dependencies:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

Add the following property to application.yml:

rocketmq:
   namesrvAddr: 127.0.0.1:9876
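
A malformed namesrvAddr value tends to surface only when the client fails to connect, so it can help to validate it early. The sketch below is a hypothetical helper (NamesrvAddrParser is not part of RocketMQ) and assumes the value is either a single host:port pair or several pairs separated by semicolons.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of RocketMQ) that validates a namesrvAddr value. */
final class NamesrvAddrParser {

    private NamesrvAddrParser() {
    }

    /**
     * Splits a value such as "127.0.0.1:9876" or "host1:9876;host2:9876"
     * into unresolved socket addresses and rejects malformed entries.
     */
    static List<InetSocketAddress> parse(String namesrvAddr) {
        if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) {
            throw new IllegalArgumentException("namesrvAddr must not be empty");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in: " + trimmed, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + trimmed);
            }
            // createUnresolved validates the pair without triggering a DNS lookup.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the number of name server addresses found in the value.
        System.out.println(parse("127.0.0.1:9876").size());
    }
}
```

Such a check could be called once at startup, before the value is handed to setNamesrvAddr.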

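The project contains two packages: com.ambity.rocketmq.rocketmq (MyConsumer and MqBean) and com.ambity.rocketmq.controller (TestController).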

3. Add the consumer and producer

3.1 Create the message listener

package com.ambity.rocketmq.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Component
@Slf4j
public class MyConsumer implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            // A batch holds one message by default, but iterate so that
            // no message is skipped if the batch size is raised later.
            for (MessageExt me : list) {
                String topic = me.getTopic();
                String tags = me.getTags();
                String keys = me.getKeys();
                String msgBody = new String(me.getBody(), StandardCharsets.UTF_8);
                log.info("Consumed message, topic: {}, tags: {}, keys: {}, body: {}", topic, tags, keys, msgBody);
            }
        } catch (Exception e) {
            // Log the full stack trace, not only the message
            log.error("Failed to consume message", e);
            // Ask the broker to redeliver the batch later
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // Acknowledge successful consumption
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
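
Returning RECONSUME_LATER makes the broker redeliver the message, and RocketMQ delivers at least once, so a listener may see the same message more than once. One common way to cope with this is to make processing idempotent. The following is a minimal sketch, assuming duplicates can be recognized by a stable identifier such as the message key or message ID; ProcessedMessageGuard is a hypothetical class, and the in-memory set only covers a single JVM (a real system would more likely use a database or cache).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory de-duplication guard; not part of the RocketMQ API. */
final class ProcessedMessageGuard {

    // Identifiers of messages already handled by this JVM (null is not allowed).
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /**
     * Returns true the first time an identifier is seen and false for any
     * redelivery, so the caller can skip the business logic on duplicates.
     */
    boolean markIfFirstDelivery(String messageId) {
        return processed.add(messageId);
    }

    public static void main(String[] args) {
        ProcessedMessageGuard guard = new ProcessedMessageGuard();
        System.out.println(guard.markIfFirstDelivery("order-1001")); // true: first delivery
        System.out.println(guard.markIfFirstDelivery("order-1001")); // false: redelivery
    }
}
```

Inside consumeMessage, the listener would call markIfFirstDelivery before doing any work and simply return CONSUME_SUCCESS when it yields false.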

3.2 Register the producer and consumer

package com.ambity.rocketmq.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqBean {

    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @Autowired
    private MyConsumer myConsumer;
    // Register the producer
    @Bean(initMethod = "start",destroyMethod = "shutdown")
    DefaultMQProducer defaultMQProducer(){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setProducerGroup("example_group_product");
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        return defaultMQProducer;
    }

    // Register the consumer
    @Bean(initMethod = "start",destroyMethod = "shutdown")
    DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQPushConsumer.setConsumerGroup("test_group_consumer");
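        // Minimum number of consumer threads
        defaultMQPushConsumer.setConsumeThreadMin(10);
        // Maximum number of consumer threads
        defaultMQPushConsumer.setConsumeThreadMax(20);
        // Maximum number of times a message is redelivered after a failed consumption
        defaultMQPushConsumer.setMaxReconsumeTimes(3);
        // Maximum offset span allowed for concurrent consumption
//        defaultMQPushConsumer.setConsumeConcurrentlyMaxSpan(10);
        // Where a new consumer group starts consuming: here, from the latest offset
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Subscribe to a topic: the first argument is the topic, the second is a tag
        // expression ("*" for all tags, or tags joined with "||" such as "TagA || TagB")
        defaultMQPushConsumer.subscribe("mqtest", "*");
        // Consumption mode: MessageModel.CLUSTERING shares the messages among the consumers
        // of a group, MessageModel.BROADCASTING delivers every message to each consumer
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);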
        defaultMQPushConsumer.registerMessageListener(myConsumer);
        return defaultMQPushConsumer;
    }

}
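
The second argument of subscribe is a tag expression. As far as the RocketMQ client is concerned, it is either "*" (all tags) or a list of tags joined with "||", and not a general regular expression. The sketch below only illustrates that matching rule under this assumption; TagExpression is a hypothetical class, and the actual filtering is performed by RocketMQ itself.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of tag filtering; the real matching is done by RocketMQ. */
final class TagExpression {

    private final boolean matchAll;
    private final Set<String> tags = new HashSet<>();

    TagExpression(String expression) {
        String trimmed = expression == null ? "" : expression.trim();
        // An empty expression or "*" subscribes to every tag.
        this.matchAll = trimmed.isEmpty() || "*".equals(trimmed);
        if (!matchAll) {
            // Tags are separated by a literal "||", so the pipes must be escaped.
            for (String tag : trimmed.split("\\|\\|")) {
                String candidate = tag.trim();
                if (!candidate.isEmpty()) {
                    tags.add(candidate);
                }
            }
        }
    }

    /** Returns true when a message carrying the given tag would pass the filter. */
    boolean accepts(String messageTag) {
        return matchAll || (messageTag != null && tags.contains(messageTag));
    }

    public static void main(String[] args) {
        System.out.println(new TagExpression("*").accepts("TagA"));            // true
        System.out.println(new TagExpression("TagA || TagB").accepts("TagB")); // true
        System.out.println(new TagExpression("Tag.*").accepts("TagA"));        // false: compared literally
    }
}
```

With the subscription used in this article, "*", every message on the mqtest topic reaches the listener regardless of its tag.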

3.3 Write a test controller

package com.ambity.rocketmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class TestController {

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @RequestMapping("/product")
    public void product(){
        try {
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("mqtest" ,
                        "TagA" ,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                        /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = defaultMQProducer.send(msg);
                log.info("Send result: {}", sendResult);
            }
        } catch (Exception e) {
            // Log the full stack trace, not only the message
            log.error("Failed to send message", e);
        }
    }
}
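
The producer turns the text into bytes and the consumer turns the bytes back into text, so both sides have to agree on the charset (RemotingHelper.DEFAULT_CHARSET is "UTF-8"). A minimal sketch of that round trip using only the JDK follows; MessageBodyCodec is a hypothetical helper name, not something RocketMQ provides.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that keeps body encoding and decoding in one place. */
final class MessageBodyCodec {

    private MessageBodyCodec() {
    }

    /** Encodes the text the way the producer side would before building a Message. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a body the way the consumer side would after MessageExt.getBody(). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ 0");
        System.out.println(decode(body)); // prints "Hello RocketMQ 0"
    }
}
```

Using StandardCharsets.UTF_8 instead of a charset name also avoids the checked UnsupportedEncodingException.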

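After starting the application, call the /product endpoint. The send result of each message and the corresponding consumer log lines appear in the console.

This completes the integration.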