欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

阿里 RocketMQ 安装与简介

程序员文章站 2024-01-05 15:52:52
原文:阿里 RocketMQ 安装与简介 一、简介 官方简介: l RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: l 能够保证严格的消息顺序 l 提供丰富的消息拉取模式 l 高效的订阅者水平扩展能力 l 实时的消息订阅机制 l 亿级消息堆积能力 二、网络架构 三、特性 1. n ......

 原文:阿里 rocketmq 安装与简介

一、简介

官方简介:

l  rocketmq是一款分布式、队列模型的消息中间件,具有以下特点:

l  能够保证严格的消息顺序

l  提供丰富的消息拉取模式

l  高效的订阅者水平扩展能力

l  实时的消息订阅机制

l  亿级消息堆积能力

二、网络架构

 阿里 RocketMQ 安装与简介

三、特性

1.      nameserver

相对来说,nameserver的稳定性非常高。原因有二:

1 、nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。

2 、nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

2.      broker

与nameserver关系

l  连接

     单个broker和所有nameserver保持长连接

l  心跳

     心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。

     心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。

l  断开

     时机:broker挂掉;心跳超时导致nameserver主动关闭连接

     动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者

负载均衡

l  一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。

l  如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。

l  topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。

可用性

   由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。

这里有两个关键点:

l  一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。

l  消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。

可靠性

l  所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高

l  同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠

l  异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

消息清理

l  扫描间隔

     默认10秒,由broker配置参数cleanresourceinterval决定

l  空间阈值

     物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%

l  清理时机

     默认每天凌晨4点,由broker配置参数deletewhen决定;或者磁盘空间达到阈值

l  文件保留时长

     默认72小时,由broker配置参数filereservedtime决定

读写性能

l  文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高

l  永远一个文件在写,其他文件在读

l  顺序写,随机读

l  利用linux的sendfile机制,将消息内容直接输出到sokect管道,避免系统调用

系统特性

l  大内存,内存越大性能越高,否则系统swap会成为性能瓶颈

l  io密集

l  cpu load高,使用率低,因为cpu占用后,大部分时间在io wait

l  磁盘可靠性要求高,为了兼顾安全和性能,采用raid10阵列

l  磁盘读取速度要求快,要求高转速大容量磁盘

3.      消费者

与nameserver关系

l  连接

     单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。

l  心跳

与nameserver没有心跳

l  轮询时间

默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由defaultmqpushconsumer的pollnameserverinteval参数决定,可手动配置。

与broker关系

l  连接

单个消费者和该消费者关联的所有broker保持长连接。

l  心跳

默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由defaultmqpushconsumer的heartbeatbrokerinterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

l  断开

时机:消费者挂掉;心跳超时导致broker主动关闭连接

动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

负载均衡

集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

消费机制

l  本地队列

        消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由defaultmqpushconsumer的pullthresholdforqueue属性控制,默认1000,可手动设置。

l  轮询间隔

     消息拉取线程每隔多久拉取一次?间隔时间由defaultmqpushconsumer的pullinterval属性控制,默认为0,可手动设置。

l  消息消费数量

     监听器每次接受本地队列的消息是多少条?这个参数由defaultmqpushconsumer的consumemessagebatchmaxsize属性控制,默认为1,可手动设置。

消费进度存储

     每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由defaultmqpushconsumer的persistconsumeroffsetinterval属性控制,默认为5秒,可手动设置。

如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者和这个broker有几个连接?

     一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务

4.      生产者

与nameserver关系

l  连接

     单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。

l  轮询时间

默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由defaultmqproducer的pollnameserverinteval参数决定,可手动配置。

l  心跳

与nameserver没有心跳

与broker关系

l  连接

单个生产者和该生产者关联的所有broker保持长连接。

l  心跳

默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由defaultmqproducer的heartbeatbrokerinterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。

l  连接断开

移除broker上的生产者信息

负载均衡

     生产者时间没有关系,每个生产者向队列轮流发送消息

