欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMq 集群方式搭建 步骤教学包教包会

程序员文章站 2022-07-15 07:54:26
...

mq集群方式搭建

有段时间没写这些技术文章了, 今天抽空写一点,不然自己都快忘记了
这篇文章记录了rocketmq 集群方式搭建的过程, 也是自己半天的成果记录吧! 感兴趣的朋友点个赞在走呗!

好了,废话不多,下面开搞。

本文章参考https://blog.csdn.net/qq_35400008/article/details/82467562#comments 这个博客文章编写

准备工作

第一步:关闭要搭建的所有机器的防火墙
第二步:每台机器执行下如下步骤

[aaa@qq.com ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[aaa@qq.com~]# setenforce 0
[aaa@qq.com~]# getenforce

第三步:所有机器装好jdk, maven , zip , unzip , ssh 免密登录

配置crt连接: https://blog.csdn.net/cmqwan/article/details/61932792
安装maven参考老哥博客:  https://www.cnblogs.com/clicli/p/5866390.html
安装zip,unzip参考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch=
安装ssh参考: https://blog.csdn.net/m0_37590135/article/details/74275859
jdk自己百度哈, 很多参考博客的!

第四步: 如下命令是ssh机器之间copy用的命令

scp -r /home/administrator/test/ aaa@qq.com:/root/

RocketMq 集群方式搭建 步骤教学包教包会
第五步: 下载完成后, 解压

unzip  rocketmq-all-4.4.0-bin-release.zip 

第六步:进入解压后的文件夹rocketmq-bin4.4.0 , 在文件夹里面新建logs , data/store, data2/store 目录
RocketMq 集群方式搭建 步骤教学包教包会
第七步:安装顺序修改bin下面的几个启动文件, 因默认配置内存空间太大,本地启动会报错

1. vim runbroker.sh    对应地方更改为  -server -Xms512m -Xmx512m -Xmn256m
2. vim runserver.sh (同样的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m
3. vim tools.sh           -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m

第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改这四个文件
RocketMq 集群方式搭建 步骤教学包教包会

注: 这里说明下哈, 我是用了三台机器,所有配置了130, 131和132一样的,你们2台机器完全可以用,131和132配置一台就可以了哈,ip自行更改哈。

第九步: 130主机器修改如下配置文件, broker-a.properties broker-b-s.properties两个文件 内容分别如下
broker-a.properties

brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
brokerIP1=192.168.175.130
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

broker-b-s.properties

brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.130
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

第十步: 131, 132 机器只修改 broker-b.properties 和broker-a-s.properties 内容分别如下:
broker-b.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

broker-a-s.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=10921
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

第十一步: 启动


三台都执行:

nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 &

130机器执行: 
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 &

131, 132 机器执行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties  -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &

执行之后,jps结果,有两个brokerstartup就行了, 如果报错的化,看下自己建的logs文件夹日志
RocketMq 集群方式搭建 步骤教学包教包会

好了,到此rocketmq 基础配置就搭建起来了,下面在讲一讲实战代码


导入依赖包

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.4.0</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-all</artifactId>
			<version>3.5.9</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

mq消息发送方


import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 消息发送者
 * @author LELE
 *
 */
public class Producer {

	public static void main(String[] args) throws MQClientException, InterruptedException {

		// 声明并初始化一个producer
		// 需要一个producer group名字作为构造方法的参数,这里为producer1
		DefaultMQProducer producer = new DefaultMQProducer("producer1");
		producer.setVipChannelEnabled(false);
		// 设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
		// NameServer的地址必须有
		// producer.setClientIP("xxxx");
		// producer.setInstanceName("Producer");
		producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 调用start()方法启动一个producer实例
		producer.start();

		// 发送1条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
		try {
			for(int i=0;i<30000;i++) {
				// 封装消息
				Message msg = new Message("TopicTest", // topic
						"TagA", // tag
						("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
				);
				// 调用producer的send()方法发送消息
				// 这里调用的是同步的方式,所以会有返回结果
				SendResult sendResult = producer.send(msg);
				// 打印返回结果
				System.out.println(sendResult);
			}
			
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		// 发送完消息之后,调用shutdown()方法关闭producer
		System.out.println("send success");
		producer.shutdown();
	}

}

消息消费者


import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 消息接收者, 需要服务器启动mq服务
 * @author LELE
 *
 */
public class Consumer {

	public static void main(String[] args) throws MQClientException {

		// 声明并初始化一个consumer
		// 需要一个consumer group名字作为构造方法的参数,这里为consumer1
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
		// consumer.setVipChannelEnabled(false);
		// 同样也要设置NameServer地址
		consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 这里设置的是一个consumer的消费策略
		// CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
		// CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
		// CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
		consumer.subscribe("TopicTest", "*");

		// 设置一个Listener,主要进行消息的逻辑处理
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				MessageExt msg = msgs.get(0);

				if (msg.getTopic().equals("TopicTest")) {

					// 执行TopicTest1的消费逻辑

					System.out.println("TagA:" + new String(msg.getBody()));

				}

				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()
						+ "----------------------------------------------------------------------------------");
				// 返回消费状态
				// CONSUME_SUCCESS 消费成功
				// RECONSUME_LATER 消费失败,需要稍后重新消费
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// 调用start()方法启动consumer
		consumer.start();
		System.out.println("Consumer Started.");

	}
}

启动开始尽情玩耍吧,少年, 记得点赞哦!