从安装到使用springboot集成RocketMq4.5.2
程序员文章站
2023-12-04 18:23:40
1.安装RocketMq使用 Docker 安装软件(不会Docker?先去学!!!贼好用!!!)docker pull foxiswho/rocketmq:serverdocker pull foxiswho/rocketmq:brokerdocker pull styletang/rocketmq-console-ng2.配置、启动Mq启动nameserverdocker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:s...
1.安装RocketMq
使用 Docker 安装软件(不会Docker?先去学!!!贼好用!!!)
docker pull foxiswho/rocketmq:server
docker pull foxiswho/rocketmq:broker
docker pull styletang/rocketmq-console-ng
2.配置、启动Mq
启动nameserver
docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server
启动broker
注意:
1) broker中/etc/rocketmq下没有broker.conf 需要自己挂载文件
2) broker默认对外开放的ip为内网ip,需手动修改配置文件,添加brokerIP1 = xxx.xxx.xxx.xxx
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
* 1)解决方法:自己建个broker.conf文件将上面内容拷贝进去,上面为broker默认配置文件
* 2)解决方法:在配置文件最上面加上 brokerIP1 = xxx.xxx.xxx.xxx
store为数据文件
docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/logs:/root/logs -v `pwd`/store:/root/store -v `pwd`/broker.conf:/etc/rocketmq/broker.conf --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker
3.可视化工具
docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\
-Dcom.rocketmq.sendMessageWithVIPChannel=false"\
-t styletang/rocketmq-console-ng
访问 ip:8180,及可进入下图页面,然后可切换中文界面
4.SpringBoot 集成
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
# RocketMQ YML配置
rocketmq:
#nameserver集群地址用;隔开
name-server: 192.168.1.169:9876
#生产者参数配置
producer:
group: my-group
sendMessageTimeout: 300000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-times-when-send-async-failed: 0
retry-next-server: true
retry-times-when-send-failed: 2
生产者
我是使用测试类启动的,请自行修改
import com.zz.rocketmq.CloudRocketmqApplication;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 消息生产者
* @Author: zhao
* @Date: 2020-07-02
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudRocketmqApplication.class)
public class MqProduceTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static String topic = "zz_topic";
/**
* 发送同步消息
*/
@Test
public void syncSend() {
//第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息
SendResult sendResult = rocketMQTemplate.syncSend(topic, "Hello, sychronized message!");
System.out.println("同步消息的结果:" + sendResult);
}
/**
* 发送异步消息
*/
@Test
public void asyncSend() {
//第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息;
//第三个参数是异步消息发送结果的回调
rocketMQTemplate.asyncSend(topic, "Hello, asychronized message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功,发送结果:"+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("异步消息发送失败,消息回调");
}
});
}
@Test
public void sendMsg() {
//发送oneway消息
//oneway消息就是只管发送,不管发送的结果如何
rocketMQTemplate.sendOneWay(topic,"Hello, oneway message!");
//发送带有tag的消息
SendResult tagResult1 = rocketMQTemplate.syncSend(topic + ":test1", "Hello, tags:test1 message!");
System.out.println("带有tag:test1的消息发送结果:"+tagResult1);
rocketMQTemplate.convertAndSend(topic, "hell aaa");
}
}
消费者
package com.zz.rocketmq;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @Description: 消息消费者
* @Author: zhao
* @Date: 2020-07-02
**/
@Service
@RocketMQMessageListener(topic = "zz_topic",consumerGroup = "my_consumer")
public class Consumer implements RocketMQListener<MessageExt> {
/**
* 注意: topic 要和发送者的一致,不然收不到
* MessageExt 限定数据类型,可改为RocketMQListener<String>
* @param messageExt
*/
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("收到了消息:"+new String(messageExt.getBody()));
}
}
要注意编码规范哦,如:topic 常量应定义在配置类中或配置文件中,以便管理
~对你有帮助的话点个赞呗。
~有天津的小伙伴吗,一起交流下呀。
本文地址:https://blog.csdn.net/qq_39035267/article/details/107087572