springboot 整合rocketmq
程序员文章站
2022-07-15 08:03:34
...
1. rocketmq 环境搭建(windows)
1.1 下载rocketmq 客户端
下载地址为:http://rocketmq.apache.org/release_notes/release-notes-4.3.2/
1.2 配置环境变量
1.3 启动NameServer
进入mq 的bin 目录下 输入 start mqnamesrv.cmd ,打印出如下日志则表示启动成功
1.4 启动broker
进入mq 的bin 目录下
修改文件 runbroker.cmd 的倒数第二行为 set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""(注意必须使用英文半角双引号),然后输入 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,若日志打印如下内容则表示启动成功。
一般情况下 autoCreateTopicEnable 的权限不下放给客户端,topic 应该由管理员创建。
2. 工程创建
新建springboot 工程 并引入依赖;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
在 application.yml 中添加变量
rocketmq:
  namesrvAddr: 127.0.0.1:9876
工程目录如下
3. 添加 consumer 与 producer
3.1 新建消费监听
package com.ambity.rocketmq.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MyConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt me = list.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("消费记录为topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
} catch (Exception e) {
log.error(e.getMessage());
// 返回再次消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 返回消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3.2 注册生产者和消费者
package com.ambity.rocketmq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the RocketMQ producer and push consumer
 * as beans. Both beans declare {@code start} as their init method and
 * {@code shutdown} as their destroy method.
 */
@Configuration
public class MqBean {

    /** Name server address, read from the {@code rocketmq.namesrvAddr} property. */
    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /** Listener that is registered on the push consumer. */
    @Autowired
    private MyConsumer myConsumer;

    /**
     * Builds the producer bean.
     *
     * @return a producer configured with the name server address and producer group
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(namesrvAddr);
        producer.setProducerGroup("example_group_product");
        return producer;
    }

    /**
     * Builds the push consumer bean subscribed to the {@code mqtest} topic.
     *
     * @return a consumer configured with thread limits, retry limit, start
     *         offset, subscription, message model and listener
     * @throws MQClientException if the subscription cannot be registered
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // Connection and group identity.
        consumer.setConsumerGroup("test_group_consumer");
        consumer.setNamesrvAddr(namesrvAddr);

        // Consume thread pool bounds (minimum, then maximum).
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);

        // Maximum number of times a single message is re-consumed.
        consumer.setMaxReconsumeTimes(3);

        // The concurrent-consume max span (setConsumeConcurrentlyMaxSpan) is intentionally not set here.

        // Where to start consuming from.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Consumption mode: CLUSTERING is cluster mode, BROADCASTING is broadcast mode.
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // Subscription: first argument is the topic, second is the tag expression ("*" = all tags).
        consumer.subscribe("mqtest", "*");

        consumer.registerMessageListener(myConsumer);
        return consumer;
    }
}
3.3 编写测试类
package com.ambity.rocketmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class TestController {
@Autowired
private DefaultMQProducer defaultMQProducer;
@RequestMapping("/product")
public void product(){
try {
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("mqtest" ,
"TagA" ,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = defaultMQProducer.send(msg);
System.out.printf("%s%n", sendResult);
}
}catch (Exception e){
log.error(e.getMessage());
}
}
}
启动后,调用接口可以看到日志:
至此,集成完成。
上一篇: 数组的修饰符