欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ原理及集群搭建步骤

程序员文章站 2022-03-23 12:54:01
...

为什么使用RocketMQ?

      RocketMQ是一款分布式队列模型消息中间件,是由阿里巴巴团队设计的,具有以下特点:

1、亿级消息堆积能力,消息堆积后,写入低延迟。

2、支持重试机制

3、支持持久化机制

4、支持Topic与Queue两种模式

5、能够保证严格的消息顺序、事务消息

6、强调集群无单点,可扩展

7、丰富的消息拉取模式

8、历经多次天猫双十一海量消息考验

9、RocketMQ是纯java编写,基于通信框架Netty

10、支持上万个队列

plus:    activemq本身不支持集群,若activemq需要集群需要配合zookeeper使用,消息堆积能力较弱。

 

RocketMQ包含的组件

RocketMQ原理及集群搭建步骤

NameServer:单点,供Producer和Consumer获取Broker地址,类似于注册中心

Producer:产生并发送消息

Consumer:接受并消费消息

Broker:消息暂存,消息转发

 

Name Server

Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。

Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。

对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。

Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。

如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。

 

Broker

Broker是处理消息存储,转发等处理的服务器。

Broker以group分开,每个group只允许一个master,若干个slave。

只有master才能进行写入操作,slave不允许。

slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。

客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。

Broker向所有的NameServer结点建立长连接,注册Topic信息。

 

RocketMQ环境安装

需要安装jdk环境,且必须是64bit,关闭防火墙

1、添加Host文件

vi /etc/hosts

10.211.55.24 rocketmq-nameserver1
10.211.55.24 rocketmq-master1
10.211.55.26 rocketmq-nameserver2
10.211.55.26 rocketmq-master2

service network restart

如果是克隆虚拟机的话,可能会存在MAC地址冲突

注意: Error:No suitable device found: no device found for connection "System eth0"

解决办法:

(1)ifconfig -a 查看物理 MAC HWADDR 的值

(2)vim 编辑文件 /etc/sysconfig/network-scripts/ifcfg-eth0中修改ifconfig中查出的MAC HWADDR值;

 

2、上传安装包

链接: https://pan.baidu.com/s/1pmxLLa-Nqt1Z-fKpijHQYg 密码: mu3x

上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/local

tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local

mv alibaba-rocketmq alibaba-rocketmq-3.2.6

ln -s alibaba-rocketmq-3.2.6 rocketmq

 

3、创建存储路径【两台机器】

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

 

4、RocketMQ配置文件【两台机器】

vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties

vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

broker-a.properties将配置brokerName=broker-a|broker-b修改为broker-a

broker-b.properties将配置brokerName=broker-a|broker-b修改为broker-b

 

5、修改日志配置文件【两台机器】

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

 

6、修改启动NameServer【两台机器】

vim /usr/local/rocketmq/bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"

vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"

 

7、启动NameServer【两台机器】

cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &

输入回车,若不报任何错误则启动成功,输入jps,能看到2489 NamesrvStartup

 

8、启动

启动BrokerServer A

cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

输入jps  能看到brokerstartup
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

启动BrokerServer B

cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

如果nameserver启动不起来,则可以查看日志cat nohup.out

 

9、RocketMQ Console

将rocketmq-web-console 部署到webapps目录中。

/usr/local/apache-tomcat-7.0.65/webapps/rocketmq-web-console/WEB-INF/classes/

修改config.properties

rocketmq.namesrv.addr= 10.211.55.24:9876;=10.211.55.26:9876

重启tomcat

 

10.进入后台管理系统查看

http://10.211.55.24:8080/rocketmq-web-console/

如果看到broker-a   broker-b则表示成功

 

 

RocketMQ使用

1、修改pom.xml

<dependencies>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>3.0.10</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-all</artifactId>
			<version>3.0.10</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
</dependencies>

 

2、生产者

public class Producer {

