RocketMQ-后台监控和HelloWorld
程序员文章站
2022-07-15 08:04:22
...
使用RocketMQ命令来查看集群状态,查看topic信息时是比较麻烦的,而且非常的不直观,功能不全面。这个时候,我们可以使用一些web项目来管理rocketmq。
因为我使用的RocketMQ版本是3.2.6,也是阿里将RocketMQ贡献给apache开源社区前的版本。所以我使用的是一个老版本的工具。如果你的是贡献之后的版本,可以参考下面的网址来搭建监控平台。
开始搭建:
1、首先到这个百度网盘连接去下载压缩包,解压后import到MyEclipse等等ide打包成war包。
https://pan.baidu.com/s/1mhM0cDM
2、然后将war包放到linux下安装的tomcat的webapps****意:记得安装好JDK和Tomcat)
3、修改Config.properties文件。第一,我们可以等Tomcat启动时将war解压后再修改,第二,我们直接自己解压修改就好了。
4、当然是自己解压修改了。
创建一个文件夹来存放解压后的项目
# mkdir /usr/locla/apache-tomcat-9.0.10/webapps/rocketmq-console
解压war包
# unzip xxx.war -d rocketmq-console/
修改Config.properties
# vim rocket-console/WEB-INF/classes/config.properties
5、修改rockemq.namesrv.addr,记得带上端口号9876:
6、启动tomcat
# /usr/local/apache-tomcat-9.0.10/bin/startup.sh
7、如果我们已经开启了RocketMQ的集群,就能看到类似下面的信息了:
8、上个HelloWorld的代码测试一下RocketMQ:
Producer:
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
//绑定nameserver的ip地址
producer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876");
//启动
producer.start();
System.out.println("producer start....");
for(int i=1;i<=100;i++){
Message msg = new Message();
msg.setTopic("TopicQuickStart"); //消息主题
msg.setTags("TagA"); //消息主题标签,一个主题能有多个标签
msg.setBody(("Hello world"+i).getBytes()); //消息主体
SendResult sendResult = producer.send(msg);
System.out.println(sendResult); //sendResult包含messageId....等信息
}
//最后记得释放producer
producer.shutdown();
}
}
Consumer:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 第一个参数是主题名称
* 第二个参数是主题标签表达式:*表示所有。如果是多个可以这么写:tagA||tagB
*/
consumer.subscribe("TopicQuickStart", "*");
/**
* 添加监听器来消费消息(批量消费)
* consumer可以设置一次消费多少条
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : messages){
String topic = msg.getTopic();
String tags = msg.getTags();
String body = new String(msg.getBody(),"utf-8");
System.out.println("收到的消息:"+topic+" "+tags+" "+body);
}
} catch (Exception e) {
e.printStackTrace();
//失败的话稍后再给Consumer发送消息
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//返回标识:成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); //最后记得启动
System.out.println("Consumer start.....");
}
}
9、执行Producer:
可以发现,数据不定向地往两个Master里头放。做到了负载均衡了。
后台也能看到数据了。
我们还可以根据返回数据的msgid到后台查看消息的详细数据。
10、执行Consumer:
控制台也打印了被消费的消息了
再看看后台: