欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot RocketMQ 整合使用和监控

程序员文章站 2022-07-15 08:03:22
...

原文章出处:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/

                        https://blog.csdn.net/qq_27529917/article/details/79871052

项目用到了rocketMQ,资料整理一波,以便学习使用。

在 IDEA 创建一个 SpringBoot 项目,项目结构如下:

SpringBoot RocketMQ 整合使用和监控

pom 文件

引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.zhisheng</groupId>
	<artifactId>rocketmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rocketmq</name>
	<description>Demo project for Spring Boot RocketMQ</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-common</artifactId>
			<version>4.2.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.2.0</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

配置文件

application.properties 中如下:

1
2
3
4
5
6
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.zhisheng.rocketmq.client;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import javax.annotation.PostConstruct;

/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQClient {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

          	 //创建一个消息实例,包含 topic、tag 和 消息体
             //如下:topic 为 "TopicTest",tag 为 "push"
            Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

            StopWatch stop = new StopWatch();
            stop.start();

            for (int i = 0; i < 10000; i++) {
                SendResult result = producer.send(message);
                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
            }
            stop.stop();
            System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.zhisheng.rocketmq.server;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQServer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

启动类

1
2
3
4
5
6
7
8
9
10
11
12
package com.zhisheng.rocketmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(RocketmqApplication.class, args);
	}
}

RocketMQ

代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。

启动 Name Server

在前面文章中已经写过怎么启动,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer

进入到目录 :

1
cd distribution/target/apache-rocketmq

启动:

1
2
3
nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功

启动 Broker

1
2
3
nohup sh bin/mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log	//通过日志查看是否启动成功

然后运行启动类,运行效果如下:

SpringBoot RocketMQ 整合使用和监控

监控

RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,地址在:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console ,官方也给出了其支持的功能的中文文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md , 那么该如何安装?

Docker 安装

1、获取 Docker 镜像

1
docker pull styletang/rocketmq-console-ng

2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1

1
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

非 Docker 安装

我们 git clone 一份代码到本地:

1
2
3
git clone https://github.com/apache/rocketmq-externals.git

cd rocketmq-externals/rocketmq-console/

需要 jdk 1.7 以上。 执行以下命令:

1
mvn spring-boot:run

或者

1
2
3
mvn clean package -Dmaven.test.skip=true

java -jar target/rocketmq-console-ng-1.0.0.jar

注意:

1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 为阿里云的镜像

1
2
3
4
5
6
7
8
<mirrors>
    <mirror>
          <id>alimaven</id>
          <name>aliyun maven</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <mirrorOf>central</mirrorOf>        
    </mirror>
</mirrors>

2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false(或者您可以在 ops 页面中更改它)

3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)

错误解决方法

1、Docker 启动项目报错

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

SpringBoot RocketMQ 整合使用和监控

将 Docker 启动命令改成如下以后:

1
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

报错信息改变了,新的报错信息如下:

1
2
3
ERROR op=global_exception_handler_print_error

org.apache.rocketmq.console.exception.ServiceException: This date have't data!

SpringBoot RocketMQ 整合使用和监控

看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!

搞了很久这种方法没成功,暂时放弃!mmp

2、非 Docker 安装,只好把源码编译打包了。

1) 注意需要修改如下图中的配置:

SpringBoot RocketMQ 整合使用和监控

1
2
3
4
rocketmq.config.namesrvAddr=localhost:9876		//注意替换你自己的ip

#如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的
rocketmq.config.isVIPChannel=

2) 执行以下命令:

1
mvn clean package -Dmaven.test.skip=true

编译成功:

SpringBoot RocketMQ 整合使用和监控

可以看到已经打好了 jar 包:

运行:

1
java -jar rocketmq-console-ng-1.0.0.jar

成功,不报错了,开心????,访问 http://localhost:8080/

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

整个监控大概就是这些了。

然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:

SpringBoot RocketMQ 整合使用和监控

总结

整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建。

 

另外:再看下RocketMQ中的NameServer:

先看NameServer在RocketMQ架构中的功能图: 
SpringBoot RocketMQ 整合使用和监控