	public static void main(String[] args) throws MQClientException {
                //1、设置分组
		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
                //2、服务器集群的ip地址及端口号
		producer.setNamesrvAddr("10.211.55.24:9876;10.211.55.24:9876");
                //3、设置接口名称
		producer.setInstanceName("producer");
                //4、启动
		producer.start();
		try {
			for (int i = 0; i < 10; i++) {
				Thread.sleep(1000); // 每秒发送一次MQ
                //new Message(String topic,String tags,byte[] body)
				Message msg = new Message("my-topic", // topic 主题名称
						"TagA", // tag 临时值
						("mytopic-"+i).getBytes()// body 内容
				);
                                //投递给broker
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult.toString());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		producer.shutdown();
	}

}

 

3、消费者

public class Consumer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
		consumer.setNamesrvAddr("10.211.55.24:9876;10.211.55.24:9876");
		consumer.setInstanceName("consumer");
		consumer.subscribe("my-topic", "TagA");
                //观察者设计模式,监听
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				for (MessageExt msg : msgs) {
					System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
				}
                                //返回值表示消费状态   1.消费成功   2.消费失败
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
	}
}

因为是集群环境,在有网络延迟情况下,打印发现msg.getMsgId()有相同值,但相同的MsgId,brokerName不同,因为均摊。

所以在avtiveMQ中消息id可以作用全局唯一id,而在rocketmq里面却不能,它的消息id唯一性只对一个服务器而言是唯一的,在集群环境下,就可能不唯一了。

 

 

RocketMQ重试机制

MQ 消费者的消费者抛出异常或消费者没有及时消费时,即没有将消费结果通知生产者,可以通过设置返回状态达到消息重试的结果。如果try catch了异常,则不重试,除非在catch内书写  return  ConsumerConcurrentlyStatus.RECONSUME_LATER

MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

 

解决RocketMQ幂等性问题(重复消费问题)

由于activemq重试时,每次重试的消息id一致,而rocketMQ重试时,每次重试的消息不一致,所以不能使用消息id来判断是否重复消费。

解决办法:

1、使用自定义全局ID判断幂等,例如流水ID、订单号,也就是说每次重试发送消息,需要id一致。

2、使用msg.setKeys进行区分

 

解决思路:

使用全局唯一id,即每次发起重试请求,所携带的id必须一致。判断思路:当第一次消费结束后把该消息所携带的全局id放入redis中,那么假设发起第二次重试时,判断该消息的id与redis中是否相同,如果相同则视为重复消费,则调用return ConsumeConcurrentlyStatus.CONSUME_SUCCESS,无需重试。

下面我用map集合模拟redis,具体代码演示:

生产者设置唯一id   msg.setKeys()

public class Producer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
		producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		producer.setInstanceName("producer");
		producer.start();
		try {
			for (int i = 0; i < 1; i++) {
				Thread.sleep(1000); // 每秒发送一次MQ
				Message msg = new Message("itmayiedu-topic", // topic 主题名称
						"TagA", // tag 临时值
						("myTopic1" + i).getBytes()// body 内容
				);
                //setKeys(value)只要是全局唯一id即可,可以使用uuid,可以使用业务唯一id
				msg.setKeys(System.currentTimeMillis() + "");
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult.toString());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		producer.shutdown();
	}

}

 

 

消费者判断全局id是否已在redis存在,已存在则重复消费。

注意:消息可以重复发送,但是业务不能重复执行,所以这个判断redis中是否存在的代码需要放在代码的第一行,如果存在则直接返回给生产者表示成功,无需重试。

	//模拟redis
    static private Map<String, String> logMap = new HashMap<>();

	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

		consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
		consumer.setInstanceName("consumer");
		consumer.subscribe("myTopic1", "TagA");

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				String key = null;
				String msgId = null;
				try {
					for (MessageExt msg : msgs) {
						key = msg.getKeys();
                                   //判断放在第一行,如果id已存在redis中,告诉生产者成功。
						if (logMap.containsKey(key)) {
							// 无需继续重试。
							System.out.println("key:"+key+",无需重试...");
							return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
						}
						msgId = msg.getMsgId();
						System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
						int i = 1 / 0;
					}

				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				} finally {
                        //在finally块把全局唯一id放入redis中,用于重复消费场景判断
					logMap.put(key, msgId);
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
	}

 

 

 

 

 

 

 

 

 

相关标签: rocketmq