RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息
目录
本篇博客由于是上篇博客“RocketMQ学习(三):消息类型——发送方式,接收方式,顺序消息”的后续,因此代码只贴了变动部分。
延时消息
比如上传文件,我们可以先上传到临时目录,然后发送一个1h的延时消息,1h后若文件表单没有提交,我们就删除文件释放存储。
生产者:需要在发送消息之前设置延时级别,且目前RocketMQ的延时级别是预设好的,不能自定义精度。
// 可用的级别对应的时间
private String avilibleDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Message msg = new Message("base", "tag4", ("Delay msg-" + i).getBytes());
msg.setDelayTimeLevel(2); // 级别2代表延时5s
producer.send(msg);
消费者:可以打印一下消费到消息的时间和该消息被存储到队列的时间差。
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
list.forEach(item -> {
System.out.println(new String(item.getBody()) + " 延时:" + (System.currentTimeMillis() - item.getStoreTimestamp()) + "ms later");
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
批量消息
之前我们所有的例子的消息都是以循环的方式发送的,这样的效率不高。批量消息的发送能够提高小消息的传递能力。
注意:批量消息中的所有消息的Topic和waitStoreMsgOK必须是一样的,不能使用延时消息,并且这一批消息的大小不能超过1M。
生产者:对于小于1M的消息,只需要将多个Message放到集合里,一起发送即可。
List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic", "tag1", "hello1".getBytes()));
messages.add(new Message("BatchTopic", "tag1", "hello2".getBytes()));
messages.add(new Message("BatchTopic", "tag1", "hello3".getBytes()));
producer.send(messages);
对于超过了1M大小的一批消息,我们需要将其拆分为一批一批小于1M的消息来发送。具体实现是,循环累加集合内的消息大小,并与1M比较,来返回小于1M的部分。
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
/**
* 用于返回不超过1M的消息集合
*/
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 用于存放消息的总长度
int totalSize = 0;
// 遍历以计算消息是否超过1M
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
// 取Topic和消息内容的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
// 取额外属性的大小
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
// 增加日志的开销20字节
tmpSize = tmpSize + 20;
// 单条消息是否超过1M
if (tmpSize > SIZE_LIMIT) {
// 假如当前消息是next()方法的第一次遍历, 则单独返回此消息,否则返回之前已经过遍历的消息
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
// 目前遍历到的所有消息的总大小是否超过1M,超过就返回未超过部分,未超过就加上,进行下一次循环
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
// 返回的是不超过1M的消息集合
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
生产者只需要将消息集合传递给这个迭代器即可。
List<Message> messages = new ArrayList<>();
// 添加若干消息.....
// 把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
过滤消息
1、Tag过滤
在大多数情况下,我们都能使用Tag来过滤消息。
consumer.subscribe("BatchTopic", "tag1");
consumer.subscribe("BatchTopic", "tag1 || tag2");
consumer.subscribe("BatchTopic", "*");
通过不同消息的不同标签实现的过滤方便,但一个消息只能有一个标签,这意味着判断依据只有一个,这对于复杂场景显得有些力不从心。
2、SQL过滤
RocketMQ的SQL过滤功能,能够在发送消息时附带一些属性,在消费者获取时进行一些计算,来筛选消息。
RocketMQ只定义了一些基础语法来支持这个特性,你也可以很容易的扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
支持的常量类型:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
生产者:调用putUserProperty添加属性
for (int i = 0; i < 10; i++) {
Message message = new Message("Filter", "tag1", ("hello" + i).getBytes());
// 添加一个属性
message.putUserProperty("key", String.valueOf(i));
producer.send(message);
}
消费者:在订阅消息时,设置过滤条件
// 只获取key在0-4的消息
consumer.subscribe("Filter", MessageSelector.bySql("key between 0 and 4"));
可以看到,消费者只接收了5个消息(key:0-4)
事务消息
1、事务流程
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
事务补偿
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态
2、代码
生产者需要使用TransactionMQProducer来创建生产者,然后设置本地事务的监听器用于处理本地事务,发送消息时使用sendMessageInTransaction方法。消费者不需要变动。
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.1.1:9876");
// 设置事务消息的监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 根据不同tag做不同操作
if ("TagA".equals(message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if ("TagB".equals(message.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ("TagC".equals(message.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 本地事务的回查,UNKNOW状态的消息回调这个方法
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("MQ检查消息Tag【"+messageExt.getTags()+"】的本地事务执行结果");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
String tag = tags[i % 3];
Message message = new Message("Transaction", tag, (tag + "消息" + i).getBytes());
// 此处的第二个参数会传递到executeLocalTransaction()方法的第二个参数去
producer.sendMessageInTransaction(message, null);
}
// 不关闭生产者的原因是其要监听回传
// TimeUnit.SECONDS.sleep(5);
// producer.shutdown();
}
}
我们可以看到,只有TagC的消息进入了本地事务的回查,消费者端,只接收了一开始被提交的TagA消息,和回查后提交的TagC消息。
上一篇: 事务特征的四个特征和事务并发问题(面试)
下一篇: 事务的四个特征详解