NameServer,很多时候称为命名发现服务,其在RocketMQ中起着中转承接的作用,是一个无状态的服务,多个NameServer之间不通信。任何Producer,Consumer,Broker与所有NameServer通信,向NameServer请求或者发送数据。而且都是单向的,Producer和Consumer请求数据,Broker发送数据。正是因为这种单向的通信,RocketMQ水平扩容变得很容易。

下面我将按照Broker,Producer,Consumer的顺序来说明NameServer与他们之间的通信和发挥的作用。

NameServer与Broker间的通信和其发挥的作用:

  1. 在NameServer启动后,启动Broker。Broker在启动时会加载配置中的topic信息。加载当前Broker上的所有topic信息:topic名称;topic的Queue权限,可读,可写,可继承等等;Queue的个数。然后将这些数据传输到NameServer,美其名曰registerBroker。
  2. NameServer与Broker间维持着一个SocketChannel,长连接,Broker每隔30S向其配置的所有的NameServer执行registerBroker工作,这就是Broker和NameServer间的心跳。
  3. NameServer在接受到Broker传递的心跳信息时,若这次心跳是其第一次心跳,那么创建BrokerData,创建BrokerLiveInfo,保存其dataVersion和lastUpdateTimestamp;如果不是第一次,那么更新其lastUpdateTimestamp和dataVersion。
  4. 如果这个Broker是Master,且这次心跳信息是其第一次心跳,那么会创建当前Broker的QueueData。如果不是第一次心跳,但当前Broker的dataVersion与NameServer上保存的不一致(当Broker上新增加了topic时会更新dataVersion,dataVersion主要用当前时间戳表示),此时会用当前心跳的数据覆盖之前注册的数据。
  5. 如果当前Broker是Slave,那么将Master的brokerAddr放入心跳注册结果中,返回给Slave,这样Slave就能与Master间进行数据传输。
  6. NameServer维护着与其他组件的SocketChannel对象,针对所有组件(Broker和Client)的长连接注册了ChannelEventListener,监听此SocketChannel的连接事件。当某个SocketChannel出现异常或断开时(注意是长连接断开而不是心跳停止!),会循环遍历所有Broker的长连接,如果发现断开长连接是属于某个Broker的,那么清除此Broker的BrokerData和QueueData,如果不属于Broker,则什么都不做。这样当Client(Producer,Consumer)下次请求指定topic的TopicRouteData时,就不会包含此Broker的的数据了,也就是MessageQueue上不再包含此Broker上的Queue。
  7. 因为ChannelEventListener的连接事件处理里只对Broker做相应处理,没有涉及到Client。所以在Broker宕机或者增加时,不会实时通知Client,Client最晚需要30S时间才能感知到这种变化,因为Client更新TopicRouteData的间隔是30S。
  8. NameServer每隔30S对所有Broker的长连接进行扫描,当发现其lastUpdateTimestamp距离当前时间超过2m时,断开长连接,清空相应数据。

下面讲Producer和Consumer与NameServer间的通信:

  1. Producer在发送消息时,首先根据消息的topic查看自身是否含有此topic相应的MessageQueue( 正常情况下第一次发送时是没有的 )。当没有MessageQueue时,会从NameServer处请求指定topic的TopicRouteData,也就是List< BrokerData>和 List< QueueData>。然后根据QueueData里的writeQueueNums和BrokerData里的topic, HashMap< brokerId , broker address > 生成MessageQueue,MessageQueue的id从0开始,依次递增。注意如果BrokerData里不含有Master的Adress,那么其对应的QueueData将会被废弃,因为只有Master才能写入消息。
  2. 生成MessageQueue后,会将此topic存在生产者客户端,客户端每隔30S向NameServer请求此topic的TopicRouteData,生成MessageQueue,覆盖上次更新的值。
  3. Consumer在启动之前就需要指定订阅的topic,因此其在启动时就会想NameServer请求相应topic的TopicRouteData,同样的形式生成MessageQueue,但和Producer不同的是,Consumer可以从Slave处拉取消息,所以不会过滤Master宕机的Broker数据。
  4. Consumer客户端也每隔30S从NameServer处更新当前topic的数据,覆盖上次的值。

以上就是NameServer与各个组件间的通信机制及其发挥的作用。