RocketMQ 发送消息的基本案例
RocketMQ 消息发送的基本样例
“一发、一存、一消费” 是消息中间件的本质,本文简单的记录来RocketMQ消息的基本样例,包含消息的发送(同步消息/异步消息/单向消息)、消息消费(负载均衡模式/广播模式)、顺序消息、延时消息、批量消息以及事务消息。
首先在测试项目中引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
一、基本样例
1、消息的发送
消息发送可以拆分为如下几个基本步骤:
1. 创建消息生产者producer,并制定生产者组名
2. 指定nameserver地址
3. 启动producter
4. 创建消息对象,指定topic、tag和消息体
5. 发送消息
6. 关闭生产者
a)发送同步消息
在重要的通知消息、SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。
public class BaseProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("base-group");
//2.指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.启动生产者
producer.start();
//每隔一秒发送1条消息,循环发送100条
for (int i = 0; i < 100; i++) {
String msgBody = "This is a msg to MQ" + i;
System.out.println("发送消息:"+msgBody);
//4.创建消息对象 指定主题、标签、消息体
Message message = new Message("base-topic", "base-tag",
msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送顺序消息
SendResult result = producer.send(message);
//获取发送结果
System.err.println("发送响应:MsgId:【" + result.getMsgId()
+ "】,发送状态:【" + result.getSendStatus()+"】");
//线程休眠一秒再去发送下一条消息
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者
producer.shutdown();
}
}
启动执行main方法,控制台得到如下打印输出:
b)发送异步消息
异步传输通常用于对时间敏感的业务场景中,如果发送端不能容忍长时间的等待broker的响应的情况下可以选择发送异步消息。
与发送同步消息基本上一致,只有在第五步消息发送环节,通过提供的public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException 方法来发送异步消息。在发送时创建一个SendCallback接口的匿名内部实现类,实现消息发送成功的回调方法onSuccess(SendResult sendResult),以及消息发送失败出现异常时的onException(Throwable throwable)方法。
public class AsyncProducer1 {
public static void main(String[] args) throws Exception{
//1.创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("async-group");
//2.指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.启动生产者
producer.start();
//循环发送100条消息
for (int i = 0; i < 100; i++) {
String msgBody = "This is an async msg to MQ" + i;
System.out.println("发送消息:"+msgBody);
//4.创建消息对象 指定主题、标签、消息体
Message message = new Message("async-topic", "async-tag",
msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送异步消息
producer.send(message, new SendCallback() {
/**
* 发送成功的回调函数
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:"+sendResult);
}
/**
* 发送异常的回调函数
* @param throwable
*/
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常:"+throwable);
}
});
}
//6.关闭生产者
producer.shutdown();
}
}
启动执行main方法,控制台得到如下打印输出:
c)单向发送消息
单向消息一般用于不是特别关心发送结果的场景,例如发送日志。
发送单向消息,只是在基本单顺序消息发送案例的基础上使用public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException 方法对消息进行发送。
public class OnewayProducer1 {
public static void main(String[] args) throws Exception{
//1.创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("one-way-group");
//2.指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.启动生产者
producer.start();
//循环发送10条消息
for (int i = 0; i < 10; i++) {
String msgBody = "This is an one way msg to MQ" + i;
System.out.println("发送消息:"+msgBody);
//4.创建消息对象 指定主题、标签、消息体
Message message = new Message("one-way-topic", "one-way-tag",
msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送单向消息
producer.sendOneway(message);
}
//6.关闭生产者
producer.shutdown();
}
}
2、消费消息
消费消息可以分为如下几个步骤:
1. 创建消费者consumer,制定消费者组名
2. 指定nameserver地址
3. 订阅主题topic和tag
4. 设置回调函数,处理消息
5. 启动消费者consumer
消费者DefaultMQPushConsumer提供了public void setMessageModel(MessageModel messageModel)方法来设置消费的模式。
MessageModel.BROADCASTING 广播模式
MessageModel.CLUSTERING 负载均衡模式(默认)
a)负载均衡模式(默认)
负载均衡模式指的是:一个消费者组内所有对消费者,共同承担消息的消费,每一个消费者消费的消息都是不同的。
public class BaseConsumer {
public static void main(String[] args) throws Exception{
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");
//2.指定nameserver地址
consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.订阅主题topic和tag
consumer.subscribe("base-topic", "*");
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
System.err.println("消费消息: " + new String(messageExt.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}
});
//5.启动消费者
consumer.start();
}
}
c)广播模式
广播模式是指组内的所有消费者都会消费全量的消息,在在消费者启动前,设置消费模式为MessageModel.BROADCASTING即可。
public class BaseConsumer {
public static void main(String[] args) throws Exception{
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");
//2.指定nameserver地址
consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.订阅主题topic和tag
consumer.subscribe("base-topic", "*");
//设定消费模式 【负载均衡模式(默认)/广播模式MessageModel.BROADCASTING】
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
System.err.println("消费消息: " + new String(messageExt.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}
});
//5.启动消费者
consumer.start();
}
}
二、顺序消息
消息中间件在数据结构上是一种先进先出的数据结构,消息的生产者将消息发送到broker中,存储在队列中,但实际上在一个broker中队列的数量往往大于一个,例如下图即为控制台中展示的broker中拥有四个队列。
可以得出,当有消息发送到broker中到时候,在默认的情况下消息会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)。而消费者来消费消息时,是从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的,如何保证消息当顺序消费,是用户在使用时需要结合业务场景来考虑的问题之一。
果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上拉去消息,就保证了顺序。
- 将某些需要保持顺序的局部消息放到broker的同一个队列中。
- 用同一个线程保证其处理broker中的一个队列的消息。
订单模型类
public class OrderStep {
private Long orderId;
private String desc;
public OrderStep(Long orderId, String desc) {
this.orderId = orderId;
this.desc = desc;
}
public OrderStep() {
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
public static List<OrderStep> buildOrders(){
List<OrderStep> orderlist = new ArrayList<>();
Long[] ids = {1001l,1002l,1003l,1004l};
for(Long id : ids){
OrderStep orderStep = new OrderStep(id,"创建");
orderlist.add(orderStep);
OrderStep orderStep = new OrderStep(id,"付款");
orderlist.add(orderStep);
OrderStep orderStep = new OrderStep(id,"发货");
orderlist.add(orderStep);
OrderStep orderStep = new OrderStep(id,"关闭");
orderlist.add(orderStep);
}
return orderlist;
}
}
顺序消息的生产者
消息生产者通过MessageQueueSelector 分区队列选择器,通过订单ID以及自定义的规则,对消息进行分区队列对选择。
public class OrderProducer {
public static void main(String[] args) throws Exception{
//创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("order-group");
//指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//启动生产者
producer.start();
//构建消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
//发送消息
int i = 0 ;
for(OrderStep orderStep : orderSteps){
String body = JSON.toJSONString(orderStep);
Message msg = new Message("OrderTopic","order",""+(i++),body.getBytes());
/**
* 参数一:消息对象
* 参数二:消息队列的选择器
* 参数三:选择队列的业务表识(订单ID)
*/
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param list 队列集合
* @param message 消息对象
* @param o 业务标示的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Long orderId = (long) o;
long index = orderId % list.size(); //订单ID除分区队列总数取模
System.out.println("队列总数【"+list.size()
+"】,当前订单ID【"+orderId+"】选择的队列下标【"+index+"】");
return list.get((int) index);
}
}, orderStep.getOrderId());
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
顺序消息的消费者
顺序消息的消费者 通过MessageListenerOrderly ,让一个队列的消息用一个线程去处理。
public class OrderConsumer {
public static void main(String[] args) throws Exception{
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-group");
//2.指定nameserver地址
consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.订阅主题topic和tag
consumer.subscribe("OrderTopic", "*");
//4.注册消息监听器 MessageListenerOrderly 一个队列的消息用一个线程去处理
consumer.registerMessageListener(new MessageListenerOrderly() {
/**
*
* @param msgs 获取到的消息
* @param consumeOrderlyContext
* @return
*/
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : msgs){
String str = new String(msg.getBody());
System.out.println("线程名称【"+Thread.currentThread().getName()
+"】消费消息:" + str);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者
consumer.start();
}
}
三、延时消息
当发送当消息不需要立即消费,需要设置一定当延迟时间时,可以悬着发送延迟消息,该类型当消息并不会立即被消费,会延迟一段设定好的时间后才会被消费者消费。
RocketMq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
通过Message类中提供的 public void setDelayTimeLevel(int level)方法来设置延迟的级别。
延迟消息的生产者:
public class DelayProducer {
public static void main(String[] args) throws Exception{
//创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("order-group");
//指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//启动生产者
producer.start();
//构建消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
//发送消息
int i = 0 ;
for(OrderStep orderStep : orderSteps){
String body = JSON.toJSONString(orderStep);
Message msg = new Message("DelayTopic","d1",""+(i++),body.getBytes());
msg.setDelayTimeLevel(2); //设置延迟时间级别
/**
* 参数一:消息对象
* 参数二:消息队列的选择器
* 参数三:选择队列的业务表识(订单ID)
*/
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
四、批量消息
当消息数量很多当时候,可以通过批量发送当形式来提高消息的发送效率。批量发送消息能够显著提高传递小消息的性能。
限制:
- 这批消息应该拥有相同的topic,相同分waitStoremsgOK,而且不能是延迟消息。
- 批消息的总大小不应该超过4MB
批量消息的生产者:
public class BatchProducer {
public static void main(String[] args) throws Exception{
//1.创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("batch-group");
//2.指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.启动生产者
producer.start();
//4.构建一个消息集合
List<Message> msgs = new ArrayList<>();
for(int i = 1 ; i<=10 ; i++){
Message message = new Message("batch-topic", "batch-tag",
("Hello World" + i ).getBytes());
msgs.add(message);
}
//5.发送批量消息
SendResult send = producer.send(msgs);
System.out.println("批量发送结果:" + send);
//6.关闭生产者
producer.shutdown();
}
}
五、事务消息
RocketMQ提供事务消息的发送,主要是为了解决本地事务执行与消息发送的原子性问题。即解决Producer执行业务逻辑成功之后投递消息可能失败的场景。
事务消息的状态
1.提交状态 :允许消费者消费此消息
2.回归状态 :消息将被删除,不允许被消费
3.中间状态 :代表需要检查消息队列来确定状态
事务消息发送及提交
1.发送消息(half消息)
2.服务端响应写入结果
3.根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
4.根据本地事务状态执行commit或者rollback (commit操作生成消息索引,消息对消费者可见)
事务补偿
1.对没有commit/rolback对事务消息(pending状态对消息)从服务端发起一次回查
2.Producer收到回查消息,检查回查消息对应对的本地事务的状态
3.根据本地事务状态,重新commit汇总rollback,其中,补偿阶段用于解决消息commiy或者rollback发生超时或者失败的情况。
使用的限制
1.事务消息不支持延迟消息和批量消息
2.未来避免单个消息被检查太多次而导致半队列消息积累,可以设置默认单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
3.事务消息将在broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
4.事务性消息可能不止一次被检查或消费。
5.提交给用户的目标主题消费可能会失败,目前这依据日志的记录而定,它的高可用性机制来保证,如果希望确保事务消费不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6.事务消息的生产者ID不能与其他类型消费的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。
事务消息的生产者
public class Producer {
public static void main(String[] args) throws Exception{
//1.创建事务消息生产者producer,并制定生产者的组名
TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
//2.指定NameServer地址
producer.setNamesrvAddr("39.108.128.240:9876");
//3.设置事务监听器
producer.setTransactionListener(new MyTransactionListener());
//4.启动生产者
producer.start();
//每隔一秒,循环发送5条消息
for (int i = 0; i < 4; i++) {
String body = "事务消息【"+i+"】";
//创建消息对象 指定主题、标签、消息体 同步和异步是一致到
Message message = new Message("transaction-topic", "T-"+i, body.getBytes());
//同步发送消息
SendResult sendResult = producer.sendMessageInTransaction(message,null);
//打印消息发送的结果
System.out.println("发送结果:"+sendResult);
//线程休眠一秒再去发送下一条消息
TimeUnit.SECONDS.sleep(1);
}
//关闭生产者 不建议在事务消息中关闭,可能回导致事务监听器不执行
//producer.shutdown();
}
}
自定义的事务监听器
public class MyTransactionListener implements TransactionListener {
//在该方法中执行本地的一些事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
if(message.getTags().equals("T-1")){
System.out.println("tag【"+message.getTags()+"】-- 事务提交");
return LocalTransactionState.COMMIT_MESSAGE;
}else if(message.getTags().equals("T-2")){
System.out.println("tag【"+message.getTags()+"】-- 事务回滚");
return LocalTransactionState.ROLLBACK_MESSAGE;
}else {
System.out.println("tag【"+message.getTags()+"】-- 事务结果未知");
return LocalTransactionState.UNKNOW;
}
}
//MQ在没有接收到事务结果时,发送请求来二次核对 ,消息事务状态的回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("MQ开始事务状态的回查");
if(messageExt.getTags().equals("T-3")){
System.out.println("tag【"+messageExt.getTags()+"】-- 事务提交");
return LocalTransactionState.COMMIT_MESSAGE;
}else{
System.out.println("tag【"+messageExt.getTags()+"】-- 仍然结果未知");
return LocalTransactionState.UNKNOW;
}
}
}
事务消息的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-group");
//2.指定nameserver地址
consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.订阅主题topic和tag
consumer.subscribe("transaction-topic", "*");
//4.设置回调函数 处理消息
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.err.println("消费消息: " + new String(messageExt.getBody()));//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
//5.启动消费者
consumer.start();
//6.打印启动状态
System.out.println("消费者启动成功");
}
}
六、消费消息的过滤
针对消费者, 消费者来决定对哪些消息进行消费,对哪些消息不消费。
提供两种消息过滤的方式
1.依据tag来过滤
2.依据RocketMQ定义了一些基本语法来支持过滤 (只有使用push模式对消费者才能使用SQL92标准对sql语句)
数值比较 例如: > , >= ,< , <= , BETWEEN , = ;
字符比较 例如:= ,<> ,IN;
IS NULL 或者 IS NOT NULL ;
逻辑符号 AND , OR ,NOT;
常量支持类型:
数值 例如:123,3.1415926
字符 例如:‘abc’ 必须用单引号包裹起来
特殊字符 例如:NULL
布尔值 TRUE 或 FALSE
sql模式过滤的消息生产者
通过Message对象提供的 public void putUserProperty(String name, String value) 方法,添加自定义sql查询的标示与内容。
public class Producer {
public static void main(String[] args) throws Exception{
//创建消息生产者producer,并制定生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("sql-filter-group");
//指定NameServer地址
producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//启动生产者
producer.start();
//每隔一秒,循环发送5条消息
for (int i = 0; i < 5; i++) {
String body = "通过tag过滤消息"+i;
//创建消息对象 指定主题、标签、消息体 同步和异步是一致到
Message message = new Message("sql-filter-topic", "sql-filter-tag", body.getBytes());
//设置消息 用户自定义属性
message.putUserProperty("i",String.valueOf(i));
//同步发送消息
SendResult sendResult = producer.send(message);
//打印消息发送的结果
System.out.println("发送结果:"+sendResult);
//线程休眠一秒再去发送下一条消息
TimeUnit.SECONDS.sleep(1);
}
}
}
sql模式过滤的消息消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql-filter-group");
//2.指定nameserver地址
consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
//3.订阅主题topic和tag 依据消息sql过滤器来过滤消息
consumer.subscribe("sql-filter-topic", MessageSelector.bySql("i>=0"));
//4.设置回调函数 处理消息
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.err.println("消费消息:【 " + new String(messageExt.getBody())+"】");
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
//5.启动消费者
consumer.start();
//6.打印启动状态
System.out.println("消费者启动成功");
}
}
参考学习视频 黑马程序员《RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件》,图片来源于网络侵删~
上一篇: RocketMq-顺序消息
下一篇: S2SH整合开发——登录
推荐阅读
-
使用Python的Zato发送AMQP消息的教程
-
angular,vue,react的基本语法—双向数据绑定、条件渲染、列表渲染、angular小案例
-
RocketMQ中Producer消息的发送
-
微信公共服务平台开发(.Net 的实现)3-------发送文本消息
-
微信公共服务平台开发(.Net 的实现)7-------发送图文消息
-
使用Kotlin+RocketMQ实现延时消息的示例代码
-
消息持续发送的完整例子
-
rocketmq发送有序消息笔记
-
C#开发微信门户及应用(19)-微信企业号的消息发送(文本、图片、文件、语音、视频、图文消息等)
-
python 发送和接收ActiveMQ消息的实例