RocketMQ(三)——HelloWorld
声明:
目录:
1.代码示例
2.代码阐释
3.运行效果
4.内容补充
讲个9·3阅兵时程序员间流传的笑话:
同学们,现在向我们走来的是程序员方阵!他们穿着拖鞋,披着毛巾,左手拿着键盘,右手举着鼠标,腋下夹着USB转换器。他们因睡眠不足而显得精神不振,喊着微弱的口号走过主席台,主席问候:程序员们辛苦了!程序员方队异口同声地答道:Hello World!
——研究一项技术,如果不提及“Hello World”,那指定是外行。
上篇博客,搭了一个最简单的集群——双主群集,这篇博客就利用这个环境,写一个简单的生产者、消费者,来快速体验一下RocketMQ的HelloWorld。
代码示例
Maven配置
引一下jar包,这里还是用3.2.6这一比较经典的版本:pom.xml
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
-
生产者
写一个简单的Producer类,来发送消息:
/**
* Producer,发送消息
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group_name");
producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
producer.start();
for (int i = 0; i < 100; i++) {
try {
Message msg = new Message("TopicTest", // topic
"TagA", // tag
("HelloWorld - RocketMQ" + i).getBytes() // body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
生产者
-
消费者
写一个简单的Consumer类,来接收消息:
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg: msgs) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String msgBody = new String(msg.getBody(),"utf-8");
System.out.println("收到消息--" + " topic:" + topic + " ,tags:" + tags + " ,msg:" +msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
消费者
代码阐释
根据上面的生产者和消费者,说明几点内容:
-
GroupName:
无论生产者、消费者都必须给出GroupName,而且具有唯一性! -
Topic、Tag:
生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤。 -
NameServer:
生产者、消费者需要设置NameServer地址。
消费方式:这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。
运行效果
光说不练嘴把式,来看一下运行效果:
- 生产者
生产者运行结果
仔细看看生产者结果输出,就会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!
- 消费者
消费者运行结果
这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。
- 管控台
消费前
会发现消息分布在2个broker上。
消费后
内容补充
-
启动顺序
务必保证先启动消费者进行Topic订阅,然后在启动生产者进行生产
(否则极有可能导致消息的重复消费,重复消费,重复消费!重要的事情说三遍!)。而且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,可以考虑Kafka) -
持久化
在ActiveMQ中,生产消息的时候会提供是否持久化的选择,但是对于RocketMQ而言,消息是一定会被持久化的! -
宕机处理
在多Master模式中,如果某个Master进程挂了,显然这台broker将不可用,上面的消息也将无法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,因此在实际开发中,我们一般提供一个监听程序,用于监控Master的状态。 -
单批次消息消费数量
上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List到底是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即使设置了批量的条数,但是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的情况下才有可能。而且Push Consumer的最佳实践方式就是一条条的消费,如果需要batch,可以使用Pull Consumer。来做个测试:
1) 改一下消费者代码单批次消息消费数量测试代码
2) 运行效果–先启动消费者,再启动生产者先启消费者后启生产者运行效果
3) 运行效果–先启动生产者(这样消息会有挤压),再启动消费者先启生产者后启消费者运行效果
上一篇: php怎么修改图片
下一篇: 43. Multiply Strings