RocketMq 本地(Windows)安装配置,客户端启动,简单代码实现
程序员文章站
2022-07-14 23:37:19
...
RocketMq
1、描述
本程序是结合网上一些资料进行整合的,进行对RocketMq的简单的demo,其中包括RocketMq安装,RocketMq客户端查看,以及简单代码实现。
提供网站地址:
SpringBoot(17)---SpringBoot整合RocketMQ:https://www.cnblogs.com/qdhxhz/p/11109696.html
windows下RocketMQ安装部署:https://www.jianshu.com/p/4a275e779afa
RocketMQ可视化管理控制台rocketmq-console-ng:https://www.jianshu.com/p/4a275e779afa
启动本地MQ命令:(1)start mqnamesrv.cmd (2)start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
提供demo代码git: https://github.com/kinber123/rocketMqDemo.git
2、安装流程
本人是通过网络搜索查看到的,个人觉不错,提供网站:
windows下RocketMQ安装部署:https://www.jianshu.com/p/4a275e779afa
3、RocketMQ可视化管理控制台
提供网站:https://www.jianshu.com/p/4a275e779afa
4、实现代码demo
1、添加rocketmq包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
2、JmsConfig(配置类)
连接RocketMQ服务器配置类,这里为了方便直接写成常量。
/**
* \* @author wcy
* \* @date: 2020-05-15 17:42
* \* Description: 类,安装实际开发这里的信息 都是应该写在配置里,来读取,这里为了方便所以写成常量
* \
*/
public class JmsConfig {
/**
* Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
*/
public static final String NAME_SERVER = "127.0.0.1:9876";
/**
* 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
*/
public static final String TOPIC = "topic_family";
}
3、Producer (生产者)
package com.rocketmq.demo.service.simpleness;
import com.rocketmq.demo.config.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
* \* @author wcy
* \* @date: 2020-05-15 17:36
* \* Description: 一般情況下需要程序启动就开始初始化
* \
*/
@Slf4j
@Component
public class SimplenessConsumer {
/**
* 消费者实体对象
*/
private DefaultMQPushConsumer consumer;
/**
* 消费者组
*/
public static final String CONSUMER_GROUP = "test_consumer";
/**
* 通过构造函数 实例化对象
*/
public SimplenessConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题和 标签( * 代表所有标签)下信息
consumer.subscribe(JmsConfig.TOPIC, "*");
// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
try {
for (Message msg : msgs) {
//消费者获取消息 这里只输出 不做后面逻辑处理
String body = new String(msg.getBody(), "utf-8");
log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
}
} catch (UnsupportedEncodingException e) {
log.error("rocketMq error:{}", ExceptionUtils.getStackTrace(e));
// 异常重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("消费者 启动成功=======");
}
}
4、Consumer (消费者)
package com.rocketmq.demo.service.simpleness;
import com.rocketmq.demo.config.JmsConfig;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.common.message.Message;
/**
* \* @author wcy
* \* @date: 2020-05-15 17:34
* \* Description: 类
* \
*/
@Slf4j
@Component
public class SimplenessProducer {
private String producerGroup = "test_producer";
private DefaultMQProducer producer;
public SimplenessProducer() {
start();
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start() {
//示例生产者
producer = new DefaultMQProducer(producerGroup);
//不开启vip通道 开通口端口会减2
producer.setVipChannelEnabled(false);
//绑定name server
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
// 设置超过多大进行compress压缩
producer.setCompressMsgBodyOverHowmuch(1024 * 10);
// 设置发送失败的尝试次数。
producer.setRetryTimesWhenSendFailed(3);
// 设置如果返回值不是send_ok,是否要重新发送
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
// 设置限制最大的文件大小
producer.setMaxMessageSize(1024*50);
// 设置默认主题对应的队列数
producer.setDefaultTopicQueueNums(4);
// 设置发送超时时间 ms
producer.setSendMsgTimeout(1000);
try {
this.producer.start();
} catch (MQClientException e) {
log.info(ExceptionUtils.getStackTrace(e));
}
}
/**
* 生产者生产方法
* @param topic 主题
* @param tags 标签,用来给消费者进行过滤的
* @param keys 作为key
* @param body 发送的内容
*/
@SneakyThrows
public SendResult producerSendMes(String topic, String tags, String keys, String body) {
Message message = new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送
SendResult sendResult = this.producer.send(message);
return sendResult;
}
/**
* 生产者生产方法
* @param topic 主题
* @param tags 标签,用来给消费者进行过滤的
* @param body 发送的内容
*/
@SneakyThrows
public SendResult producerSendMes(String topic, String tags, String body) {
Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送
SendResult sendResult = this.producer.send(message);
return sendResult;
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown() {
this.producer.shutdown();
}
}
5、测试
package com.rocketmq.demo.controller;
import com.rocketmq.demo.config.JmsConfig;
import com.rocketmq.demo.service.simpleness.SimplenessProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* \* @author wcy
* \* @date: 2020-05-15 17:30
* \* Description: 类
* \
*/
@Slf4j
@RestController
@RequestMapping(value = "/text")
public class RocketMqController {
@Autowired
private SimplenessProducer producer;
private List<String> mesList;
/**
* 初始化消息
*/
public RocketMqController() {
mesList = new ArrayList<>();
mesList.add("小小");
mesList.add("爸爸");
mesList.add("妈妈");
mesList.add("爷爷");
mesList.add("奶奶");
mesList.add("外公");
mesList.add("外婆");
}
@RequestMapping("/rocketmq")
public Object callback() throws Exception {
//总共发送五次消息
for (String s : mesList) {
//创建生产信息
SendResult sendResult = producer.producerSendMes(JmsConfig.TOPIC, "testtag", ("小小一家人的称谓:" + s));
log.info("输出生产者信息={}",sendResult);
}
return "成功";
}
/**
* 测试是否能访问
* @param test
* @return
* @throws Exception
*/
@RequestMapping("/test")
public Object test(String test) {
return "success "+test;
}
}
6、结果显示
控制台
rocketMq控制台
注:再次感谢网络作者的贡献