【RocketMQ高可用】RocketMQ 双主双从集群搭建
目录
一、集群架构
描述 | |
NameServer 集群 | NameServer 是一个无状态的节点,可集群部署,节点都是各自独立的,无任何信息同步 |
Broker 集群 |
① Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但一个 Slave 只能对应一 个Master ② Master 与 Slave 的对应关系通过制定相同的 BrokerName,不同的 BrokerID 来定义,id 为 0 表示 Master, 非 0 表示 Slave ③ 可以部署多个 Master 实现 Broker 集群,即多组 Master - Slaves 的 Broker 节点 ④ Master 通常用于写入数据,Slaves 用于读取数据 ⑤ 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer |
Producer 集群 |
① Producer 为消息的生产者,都是各自独立的无状态的节点,可以认为只要向 mq 中推送消息的节点都算作 Producer 节点。 ② Producer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接。且定时向 Master 发送心跳。 |
Customer 集群 |
① Customer 为消息的消费者,都是各自独立的无状态的节点,可以认为只要向 mq 中获取消息的节点都算作 Customer 节点。 ② Customer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接。且定时向 Master、Slave 发送心跳。 ③ Customer 节点既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定 |
二、四种集群模式
2.1 单 Master 模式
部署简单,但风险较大,一旦 Broker 重启或者宕机时,会导致服务不可用,不建议在线上环境使用,可以在本地测试
2.2 多 Master 模式
Broker 集群中无 Slave,全是 Master
优点:
配置简单,且单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
缺点:
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅和消费,消费实时性会受到影响
2.3 多 Master 多 Slave 模式(异步)
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采取异步复制方式,主备有短暂消息延迟(毫秒级)。即提供者将消息发送到 Master 后,Master 即刻返回结果给提供者。同时异步复制到 Slave 中
优点:
即使磁盘损坏,丢失的消息非常少,且消息的实时性不会受到影响。且 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样
缺点:
Master 宕机,磁盘损坏的情况下会丢失少量消息
2.4 多 Master 多 Slave 模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。即提供者将消息发送到 Master后,Master 要将这条消息同步到所有的 Slave 上,同步全部完成后,才会返回结果给提供者。
优点:
数据与服务都无单点故障,在 Master 宕机的情况下消息无延迟,服务可用性与数据可用性都非常高
消息安全高,高可用性
缺点:
性能比异步复制模式略低(约低10%左右),发送单个消息的RT会略高,且在主节点宕机后,备机不能自动切换为主机,需要人为干涉
三、双主双从集群搭建
3.1 总体架构图
双主双从集群(2m-2s)同步双写方式,即使用两组 Master-Slave 形成集群
3.2 工作流程
1)启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
2)Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有
Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
3)收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
4)Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪
些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker
建立连接通道,开始消费消息。
3.3 双主双从集群搭建流程
3.3.1 防火墙开放端口
在实际工作中,应只对外暴露需要被外部访问的端口。
- nameserver: 默认使用 9876 端口
- master: 默认使用 10911 端口
- slave: 默认使用 11011 端口
开放命令:
# 开放name server默认端口
firewall-cmd --zone=public --add-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --zone=public --add-port=11011/tcp --permanent
# 重启防火墙
firewall-cmd --reload
# 查看已开放的端口
firewall-cmd --list-ports
3.3.2 配置环境变量
进入配置文件
vim /etc/profile
在 profile 文件末尾增加
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PAT
让配置立刻生效
source /etc/profile
3.3.3 创建消息存储路径
mq 获取到消息后,broker 会默认将消息持久化,持久化目录默认为 /home,故可以修改消息存储路径
# 创建消息存储文件夹
cd /usr/local/rocketmq
mkdir store
mkdir ./store/commitlog
mkdir ./store/consumequeue
mkdir ./store/index
创建slave文件夹,避免和master共用一个存储文件夹,否则会报 Lock failed,MQ already started
# 创建 slave 存储的文件夹
cd /usr/local/rocketmq
mkdir s_store
mkdir ./s_store/commitlog
mkdir ./s_store/consumequeue
mkdir ./s_store/index
3.3.4 配置 broker 配置文件
rocketmq 已经提供了一些集群模式的配置文件模板
cd /usr/local/rocketmq/conf
修改集群配置:
序号 | IP | 角色 | 架构模式 |
1 | 192.168.234.135 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.234.139 | nameserver、brokerserver | Master2、Slave1 |
1)Master1 配置(192.168.234.135)
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
修改配置文件内容:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.168.234.135
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2)Slave2 配置(192.168.234.135)
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
修改配置文件内容:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.168.234.135
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/s_store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/s_store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/s_store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/s_store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/s_store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/s_store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
3)Master2 配置(192.168.234.139)
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
修改配置文件内容:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.168.234.139
#nameServer地址,分号分割
namesrvAddr=192.168.234.139:9876;192.168.234.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
4)Slave1 配置(192.168.234.139)
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
修改配置文件内容:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.168.234.139
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/s_store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/s_store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/s_store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/s_store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/s_store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/s_store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
3.3.5 开放对应端口
根据自己在 Master 文件中设定的 ListenPort 值,计算需要开放的端口:
1) Master1
在 Master1 中定义的 listenPort 为 10911,故需要暴露:
vip通道端口:10911 - 2 = 10909
HA 高可用端口:10911 + 1 = 10912
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10912/tcp --permanent
firewall-cmd --reload
2) Master2
在 Master2 中定义的 listenPort 为 11011,故需要暴露:
vip通道端口:11011 - 2 = 11009
HA 高可用端口:11011 + 1 = 11012
firewall-cmd --zone=public --add-port=11009/tcp --permanent
firewall-cmd --zone=public --add-port=11012/tcp --permanent
firewall-cmd --reload
3.3.6 创建脚本文件
nameserver 脚本:
# 创建启动文件
vim name_start.sh
# 添加内容
nohup sh ./bin/mqnamesrv &
echo "nameserver 启动成功 .."
tail -f ~/logs/rocketmqlogs/namesrv.log
# 创建关闭文件
vim name_stop.sh
# 添加内容
sh bin/mqshutdown namesrv
echo "nameserver 关闭成功..."
echo "再次搜索结果为:"
sleep 3
jps
在135服务器上:
# 编写master1启动脚本
vim b_master1_start.sh
# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
# 编写slave2启动脚本
vim b_slave2_start.sh
# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
在139服务器上:
# 编写master2启动脚本
vim b_master2_start.sh
# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
# 编写slave1启动脚本
vim b_slave1_start.sh
# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
四、集群监控平台
4.1 概述
RocketMQ
有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console
,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console
进行编译打包运行。
4.2 编译和打包
下载项目:
git clone https://github.com/apache/rocketmq-externals
修改配置文件:
打包:
用git打包界面,输入下面的命令
mvn clean package -Dmaven.test.skip=true
将打包后的jar包放到服务器上,启动控制台:
java -jar rocketmq-console-ng-1.0.0.jar
如果服务器的防火墙没有开放 8080、11099 端口,则需要打开:
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --zone=public --add-port=11009/tcp --permanent
firewall-cmd --reload
启动成功后,在浏览器访问 localhost:8080 进入控制台界面即可