四、broker集群配置方式及优缺点

1.      单个 master

   这种方式风险较大,一旦broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

2.      多 master 模式

   一个集群无 slave,全是 master,例如 2 个 master 或者 3 个 master

   优点:配置简单,单个master 宕机或重启维护对应用无影响,在磁盘配置为 raid10 时,即使机器宕机不可恢复情况下,由与 raid10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

   缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

   ###  先启动 nameserver,例如机器 ip 为:172.16.8.106:9876

1

nohup sh mqnamesrv &

   ###  在机器 a,启动第一个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-noslave/broker-a.properties &

   ###  在机器 b,启动第二个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-noslave/broker-b.properties &

3.      多 master 多 slave 模式,异步复制

   每个 master 配置一个 slave,有多对master-slave,ha 采用异步复制方式,主备有短暂消息延迟,毫秒级。

   优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 master 宕机后,消费者仍然可以从 slave 消费,此过程对应用透明。不需要人工干预。性能同多 master 模式几乎一样。

   缺点:master 宕机,磁盘损坏情况,会丢失少量消息。

   ###  先启动 nameserver,例如机器 ip 为:172.16.8.106:9876

1

nohup sh mqnamesrv &

   ###  在机器 a,启动第一个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-async/broker-a.properties &

   ###  在机器 b,启动第二个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-async/broker-b.properties &

   ###  在机器 c,启动第一个 slave

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-async/broker-a-s.properties &

   ###  在机器 d,启动第二个 slave

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-async/broker-b-s.properties &

4.      多 master 多 slave 模式,同步双写

   每个 master 配置一个 slave,有多对master-slave,ha 采用同步双写方式,主备都写成功,向应用返回成功。

   优点:数据与服务都无单点,master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

   缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 rt 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

   ###  先启动 nameserver,例如机器 ip 为:172.16.8.106:9876

1

nohup sh mqnamesrv &

   ###  在机器 a,启动第一个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-sync/broker-a.properties &

   ###  在机器 b,启动第二个 master

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-sync/broker-b.properties &

   ###  在机器 c,启动第一个 slave

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-sync/broker-a-s.properties &

   ###  在机器 d,启动第二个 slave

1

nohup sh mqbroker -n 172.16.8.106:9876 -c$rocketmq_home/conf/2m-2s-sync/broker-b-s.properties &

   以上 broker 与 slave 配对是通过指定相同的brokername 参数来配对,master 的 brokerid 必须是 0,slave的brokerid 必须是大与 0 的数。另外一个 master 下面可以挂载多个 slave,同一 master 下的多个 slave 通过指定不同的 brokerid 来区分。

五、安装 

1.      安装

下载rocketmq,在每个节点,解压到指定目录

1
2
alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz  -c /usr/local

解压后的文件夹:alibaba-rocketmq

进入bin目录

   注:rocketmq需要jdk1.7及以上版本

2.      启动nameserver

1
2
3
[root@m106 2m-2s-sync]# nohup sh mqnamesrv &
[2] 17938
[root@m106 2m-2s-sync]# nohup: ignoring input and appending output to `nohup.out'

查看nohup.out文件中:

the name server boot success.表示nameserver启动成功

jps查看nameserver进程

3.      启动brokerserver a, brokerserver b

在m106上启动master a

1
2
[root@m106 bin]# nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-a.properties &
[1] 17206

在m107上启动master b

1
2
[root@m107 bin]# nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-b.properties &
[1] 14488

jps查看服务启动情况

1
2
3
4
5
6
7
[root@m106 bin]# jps
12494 hregionserver
12240 kafka
16556 datanode
18499 namesrvstartup
13101 runjar
17210 brokerstartup

