RocketMQ的安装使用以及踩过的坑
好久没写博客了,由于最近刚工作需要用到RocketMQ,所以自己也简单搭建了一下RocketMQ,本文为最简单的搭建方法,只有一个namesrv和一个broker,本文只提供搭建方法、demo和踩过的坑,并不解释RocketMQ的原理等,至于原理,请同学们自己度娘或者点本文下面的连接。
1.所需软件:
Cent OS
alibaba-rocketmq-3.2.6.tar.gz,在进行安装前请确认安装上了jdk和maven并配置了环境变量
2. 安装方法:
1)解压到 /usr/local下
2)cd bin目录下
3)赋予权限 chmod +x *
4)修改hosts文件:vim /etc/hosts
添加信息:192.168.0.11 mqnameserver1
192.168.0.11 broker-a
(前面地址为你自己虚拟机的地址,如果不知道的ifconfig一下)
5)配置文件模板生成
cd /bin/
mkdir ../conf/me-2m-2s-async/
cd /me-2m-2s-async
vim broker.p
添加如下内容:
namesrvAddr=mqnameserver1:9876
brokerIP1=192.168.0.11
brokerName=broker-a
brokerClusterName=TestCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/store
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
6) 启动namesrv和broker
cd /bin
nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/me-2m-2s-async/broker.p
如果出现boot success则启动成功,
如果出现:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Error: Could not find or load main class com.alibaba.rocketmq.namesrv.NamesrvStartup
解决方法:
echo "ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile
source ~/.bash_profile
如果有内存不足错误出现,修改runbroker.sh runserver.sh文件的最小堆最大堆等。
两个修改方法基本一致,展示一个:
vim runbroker.sh
都改成256m。
查询集群状态命令:
./mqadmin clusterlist -n 192.168.147.129:9876,可以看到
下面为Demo:
Producer.class
package com.ac.test;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
DefaultMQProducer producer=new DefaultMQProducer("group");
producer.setNamesrvAddr("192.168.147.129:9876");
producer.start();
for(int i=0;i<10;i++){
Message message=new Message("orders", ("order"+i).getBytes());
SendResult result=producer.send(message);
System.out.println(result);
System.out.println(message+" send out!");
Thread.sleep(100);
}
producer.shutdown();
}
}
Consumer.class
package com.ac.test;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
/**
* @Desp: push模式
* @param args
*/
public static void main(String[] args) {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("consumers");
consumer.setNamesrvAddr("192.168.147.129:9876");
try {
//订阅消息
consumer.subscribe("orders", null);
//程序第一次启动为消息队列头取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// TODO Auto-generated method stub
Message message=list.get(0);
System.out.println(new String(message.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
启动顺序无所谓,但push模式最好先启动producer,可以看到结果如下:
在Demo中可能遇到的问题,
1. 可能遇到no topic的情况,可能由于部分jar包没有导入进去
建议把jar包都导入里面,当然工作中不可能这么干,为了省事我都导入进去了
2. 可能会遇到如下问题
com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [3003]ms, Topic: WaybillTopicBOSServer, BrokersSent: [broker-a, null, null]
如果遇到这类问题请把Linux防火墙关闭,另外看下是否缺少第5-6个步骤
参考链接:
1.https://www.bilibili.com/video/av11074519/
2.https://my.oschina.net/xcafe/blog/814135
3.https://github.com/alibaba/RocketMQ/issues/44
4.http://blog.csdn.net/u011579004/article/details/73822106
下一篇: 记录一次Flink作业异常的排查过程
推荐阅读
-
解决Django transaction进行事务管理踩过的坑
-
详解安装mitmproxy以及遇到的坑和简单用法
-
firebug如何使用以及firebug安装的图文步骤
-
微信公众号,那些踩过的坑
-
AndroidStudio修改默认C盘配置文件夹(.android.gradle.AndroidStudio)以及修改后避免踩的坑
-
微信小程序订阅消息,我踩过的坑都在这里了!
-
Laravel Intervention/image图片处理扩展包的安装、使用与可能遇到的坑详解
-
Element-UI踩坑之Pagination组件的使用
-
8 种经常被忽视的 SQL 错误用法,你有没有踩过坑?
-
助讯通是什么?助讯通的安装使用教程以及不能登录的解决方法