欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketmq4.0集群部署

程序员文章站 2022-05-07 18:46:43
...

一.学习前概念理解
生产者(Producer):消息发送方,将业务系统中产生的消息发送到brokers(brokers可以理解为消息代理,生产者和消费者之间是通过brokers进行消息的通信),rocketmq提供了以下消息发送方式:同步、异步、单向。
生产者组(Producer Group):相同角色的生产者被归为同一组,比如通常情况下一个服务会部署多个实例,这多个实例就是一个组,生产者分组的作用只体现在消息回查的时候,即如果一个生产者组中的一个生产者实例发送一个事务消息到broker后挂掉了,那么broker会回查此实例所在组的其他实例,从而进行消息的提交或回滚操作。
消费者(Consumer):消息消费方,从brokers拉取消息。站在用户的角度,有以下两种消费者。
主动消费者(PullConsumer):从brokers拉取消息并消费。
被动消费者(PushConsumer):内部也是通过pull方式获取消息,只是进行了扩展和封装,并给用户预留了一个回调接口去实现,当消息到底的时候会执行用户自定义的回调接口。
消费者组(Consumer Group):和生产者组类似。其作用体现在实现消费者的负载均衡和容错,有了消费者组变得异常容易。需要注意的是:同一个消费者组的每个消费者实例订阅的主题必须相同。
主题(Topic):主题就是消息传递的类型。一个生产者实例可以发送消息到多个主题,多个生产者实例也可以发送消息到同一个主题。同样的,对于消费者端来说,一个消费者组可以订阅多个主题的消息,一个主题的消息也可以被多个消费者组订阅。
消息(Message):消息就像是你传递信息的信封。每个消息必须指定一个主题,就好比每个信封上都必须写明收件人。
消息队列(Message Queues):在主题内部,逻辑划分了多个子主题,每个子主题被称为消息队列。这个概念在实现最大并发数、故障切换等功能上有巨大的作用。
标签(Tag):标签,可以被认为是子主题。通常用于区分同一个主题下的不同作用或者说不同业务的消息。同时也是避免主题定义过多引起性能问题,通常情况下一个生产者组只向一个主题发送消息,其中不同业务的消息通过标签或者说子主题来区分。
消息代理(Broker):消息代理是RockerMQ中很重要的角色。它接收生产者发送的消息,进行消息存储,为消费者拉取消息服务。它还存储消息消耗相关的元数据,包括消费群体,消费进度偏移和主题/队列信息。
命名服务(Name Server):命名服务作为路由信息提供程序。生产者/消费者进行主题查找、消息代理查找、读取/写入消息都需要通过命名服务获取路由信息。

二.系统环境:
64bit OS, Linux/Unix/Mac (我这里是centos7)
64bit JDK 1.8+;
机器list:

192.168.226.132 namesrv,master-a

192.168.226.133 namesrv,master-b

192.168.226.134 namesrv,master-a-s

192.168.226.135 namesrv,master-b-s

Rocketmq下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.0.0-incubating/

注释:下载包分为二进制包和源码编译包,此文只适合二进制包

三.部署

132机器下载4.0的二进制包:

Wget http://mirror.bit.edu.cn/apache/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.zip

解压到 /opt/app/rocketmq40

目录结构:

├── benchmark

├── bin

├── conf

├── DISCLAIMER

├── lib

├── LICENSE

├── NOTICE

├── README.md

因为虚拟机资源有限,rocketmq默认启动jvm虚拟机参数过大,需要进行调整。

修改 bin/runserver.sh第39行为:

JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:PermSize=128m -XX:MaxPermSize=320m”

修改bin/runbroker.sh第39行为:

JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m”

启动namesrv服务:

nohup sh bin/mqnamesrv &

查看启动日志:

tail /root/logs/rocketmqlogs/namesrv.log

INFO main - The Name Server boot success. serializeType=JSON

有类似输出说明启动成功。

将132机器上的rocketmq40 目录复制到其他节点上:

scp -r /opt/app/rocketmq 192.168.226.133:/opt/app/
scp -r /opt/app/rocketmq 192.168.226.134:/opt/app/
scp -r /opt/app/rocketmq 192.168.226.135:/opt/app/
同样的方式启动133,134,135上的namesrv服务。

启动broker:

Rocketmq提供多种集群方式:

2m-2s-sync:两主两从同步双写(两个master,两个slave,数据同步双写到master和slave)
2m-2s-async:两主两从异步复制(两个master,两个slave,master数据通过异步复制到slave)
2m-noslave:两主(只有两个master,没有slave)
Sync:生产者发送的每一条消息都至少同步复制到一个slave后才返回告诉生产者成功,即“同步双写”。

   Async生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。

这里使用2m-2s-async模式

根据机器list角色编辑配置文件:

以master-a为例:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.226.132:9876;192.168.226.133:9876;192.168.226.134:9876;192.168.226.135:9876 ###四台机器的namesrv地址
listenPort=13201 ###broker-a master监听地址
storePathRootDir=/opt/app/rocketmq/store ###默认路径为~/store,设置下面两项必须设置本项,否则还是会使用默认路径
storePathCommitLog=/opt/app/rocketmq/store/commitlog ###commitLog存储路径
storePathConsumerQueue=/opt/app/rocketmq/store/consumerqueue ###消费队列存储路径

132:

nohup bash bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

启动日志在/root/logs/rocketmqlogs/broker.log

134:

nohup bash bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

133:

nohup bash bin/mqbroker -c conf/2m-2s-async/broker-b.properties &

135:

nohup bash bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &

查看集群状态:

bash bin/mqadmin clusterlist

报错:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed

处理:查看输出发现链接namesrv为null,申明变量NAMESRV_ADDR

export NAMESRV_ADDR=”192.168.226.132:9876;192.168.226.133:9876;192.168.226.134:9876;192.168.226.135:9876”
bash bin/mqadmin clusterlist

Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE

DefaultCluster broker-a 0 192.168.226.132:13201 V4_0_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 424904.50 0.3278
DefaultCluster broker-a 1 192.168.226.134:13402 V4_0_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 424904.50 0.0737
DefaultCluster broker-b 0 192.168.226.133:13301 V4_0_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 424904.50 0.2464
DefaultCluster broker-b 1 192.168.226.135:13502 V4_0_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 424904.50 0.0741
四.rocketmq-console可视化管理界面

下载

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/

编辑配置文件

vim src/main/resources/application.properties
server.contextPath=
server.port=8080 ###这里设置端口

spring.application.index=true

spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml

if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876

rocketmq.config.namesrvAddr=192.168.226.132:9876;192.168.226.133:9876;192.168.226.134:9876;192.168.226.135:9876 ###填写namesrv的地址

if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true

rocketmq.config.isVIPChannel=

rocketmq-console’s data path:dashboard/monitor

rocketmq.config.dataPath=/tmp/rocketmq-console/data

set it false if you don’t want use dashboard.default true

rocketmq.config.enableDashBoardCollect=true
执行编译命令:mvn clean package -Dmaven.test.skip=true ,耗时会比较长。

完成后会生成target/rocketmq-console-ng-1.0.0.jar 该jar包

启动:

nohup java -jar rocketmq-console-ng-1.0.0.jar &

该服务日志默认位置为 ~/logs/consolelogs/rocketmq-console.log

五.测试
测试我采用的是rocketmq自带的测试工具

1.同样需要设置好NAMESRV_ADDR环境变量

生成消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

生成消费者:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

此时可以登陆rocketmq-console web界面查看

http://192.168.226.132:8080/#/cluster

rocketmq4.0集群部署

上图可得知,运行一次生成消息命令共产生1000条信息,broker-a和broker-b分别生成500,两个slave同步自己主节点的500,运行产生消费者命令,将产生的1000条全部消费。

##注意:两台机器的朋友一定要把broker的目录改为不同位置!!!不然主从无法同步的!