RocketMQ
介绍
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。
具有以下特性:
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
- 支持拉(pull)和推(push)两种消息模式
- 单一队列百万消息的堆积能力
- 支持多种消息协议,如 JMS、MQTT 等
- 分布式高可用的部署架构,满足至少一次消息传递语义
- 提供 docker 镜像用于隔离测试和云集群部署
- 提供配置、指标和监控等功能丰富的 Dashboard
专业术语
Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message
Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。
Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server
Name Server 为 producer 和 consumer 提供路由信息。
集群部署模式
- 单 master 模式
也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用,适合个人学习使用。 - 多 master 模式
多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高
缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。 - 多 master 多 slave 异步复制模式
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master
节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。
优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。 - 多 master 多 slave 同步双写模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低
RocketMQ发送接收消息
1、下载压缩包并构建
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install –U
漫长的等待。。。
cd distribution/target/apache-rocketmq
2、启动name server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
[aaa@qq.com apache-rocketmq]# tail -f ~/logs/rocketmqlogs/namesrv.log
2018-05-30 11:34:04 INFO main - tls.client.keyPassword = null
2018-05-30 11:34:04 INFO main - tls.client.certPath = null
2018-05-30 11:34:04 INFO main - tls.client.authServer = false
2018-05-30 11:34:04 INFO main - tls.client.trustCertPath = null
2018-05-30 11:34:04 INFO main - Using OpenSSL provider
2018-05-30 11:34:04 INFO main - SSLContext created for server
2018-05-30 11:34:04 INFO NettyEventExecutor - NettyEventExecutor service started
2018-05-30 11:34:04 INFO main - The Name Server boot success. serializeType=JSON
2018-05-30 11:35:04 INFO NSScheduledThread1 - --------------------------------------------------------
2018-05-30 11:35:04 INFO NSScheduledThread1 - configTable SIZE: 0
3、启动broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
[aaa@qq.com apache-rocketmq]# tail -f ~/logs/rocketmqlogs/broker.log
2018-05-30 11:53:14 INFO main - load /root/store/config/consumerFilter.json OK
2018-05-30 11:53:14 INFO main - load /root/store/config/delayOffset.json OK
2018-05-30 11:53:14 INFO main - Set user specified name server address: localhost:9876
2018-05-30 11:53:14 INFO PullRequestHoldService - PullRequestHoldService service started
2018-05-30 11:53:14 INFO main - register broker to name server localhost:9876 OK
2018-05-30 11:53:14 INFO main - The broker[localhost.localdomain, 10.86.6.126:10911] boot success. serializeType=JSON and name server is localhost:9876
2018-05-30 11:53:24 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-30 11:53:24 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-05-30 11:53:24 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
2018-05-30 11:53:54 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
2018-05-30 11:54:24 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-30 11:54:24 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-05-30 11:54:24 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
2018-05-30 11:54:54 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
2018-05-30 11:55:24 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-30 11:55:24 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-05-30 11:55:24 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
4、模拟发送/接收消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
5、关闭服务
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
IDEA中模拟收发消息
1、添加Maven依赖
<!-- alibaba rocketmq client -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.3</version>
</dependency>
2、生产者
package com.cqh.Test_alibaba_rocketmq;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import java.util.Date;
/**
* Created by yl1794 on 2018/5/30.
*/
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
// 多个地址之间用;分隔
producer.setNamesrvAddr("10.86.6.126:9876");
producer.setInstanceName("rmq-instance");
producer.start();
try {
// TopicA-test TagA
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,TopicA-test,TagA " + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// TopicA-test TagB
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagB",// tag
(new Date() + "Hello RocketMQ ,TopicA-test,TagB " + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// TopicC-test TagC
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicC-test",// topic
"TagC",// tag
(new Date() + "Hello RocketMQ ,TopicC-test,TagC " + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
3、消费者
package com.cqh.Test_alibaba_rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Created by yl1794 on 2018/5/30.
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("10.86.6.126:9876");
consumer.setInstanceName("rmq-instance");
// 设置的是一个consumer的消费策略
// CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
// CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
// CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicA-test", "TagA || TagB");
consumer.subscribe("TopicC-test","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
if(msg.getTopic().equals("TopicA-test")){
if(msg.getTags().equals("TagA")){
System.out.println("TopicA: " + "TagA: " + new String(msg.getBody()));
}else if(msg.getTags().equals("TagB")){
System.out.println("TopicA: " + "TagB: " + new String(msg.getBody()));
}
}else if (msg.getTopic().equals("TopicC-test")) {
System.out.println("TopicC: " + new String(msg.getBody()));
}
}
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
官网地址:http://rocketmq.apache.org/docs/quick-start/