4.      创建topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@m106 bin]# sh mqadmin updatetopic
usage: mqadmin updatetopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
       -t <arg> [-u <arg>] [-w <arg>]
 -b,--brokeraddr <arg>       create topic to which broker
 -c,--clustername <arg>      create topic to which cluster
 -h,--help                   print help
 -n,--namesrvaddr <arg>      name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:r; 4:w; 6:rw]
 -r,--readqueuenums <arg>    set read queue nums
 -s,--hasunitsub <arg>       has unit sub (true|false
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false
 -w,--writequeuenums <arg>   set write queue nums

实例:

1
2
3
[root@m106 bin]# sh mqadmin updatetopic -n 172.16.8.106:9876 -c defaultcluster -t topictest1
create topic to 172.16.8.107:10911 success.
topicconfig [topicname=topictest1, readqueuenums=8, writequeuenums=8, perm=rw-, topicfiltertype=single_tag, topicsysflag=0, order=false]

5.      删除topic

1
2
3
[root@m106 bin]# sh mqadmin deletetopic -n 172.16.8.106:9876 -c defaultcluster -t topictest1
delete topic [topictest1] from cluster [defaultcluster] success.
delete topic [topictest1] from nameserver success.

6.      查看topic信息

1
2
3
4
5
[root@m106 bin]# sh mqadmin topiclist -n 172.16.8.106:9876
benchmarktest
topictest1
broker-a
defaultcluster

7.      查看topic统计信息

1
2
3
4
5
6
7
8
9
10
[root@m106 bin]# sh mqadmin topicstatus -n 172.16.8.106:9876 -t topictest1
#broker name            #qid  #min offset      #max offset             #last updated
broker-a                          0     0                     0                     
broker-a                          1     0                     0                      
broker-a                          2     0                     0                     
broker-a                          3     0                     0                     
broker-a                          4     0                     0                      
broker-a                          5     0                     0                     
broker-a                          6     0                     0                     
broker-a                          7     0                     0

8.      查看所有消费组group

1
[root@m106 bin]# sh mqadmin consumerprogress -n 172.16.8.106:9876

9.      查看指定消费组下的所有topic数据堆积情况

1
[root@m106 bin]# sh mqadmin consumerprogress -n 172.16.8.106:9876 -g consumergroupname

六、使用指南

1.      客户端寻址方式

l   在代码中指定nameserver地址

producer.setnamesrvaddr(“192.168.8.106:9876”);

consumer.setnamesrvaddr(“192.168.8.106:9876”);

l   java启动参数中指定nameserver地址

-drocketmq.namesrv.addr=192.168.8.106:9876

l   环境变量指定nameserver地址·

export namesrv_addr=192.168.8.106:9876

l   http静态服务器寻址

客户端启动后,会定时访问一个静态的http服务器,地址如下:

这个url的返回内容如下:

192.168.8.106:9876

客户端默认每隔2分钟访问一次这个http服务器,并更新本地的nameserver地址。url已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67   jmenv.taobao.net

2.      客户端的公共配置类:clientconfig

 

参数名

默认值

说明

namesrvaddr

 

nameserver地址列表,多个nameserver地址用分号隔开

clientip

本机ip

客户端本机ip地址,某些机器会发生无法识别客户端ip地址情况,需要应用在代码中强制指定

instancename

default

客户端实例名称,客户端创建的多个producer,consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等)

clientcallbackexecutorthreads

4

通信层异步回调线程数

pollnameserverinteval

30000

轮训name server 间隔时间,单位毫秒

heartbeatbrokerinterval

30000

向broker发送心跳间隔时间,单位毫秒

persistconsumeroffsetinterval

5000

持久化consumer消费进度间隔时间,单位毫秒

 

3.      producer配置

 

参数名

默认值

说明

producergroup

default_producer

producer组名,多个producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。

createtopickey

tbw102

在发送消息时,自动创建服务器不存在的topic,需要指定key

defaulttopicqueuenums

4

在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

sendmsgtimeout

10000

发送消息超时时间,单位毫秒

compressmsgbodyoverhowmuch

4096

消息body超过多大开始压缩(consumer收到消息会自动解压缩),单位字节

retryanotherbrokerwhennotstoreok

false

