欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

从安装到使用springboot集成RocketMq4.5.2

程序员文章站 2023-12-04 18:23:40
1.安装RocketMq使用 Docker 安装软件(不会Docker?先去学!!!贼好用!!!)docker pull foxiswho/rocketmq:serverdocker pull foxiswho/rocketmq:brokerdocker pull styletang/rocketmq-console-ng2.配置、启动Mq启动nameserverdocker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:s...

1.安装RocketMq

使用 Docker 安装软件(不会Docker?先去学!!!贼好用!!!)

docker pull foxiswho/rocketmq:server
docker pull foxiswho/rocketmq:broker
docker pull styletang/rocketmq-console-ng

从安装到使用springboot集成RocketMq4.5.2

2.配置、启动Mq

启动nameserver

docker run -d -p 9876:9876 --name rmqserver  foxiswho/rocketmq:server

启动broker
注意:
1) broker中/etc/rocketmq下没有broker.conf 需要自己挂载文件
2) broker默认对外开放的ip为内网ip,需手动修改配置文件,添加brokerIP1 = xxx.xxx.xxx.xxx

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

* 1)解决方法:自己建个broker.conf文件将上面内容拷贝进去,上面为broker默认配置文件
* 2)解决方法:在配置文件最上面加上 brokerIP1 = xxx.xxx.xxx.xxx

从安装到使用springboot集成RocketMq4.5.2

store为数据文件
docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/logs:/root/logs -v `pwd`/store:/root/store -v `pwd`/broker.conf:/etc/rocketmq/broker.conf --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker

3.可视化工具

docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\
 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\
 -Dcom.rocketmq.sendMessageWithVIPChannel=false"\
 -t styletang/rocketmq-console-ng

访问 ip:8180,及可进入下图页面,然后可切换中文界面
从安装到使用springboot集成RocketMq4.5.2

4.SpringBoot 集成

 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>2.0.3</version>
 </dependency>
# RocketMQ YML配置
rocketmq:
  #nameserver集群地址用;隔开
  name-server: 192.168.1.169:9876
  #生产者参数配置
  producer:
    group: my-group
    sendMessageTimeout: 300000
    compress-message-body-threshold: 4096
    max-message-size: 4194304
    retry-times-when-send-async-failed: 0
    retry-next-server: true
    retry-times-when-send-failed: 2
生产者

我是使用测试类启动的,请自行修改

import com.zz.rocketmq.CloudRocketmqApplication;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Description: 消息生产者
 * @Author: zhao
 * @Date: 2020-07-02
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudRocketmqApplication.class)
public class MqProduceTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static String topic = "zz_topic";

    /**
     * 发送同步消息
     */
    @Test
    public void syncSend() {
        //第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息
        SendResult sendResult = rocketMQTemplate.syncSend(topic, "Hello, sychronized message!");
        System.out.println("同步消息的结果:" + sendResult);
    }

    /**
     * 发送异步消息
     */
    @Test
    public void asyncSend() {
        //第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息;
        //第三个参数是异步消息发送结果的回调
        rocketMQTemplate.asyncSend(topic, "Hello, asychronized message!", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功,发送结果:"+sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息发送失败,消息回调");
            }
        });
    }
    @Test
    public void sendMsg() {
        //发送oneway消息
        //oneway消息就是只管发送,不管发送的结果如何
        rocketMQTemplate.sendOneWay(topic,"Hello, oneway message!");

        //发送带有tag的消息
        SendResult tagResult1 = rocketMQTemplate.syncSend(topic + ":test1", "Hello, tags:test1 message!");
        System.out.println("带有tag:test1的消息发送结果:"+tagResult1);

        rocketMQTemplate.convertAndSend(topic, "hell aaa");
    }
}
消费者
package com.zz.rocketmq;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @Description: 消息消费者
 * @Author: zhao
 * @Date: 2020-07-02
 **/
@Service
@RocketMQMessageListener(topic = "zz_topic",consumerGroup = "my_consumer")
public class Consumer implements RocketMQListener<MessageExt> {

    /**
     *  注意: topic 要和发送者的一致,不然收不到
     * MessageExt 限定数据类型,可改为RocketMQListener<String>
     * @param messageExt
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("收到了消息:"+new String(messageExt.getBody()));
    }
}

要注意编码规范哦,如:topic 常量应定义在配置类中或配置文件中,以便管理
~对你有帮助的话点个赞呗。
~有天津的小伙伴吗,一起交流下呀。

本文地址:https://blog.csdn.net/qq_39035267/article/details/107087572