欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ的安装使用以及踩过的坑

程序员文章站 2022-07-15 08:05:52
...

  好久没写博客了,由于最近刚工作需要用到RocketMQ,所以自己也简单搭建了一下RocketMQ,本文为最简单的搭建方法,只有一个namesrv和一个broker,本文只提供搭建方法、demo和踩过的坑,并不解释RocketMQ的原理等,至于原理,请同学们自己度娘或者点本文下面的连接。

1.所需软件:

   Cent OS

  alibaba-rocketmq-3.2.6.tar.gz,在进行安装前请确认安装上了jdk和maven并配置了环境变量

2. 安装方法:

1)解压到 /usr/local下

2)cd bin目录下

3)赋予权限 chmod +x *

4)修改hosts文件:vim /etc/hosts

添加信息:192.168.0.11 mqnameserver1

   192.168.0.11 broker-a

(前面地址为你自己虚拟机的地址,如果不知道的ifconfig一下)

5)配置文件模板生成

cd /bin/

mkdir ../conf/me-2m-2s-async/

cd /me-2m-2s-async

vim broker.p

 

添加如下内容:

namesrvAddr=mqnameserver1:9876

 

brokerIP1=192.168.0.11

 

brokerName=broker-a

 

brokerClusterName=TestCluster

 

brokerId=0

 

autoCreateTopicEnable=true

 

autoCreateSubscriptionGroup=true

 

rejectTransactionMessage=false

 

fetchNamesrvAddrByAddressServer=false

 

storePathRootDir=/root/store

 

storePathCommitLog=/root/store/commitlog

 

flushIntervalCommitLog=1000

 

flushCommitLogTimed=false

 

deleteWhen=04

 

fileReservedTime=72

 

maxTransferBytesOnMessageInMemory=262144

 

maxTransferCountOnMessageInMemory=32

 

maxTransferBytesOnMessageInDisk=65536

 

maxTransferCountOnMessageInDisk=8

 

accessMessageInMemoryMaxRatio=40

 

messageIndexEnable=true

 

messageIndexSafe=false

 

haMasterAddress=

 

brokerRole=ASYNC_MASTER

 

flushDiskType=ASYNC_FLUSH

 

cleanFileForciblyEnable=true

 

6) 启动namesrvbroker

    cd /bin

nohup sh mqnamesrv &

nohup sh mqbroker -c ../conf/me-2m-2s-async/broker.p

如果出现boot success则启动成功,

如果出现:

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0  

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0  

Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.  

Error: Could not find or load main class com.alibaba.rocketmq.namesrv.NamesrvStartup  

 

解决方法:

echo "ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile  

source ~/.bash_profile   

 

如果有内存不足错误出现,修改runbroker.sh  runserver.sh文件的最小堆最大堆等。

两个修改方法基本一致,展示一个:

vim runbroker.sh

 RocketMQ的安装使用以及踩过的坑

都改成256m

 

查询集群状态命令:

./mqadmin clusterlist -n 192.168.147.129:9876,可以看到

 RocketMQ的安装使用以及踩过的坑

下面为Demo

Producer.class

package com.ac.test;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class Producer {
	public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		DefaultMQProducer producer=new DefaultMQProducer("group");
	    producer.setNamesrvAddr("192.168.147.129:9876");
	    producer.start();
	    for(int i=0;i<10;i++){
	    	Message message=new Message("orders", ("order"+i).getBytes());
	    	SendResult result=producer.send(message);
	    	System.out.println(result);
	    	System.out.println(message+" send out!");
	    	Thread.sleep(100);
	    	
	    }		
		producer.shutdown();
	}

	

}

Consumer.class

package com.ac.test;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {
	/**
	 * @Desp: push模式
	 * @param args
	 */
	public static void main(String[] args) {
		DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("consumers");
		consumer.setNamesrvAddr("192.168.147.129:9876");
		try {
			//订阅消息
			consumer.subscribe("orders", null);
			//程序第一次启动为消息队列头取数据
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
					// TODO Auto-generated method stub
					Message message=list.get(0);
					System.out.println(new String(message.getBody()));
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
			
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
		
		
	}

}

启动顺序无所谓,但push模式最好先启动producer,可以看到结果如下:

 RocketMQ的安装使用以及踩过的坑

 

Demo中可能遇到的问题,

1. 可能遇到no topic的情况,可能由于部分jar包没有导入进去

建议把jar包都导入里面,当然工作中不可能这么干,为了省事我都导入进去了

 

2. 可能会遇到如下问题

com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [3003]ms, Topic: WaybillTopicBOSServer, BrokersSent: [broker-a, null, null]

如果遇到这类问题请把Linux防火墙关闭,另外看下是否缺少第5-6个步骤

参考链接:

1.https://www.bilibili.com/video/av11074519/

2.https://my.oschina.net/xcafe/blog/814135

3.https://github.com/alibaba/RocketMQ/issues/44

4.http://blog.csdn.net/u011579004/article/details/73822106