如果发送消息返回sendresult,但是sendstatus!=send_ok,是否重试发送

maxmessagesize

131072

客户端限制的消息大小,超过报错,同时服务端也会限制(默认128k)

transactionchecklistener

 

事物消息回查监听器,如果发送事务消息,必须设置

checkthreadpoolminsize

1

broker回查producer事务状态时,线程池大小

checkthreadpoolmaxsize

1

broker回查producer事务状态时,线程池大小

checkrequestholdmax

2000

broker回查producer事务状态时,producer本地缓冲请求队列大小

 

4.      pushconsumer配置

 

 

参数名

 

默认值

 

说明

consumergroup

default_consumer

consumer组名,多个consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组

messagemodel

clustering

消息模型,支持以下两种1.集群消费2.广播消费

consumefromwhere

consume_from_last_offset

consumer启动后,默认从什么位置开始消费

allocatemessagequeuestrategy

allocatemessagequeueaveragely

rebalance算法实现策略

subscription

{}

订阅关系

messagelistener

 

消息监听器

offsetstore

 

消费进度存

consumethreadmin

10

消费线程池数量

consumethreadmax

20

消费线程池数量

consumeconcurrentlymaxspan

2000

单队列并行消费允许的最大跨度

pullthresholdforqueue

1000

拉消息本地队列缓存消息最大数

pullinterval

0

拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒

consumemessagebatchmaxsize

1

批量消费,一次消费多少条消息

pullbatchsize

32

批量拉消息,一次最多拉多少条

 

 

5.      pullconsumer配置

 

 

参数名

 

默认值

 

说明

consumergroup

 

conusmer组名,多个consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组

brokersuspendmaxtimemillis

20000

长轮询,consumer拉消息请求在broker挂起最长时间,单位毫秒

consumerpulltimeoutmillis

10000

非长轮询,拉消息超时时间,单位毫秒

consumertimeoutmilliswhensuspend

30000

长轮询,consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒

messagemodel

broadcasting

消息模型,支持以下两种:1集群消费 2广播模式

messagequeuelistener

 

监听队列变化

offsetstore

 

消费进度存储

registertopics

 

注册的topic集合

allocatemessagequeuestrategy

 

rebalance算法实现策略

 

 

6.      broker配置参数

查看broker默认配置

sh mqbroker -m

 

 

参数名

 

默认值

 

说明

consumergroup

 

conusmer组名,多个consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组

listenport

10911

broker对外服务的监听端口

namesrvaddr

null

name server地址

brokerip1

本机ip

本机ip地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。

brokername

本机主机名

 

brokerclustername

defaultcluster

broker所属哪个集群

brokerid

0

brokerid,必须是大等于0的整数,0表示master,>0表示slave,一个master可以挂多个slave,master和slave通过brokername来配对

storepathcommitlog

$home/store/commitlog

commitlog存储路径

storepathconsumequeue

$home/store/consumequeue

消费队列存储路径

storepathindex

$home/store/index

消息索引存储队列

deletewhen

4

删除时间时间点,默认凌晨4点

filereservedtime

48

文件保留时间,默认48小时

maxtransferbytesonmessageinmemory

262144

单次pull消息(内存)传输的最大字节数

maxtransfercountonmessageinmemory

32

单次pull消息(内存)传输的最大条数

maxtransferbytesonmessageinmemory

65535

单次pull消息(磁盘)传输的最大字节数

maxtransfercountonmessageindisk

8

单次pull消息(磁盘)传输的最大条数

messageindexenable

true

是否开启消息索引功能

messageindexsafe

false

是否提供安全的消息索引机制,索引保证不丢

brokerrole

async_master

broker的角色

-async_master异步复制master

-sync_master同步双写master

-slave

flushdisktype

async_flush

刷盘方式

-async_flush异步刷盘

-sync_flush同步刷盘

cleanfileforciblyenable

true

磁盘满,且无过期文件情况下true表示强制删除文件,优先保证服务可用

false标记服务不可用,文件不删除