原文章出处:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/
https://blog.csdn.net/qq_27529917/article/details/79871052
项目用到了rocketMQ,资料整理一波,以便学习使用。
在 IDEA 创建一个 SpringBoot 项目,项目结构如下:
pom 文件
引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhisheng</groupId>
<artifactId>rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rocketmq</name>
<description>Demo project for Spring Boot RocketMQ</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
|
配置文件
application.properties 中如下:
1
2
3
4
5
6
|
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876
|
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package com.zhisheng.rocketmq.client;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import javax.annotation.PostConstruct;
/**
* Created by zhisheng_tian on 2018/2/6
*/
@Component
public class RocketMQClient {
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
//创建一个消息实例,包含 topic、tag 和 消息体
//如下:topic 为 "TopicTest",tag 为 "push"
Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch();
stop.start();
for (int i = 0; i < 10000; i++) {
SendResult result = producer.send(message);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
stop.stop();
System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
|
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package com.zhisheng.rocketmq.server;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* Created by zhisheng_tian on 2018/2/6
*/
@Component
public class RocketMQServer {
/**
* 消费者的组名
*/
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("TopicTest", "push");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
启动类
1
2
3
4
5
6
7
8
9
10
11
12
|
package com.zhisheng.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqApplication.class, args);
}
}
|
RocketMQ
代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。
启动 Name Server
在前面文章中已经写过怎么启动,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer
进入到目录 :
1
|
cd distribution/target/apache-rocketmq
|
启动:
1
2
3
|
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功
|
启动 Broker
1
2
3
|
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log //通过日志查看是否启动成功
|
然后运行启动类,运行效果如下:
监控
RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,地址在:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console ,官方也给出了其支持的功能的中文文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md , 那么该如何安装?
Docker 安装
1、获取 Docker 镜像
1
|
docker pull styletang/rocketmq-console-ng
|
2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1
1
|
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
|
非 Docker 安装
我们 git clone 一份代码到本地:
1
2
3
|
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/
|
需要 jdk 1.7 以上。 执行以下命令:
或者
1
2
3
|
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar
|
注意:
1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 为阿里云的镜像
1
2
3
4
5
6
7
8
|
<mirrors>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
|
2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false
(或者您可以在 ops 页面中更改它)
3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)
错误解决方法
1、Docker 启动项目报错
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
将 Docker 启动命令改成如下以后:
1
|
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
|
报错信息改变了,新的报错信息如下:
1
2
3
|
ERROR op=global_exception_handler_print_error
org.apache.rocketmq.console.exception.ServiceException: This date have't data!
|
看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!
搞了很久这种方法没成功,暂时放弃!mmp
2、非 Docker 安装,只好把源码编译打包了。
1) 注意需要修改如下图中的配置:
1
2
3
4
|
rocketmq.config.namesrvAddr=localhost:9876 //注意替换你自己的ip
#如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的
rocketmq.config.isVIPChannel=
|
2) 执行以下命令:
1
|
mvn clean package -Dmaven.test.skip=true
|
编译成功:
可以看到已经打好了 jar 包:
运行:
1
|
java -jar rocketmq-console-ng-1.0.0.jar
|
成功,不报错了,开心????,访问 http://localhost:8080/
整个监控大概就是这些了。
然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:
总结
整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建。
另外:再看下RocketMQ中的NameServer:
先看NameServer在RocketMQ架构中的功能图:
NameServer,很多时候称为命名发现服务,其在RocketMQ中起着中转承接的作用,是一个无状态的服务,多个NameServer之间不通信。任何Producer,Consumer,Broker与所有NameServer通信,向NameServer请求或者发送数据。而且都是单向的,Producer和Consumer请求数据,Broker发送数据。正是因为这种单向的通信,RocketMQ水平扩容变得很容易。
下面我将按照Broker,Producer,Consumer的顺序来说明NameServer与他们之间的通信和发挥的作用。
NameServer与Broker间的通信和其发挥的作用:
- 在NameServer启动后,启动Broker。Broker在启动时会加载配置中的topic信息。加载当前Broker上的所有topic信息:topic名称;topic的Queue权限,可读,可写,可继承等等;Queue的个数。然后将这些数据传输到NameServer,美其名曰registerBroker。
- NameServer与Broker间维持着一个SocketChannel,长连接,Broker每隔30S向其配置的所有的NameServer执行registerBroker工作,这就是Broker和NameServer间的心跳。
- NameServer在接受到Broker传递的心跳信息时,若这次心跳是其第一次心跳,那么创建BrokerData,创建BrokerLiveInfo,保存其dataVersion和lastUpdateTimestamp;如果不是第一次,那么更新其lastUpdateTimestamp和dataVersion。
- 如果这个Broker是Master,且这次心跳信息是其第一次心跳,那么会创建当前Broker的QueueData。如果不是第一次心跳,但当前Broker的dataVersion与NameServer上保存的不一致(当Broker上新增加了topic时会更新dataVersion,dataVersion主要用当前时间戳表示),此时会用当前心跳的数据覆盖之前注册的数据。
- 如果当前Broker是Slave,那么将Master的brokerAddr放入心跳注册结果中,返回给Slave,这样Slave就能与Master间进行数据传输。
- NameServer维护着与其他组件的SocketChannel对象,针对所有组件(Broker和Client)的长连接注册了ChannelEventListener,监听此SocketChannel的连接事件。当某个SocketChannel出现异常或断开时(注意是长连接断开而不是心跳停止!),会循环遍历所有Broker的长连接,如果发现断开长连接是属于某个Broker的,那么清除此Broker的BrokerData和QueueData,如果不属于Broker,则什么都不做。这样当Client(Producer,Consumer)下次请求指定topic的TopicRouteData时,就不会包含此Broker的的数据了,也就是MessageQueue上不再包含此Broker上的Queue。
- 因为ChannelEventListener的连接事件处理里只对Broker做相应处理,没有涉及到Client。所以在Broker宕机或者增加时,不会实时通知Client,Client最晚需要30S时间才能感知到这种变化,因为Client更新TopicRouteData的间隔是30S。
- NameServer每隔30S对所有Broker的长连接进行扫描,当发现其lastUpdateTimestamp距离当前时间超过2m时,断开长连接,清空相应数据。
下面讲Producer和Consumer与NameServer间的通信:
- Producer在发送消息时,首先根据消息的topic查看自身是否含有此topic相应的MessageQueue( 正常情况下第一次发送时是没有的 )。当没有MessageQueue时,会从NameServer处请求指定topic的TopicRouteData,也就是List< BrokerData>和 List< QueueData>。然后根据QueueData里的writeQueueNums和BrokerData里的topic, HashMap< brokerId , broker address > 生成MessageQueue,MessageQueue的id从0开始,依次递增。注意如果BrokerData里不含有Master的Adress,那么其对应的QueueData将会被废弃,因为只有Master才能写入消息。
- 生成MessageQueue后,会将此topic存在生产者客户端,客户端每隔30S向NameServer请求此topic的TopicRouteData,生成MessageQueue,覆盖上次更新的值。
- Consumer在启动之前就需要指定订阅的topic,因此其在启动时就会想NameServer请求相应topic的TopicRouteData,同样的形式生成MessageQueue,但和Producer不同的是,Consumer可以从Slave处拉取消息,所以不会过滤Master宕机的Broker数据。
- Consumer客户端也每隔30S从NameServer处更新当前topic的数据,覆盖上次的值。
以上就是NameServer与各个组件间的通信机制及其发挥的作用。