欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot RocketMQ docker整合使用

程序员文章站 2022-07-15 08:07:23
...

RocketMQ介绍

参考文章
简单来说
Broker 是管理消息队列
NameServer 是管理Broker的
生产者 是生产消息队列的
消费者 是出来消息队列的

发送方式有两种:
1.push生产者发送了消息,消费者会去Broker轮询拉取消息
2.pull生产者只发送消息,消费者需要主动去拉消息

发送方法有三种:
同步:可靠的同步传输广泛应用于重要通知消息,短信通知,短信营销系统等。
异步:异步传输一般用于响应时间敏感的业务场景。
单向:单向传输用于需要中等可靠性的情况,例如日志收集。

RocketMQ不使用Docker部署

我的简书

RocketMQ docker部署

使用的是foxiswho大佬的docker镜像
参考文章

#执行

git clone  https://github.com/foxiswho/docker-rocketmq.git

cd docker-rocketmq

cd rmq


chmod +x  start.sh

./start.sh

将会一键生成3个容器
如果输出如下信息,表名 创建成功

Creating rmqnamesrv ... done
Creating rmqbroker  ... done
Creating rmqconsole ... done

浏览器访问

# 这里访问的rocketmq后台管理
localhost:8180

RocketMQ和项目交互测试

docker镜像和本地idea交互

到下载git的目录 rmq/brokerconf/broker.conf 里面修改brokerIP1=127.0.0.1
这里的意思是如果项目在docker里面就可以不设置(#brokerIP1=127.0.0.1
这)
如果项目在项目不在docker里需要设置brokerIP1=宿主机IP地址

如果项目在项目不在docker里需要自己下载rocketmq管理后台

# 下载地址
https://github.com/apache/rocketmq
# 用idea 打开rocketmq-console
# 下载依赖
# 打开http://localhost:8080/

打开成功

SpringBoot RocketMQ docker整合使用

编写生产者和消费者
导入依赖pom.xml

        <!--    rocketMQ    -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

编写application.properties

# application.properties

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 同步生产者的组名
apache.rocketmq.producer.syncProducerGroup=syncProducerGroup
# 异步生产者的组名
apache.rocketmq.producer.asyncProducerGroup=asyncProducerGroup
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876

生产者

