rocketmq(2): springboot 整合rocketmq
程序员文章站
2022-07-15 08:03:16
...
rocketmq(2): springboot 整合rocketmq
1. 生产者
- 引入依赖
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
- 生产者配置
- springboot 配置文件
rocketmq:
  producer:
    groupName: ${spring.application.name}
    isOnOff: 'on'
    maxMessageSize: 4096
    namesrvAddr: 127.0.0.1:9876
    retryTimesWhenSendFailed: 2
    sendMsgTimeOut: 3000
- 生产者配置类
package com.miya.system.configure;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Binds the {@code rocketmq.producer.*} properties and exposes a started
 * {@link DefaultMQProducer} bean built from them.
 *
 * @author Caixiaowei
 * @ClassName MQProducerConfigure
 * @Description mq producer configuration
 * @createTime 2020/10/23 13:52
 */
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {

    /** Producer group name passed to the {@link DefaultMQProducer} constructor. */
    private String groupName;

    /** Name server address handed to {@code setNamesrvAddr}. */
    private String namesrvAddr;

    /**
     * Maximum message size. Left at the client default when the property is absent.
     */
    private Integer maxMessageSize;

    /**
     * Send timeout. Left at the client default when the property is absent.
     */
    private Integer sendMsgTimeOut;

    /**
     * Number of retries after a failed send. Left at the client default when the property is absent.
     */
    private Integer retryTimesWhenSendFailed;

    /**
     * Creates, configures and starts the producer. The bean is only registered when
     * {@code rocketmq.producer.isOnOff} equals {@code on}.
     *
     * @return the started producer
     * @throws MQClientException if {@code producer.start()} fails
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info("defaultProducer 正在创建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);

        // Optional properties are boxed; unboxing a missing one would throw a NullPointerException,
        // so each setter is only called when a value was actually configured.
        if (maxMessageSize != null) {
            producer.setMaxMessageSize(maxMessageSize);
        }
        if (sendMsgTimeOut != null) {
            producer.setSendMsgTimeout(sendMsgTimeOut);
        }
        if (retryTimesWhenSendFailed != null) {
            // The property is named retryTimesWhenSendFailed, so it must govern synchronous sends.
            // The async setting is kept as well because the original code applied the value there.
            producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
            producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
        }

        producer.start();
        log.info("rocketmq producer server 开启成功----------------------------------");
        return producer;
    }
}
2. 消费者
- 引入依赖
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
- 消费者配置
rocketmq:
  consumer:
    consumeMessageBatchMaxSize: 1
    consumeThreadMax: 32
    consumeThreadMin: 5
    groupName: ${spring.application.name}
    isOnOff: 'on'
    namesrvAddr: 127.0.0.1:9876
    topics: TestTopic~TestTag;*~*
package com.miya.system.configure;
import com.miya.system.manager.MQConsumeMsgListenerProcessor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Caixiaowei
* @ClassName MQConsumerConfigure
* @Description MQ consumer 配置
* @createTime 2020/10/23 13:58
*/
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
private String groupName;
private String namesrvAddr;
private String topics;
// 消费者线程数据量
private Integer consumeThreadMin;
private Integer consumeThreadMax;
private Integer consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
/**
* mq 消费者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
log.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(consumeMsgListenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
String[] topicArr = topics.split(";");
for (String tag : topicArr) {
String[] tagArr = tag.split("~");
consumer.subscribe(tagArr[0], tagArr[1]);
}
consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
log.error("consumer 创建失败!");
}
return consumer;
}
}
- 消费者监听
package com.miya.system.manager;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @author Caixiaowei
* @ClassName MQConsumeMsgListenerProcessor
* @Description rocketmq 消费者监听
* @createTime 2020/10/23 14:01
*/
@Component
@Slf4j
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
/**
* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
* @param msgList
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgList)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get(0);
log.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
// TODO 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3. 测试
- 测试用例
/**
 * Sends one message synchronously through the injected producer and prints the send result.
 */
@Test
public void test_rocketmq() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    Message message = new Message();
    // NOTE(review): the consumer sample config subscribes to TestTopic~TestTag, while this message
    // is sent to MyTopic. Confirm the consumer's topics setting includes this topic.
    message.setTopic("MyTopic");
    message.setTags("TestTag");
    // Encode explicitly as UTF-8 so the bytes match the listener, which decodes the body as UTF-8.
    message.setBody("hello rocketmq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    SendResult send = defaultMQProducer.send(message);
    System.out.println(send);
}
ps: 控制台使用的是 rocketmq-console,其安装与配置请参考上一篇《rocketmq(1): win10 docker 安装配置》
推荐阅读
-
SpringBoot2.x整合Shiro出现cors跨域问题(踩坑记录)
-
SpringBoot2 整合Nacos组件,环境搭建和入门案例详解
-
SpringBoot 2.x 开发案例之 Shiro 整合 Redis
-
玩转 SpringBoot 2 之整合 JWT 上篇
-
玩转 SpringBoot 2 之整合 JWT 下篇
-
玩转 SpringBoot 2 快速整合拦截器
-
基于SpringBoot整合oauth2实现token认证
-
SpringBoot(六) SpringBoot整合Swagger2(自动化生成接口文档)
-
SpringBoot如何优雅的使用RocketMQ
-
SpringBoot 2 快速整合 | 统一异常处理