欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java RocketMQ快速入门基础知识

程序员文章站 2022-04-06 13:24:50
如何使用 1、引入 rocketmq-client org.apache.rock...

如何使用

1、引入 rocketmq-client

<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-client</artifactid>
<version>4.1.0-incubating</version>
</dependency>

2、编写producer

defaultmqproducer producer = new defaultmqproducer("producer_demo");
//指定nameserver地址
producer.setnamesrvaddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
/**
* producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();

for (int i = 0; i < 997892; i++) {
try {
//构建消息
message msg = new message("topictest" /* topic */,
"taga" /* tag */,
("测试rocketmq" + i).getbytes(remotinghelper.default_charset)
);
//发送同步消息
sendresult sendresult = producer.send(msg);
system.out.printf("%s%n", sendresult);
} catch (exception e) {
e.printstacktrace();
thread.sleep(1000);
}
}
producer.shutdown();

3、编写consumer

/**
* consumer group,非常重要的概念,后续会慢慢补充
*/
defaultmqpushconsumer consumer = new defaultmqpushconsumer("consumer_demo");
//指定nameserver地址,多个地址以 ; 隔开
consumer.setnamesrvaddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
/**
* 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
consumer.subscribe("topictest", "*");
consumer.registermessagelistener(new messagelistenerconcurrently() {
@override
public consumeconcurrentlystatus consumemessage(list<messageext> msgs,
consumeconcurrentlycontext context) {
try {
for(messageext msg:msgs){
string msgbody = new string(msg.getbody(), "utf-8");
system.out.println(" messagebody: "+ msgbody);//输出消息内容
}
} catch (exception e) {
e.printstacktrace();
return consumeconcurrentlystatus.reconsume_later; //稍后再试
}
return consumeconcurrentlystatus.consume_success; //消费成功
}
});
consumer.start();
system.out.printf("consumer started.%n");

4、说明

各位根据自己的环境,修改namesrvaddr的值,我的集群请参考:rocketmq集群部署配置。稍后通过rocketmq管控台就可以看到之前搭建的多master多slave模式,异步复制集群模式。

5、通过rocketmq管控台

rocketmq-console-ng获取方式为:rocketmq-console-ng,之后通过mavne进行编译获取jar,命令如下:

mvn clean package -dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\boot-inf\classes\application.properties文件,根据自己的namesrvaddr进行修改rocketmq.config.namesrvaddr的值。

直接启动:

java -jar rocketmq-console-ng-1.0.0.jar

java RocketMQ快速入门基础知识

管控台是基于springboot的,的确springboot非常方便和非常火了,所以有必要去学习下springboot了(其实还是spring系列,所以spring也必要深入学习下),稍后通过管控台进行观察运行。

6、运行观察

一个好的习惯是先运行consumer,之后在运行producer,之后通过rocketmq-console-ng管控台观察

java RocketMQ快速入门基础知识

运行完成之后,的确broker-a的数据加上broker-b的数据量就等于我们发送的数据量,而且slave的数量也master的数量也是一致的,效果如下:

java RocketMQ快速入门基础知识

查看发送这些数据,2台机器的磁盘情况如下:

java RocketMQ快速入门基础知识

java RocketMQ快速入门基础知识

到目前位置,关于rocketmq快速入门就结束了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。