@Component
public class RocketMQClient {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.syncProducerGroup}")
    private String syncProducerGroup;

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.asyncProducerGroup}")
    private String asyncProducerGroup;


    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /**
     * 同步发送
     */
    @PostConstruct
    public void SyncProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);
        // 同步发送消息重试次数,默认为 2
        producer.setRetryTimesWhenSendFailed(3);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

            //创建一个消息实例,包含 topic、tag 和 消息体
            //如下:topic 为 "demo",tag 为 "push"
            Message message = new Message("demo", "push", "发送消息----同步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

//            目前RocketMQ只支持固定精度级别的定时消息,服务器按照1-N定义了如下级别:
//            “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
//            ;若要发送定时消息,在应用层初始化Message消息对象之后,
//            调用setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s:
            message.setDelayTimeLevel(2);


            SendResult result = producer.send(message);
            System.out.println("发送同步响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }


    /**
     * 异步发送
     */
    @PostConstruct
    public void AsyncProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(asyncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {

            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);

            Message msg = new Message("demo",
                    "push",
                    "发送消息----异步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //重点在这里 异步发送回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送异步响应:MsgId:" + sendResult.getMsgId() + ",发送状态:" + sendResult.getSendStatus());
                    producer.shutdown();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    producer.shutdown();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }


    }


    /**
     * onewag
     */
    @PostConstruct
    public void OnewayProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();
            
            Message message = new Message("demo", "push", "发送消息----单向信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));


            producer.sendOneway(message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

}

消费者

@Component
public class RocketMQServer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;


    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("demo", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // 接收失败重试
                    if (list.get(0).getReconsumeTimes() == 3){
                        // 重试3次
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                    }else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

rocketmq信息
SpringBoot RocketMQ docker整合使用

rocketmq管理后台发送主题
SpringBoot RocketMQ docker整合使用

rocketmq管理后台发送主题的状态
SpringBoot RocketMQ docker整合使用

idea 接受mq
SpringBoot RocketMQ docker整合使用

rocketmq管理后台查看管理后台
SpringBoot RocketMQ docker整合使用

下面介绍的是项目打包成docker镜像运行

PS:
因为Mac和Windos是不支持 --net:host 方法。我暂时不知道怎么把项目的镜像和rocketmq镜像交互。
所以我是liunx服务器进行dokcer测试运行的

打包本地项目镜像

如果是本地已经有了项目的docker镜像

# cd到你要保存文件的文件夹 xxx是你的docker镜像(name or id)、xxx.tar是打包成为tar的压缩文件
docker save xxx xxx.tar
# 上传到服务器
# 导出镜像
docker load xxx.tar

如果是本地没有镜像、把jar包上传到liunx服务器(你需要的文件夹里面)
在和jar包同级文件夹里面编写Dockerfile
vim Dockerfile

# openjdk1.8 如果没有就下载镜像 如果有就不下载
FROM openjdk:8-jdk-alpine
# VOLUME指向了一个/tmp的目录,由于Spring Boot使用内置的Tomcat容器,Tomcat默认使用/tmp作为工作目录。效果就是在主机的/var/lib/docker目录下创建了一个临时文件,并连接到容器的/tmp。
VOLUME /tmp
# 修改名字
ADD docker-0.0.1-SNAPSHOT.jar app.jar
# RUN bash -c 'touch /app.jar'
# ENTRYPOINT 执行项目 app.jar。为了缩短 Tomcat 启动时间,添加一个系统属性指向/dev/urandom 作为 Entropy Source
ENTRYPOINT ["java","-jar","/app.jar"]

构建镜像

# 这个命令就是使用Docker的build命令来构建镜像,并给镜像起了一个名字name其tag为tag在当前文件夹******意后面有一个.
docker build -t name:tag .

启动项目镜像

ps:先启动rocketmq镜像

# --net=host 容器内镜像通信(还有几个通信方式、可以自己百度)
# -d 后台执行
# -p 9000:9000 把项目9000端口映射到外部的9000端口
# 372cca80dbcc 镜像的id 也是可以镜像的名字
docker run --net=host -d -p 9000:9000  372cca80dbcc

因为上面的启动方式不好看输出
下面的展示我是用这个方式启动的

# -t 以交互的方式启动容器
docker run --net=host -t -p 9000:9000  372cca80dbcc

容器启动
SpringBoot RocketMQ docker整合使用

rocketmq后台发送信息
SpringBoot RocketMQ docker整合使用

容器接收信息
SpringBoot RocketMQ docker整合使用

下面介绍的是docker compose方式启动

ps:其还可以以docker compose方式启动、不过需要创建项目镜像。上面已经说了怎么构建。
在foxiswhofoxiswho大佬项目中

# 到rmq路径
cd 你的路径/docker-rocketmq/rmq
# 编辑docker-compose.yml
vim docker-compose.yml

编辑docker-compose.yml

version: '3.5'

services:
  rmqnamesrv: # namesrv
    image: foxiswho/rocketmq:4.7.0 # 镜像
    container_name: rmqnamesrv # 镜像名字
    ports: # 端口映射
      - 9876:9876
    volumes: # 数据持久话变成的路径
      - ./rmqs/logs:/opt/logs
      - ./rmqs/store:/opt/store
    environment: # 内存设置
      JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
    command: ["sh","mqnamesrv"]
    networks: # 镜像通信
        rmq:
          aliases:
            - rmqnamesrv
  rmqbroker: # broker
    image: foxiswho/rocketmq:4.7.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./rmq/logs:/opt/logs
      - ./rmq/store:/opt/store
      - ./rmq/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
    command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf","-n","rmqnamesrv:9876","autoCreateTopicEnable=true"]
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8180:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

  mydocker: # 我新建的项目
      image: docker:latest 
      container_name: mydocker
      ports: #对应端口
        - 9000:9000
      depends_on: #启动依赖必须等下面启动了 项目才可以启动
        - rmqnamesrv
        - rmqbroker
        - rmqconsole
      network_mode: "host" # --net=host启动

networks: # 镜像通信
  rmq:
    name: rmq
    driver: bridge

其实docker数据持久化还可以这么操作

  1. docker ps -a找到我们上次运行的容器id
  2. docker restart id 即可。
    你没看错就这样就完了。
    网上有很多说用docker volume实现数据持久化,我觉得是小题大做了。
    Docker Volume本质上是容器与主机之间共享的目录或者文件,这样Docker Volume中的数据可以在主机和容器中实时同步

删除所有未运行的容器

sudo docker rm $(sudo docker ps -a -q)
相关标签: java