欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息

程序员文章站 2022-07-14 23:03:51
...

目录

延时消息

批量消息

过滤消息

1、Tag过滤

2、SQL过滤

事务消息

1、事务流程

2、代码


本篇博客由于是上篇博客“RocketMQ学习(三):消息类型——发送方式,接收方式,顺序消息”的后续,因此代码只贴了变动部分。

延时消息

比如上传文件,我们可以先上传到临时目录,然后发送一个1h的延时消息,1h后若文件表单没有提交,我们就删除文件释放存储。

生产者:需要在发送消息之前设置延时级别,且目前RocketMQ的延时级别是预设好的,不能自定义精度。

// 可用的级别对应的时间
private String avilibleDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Message msg = new Message("base", "tag4", ("Delay msg-" + i).getBytes());
msg.setDelayTimeLevel(2); // 级别2代表延时5s
producer.send(msg);

消费者:可以打印一下消费到消息的时间和该消息被存储到队列的时间差。

consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    list.forEach(item -> {
        System.out.println(new String(item.getBody()) + "  延时:" + (System.currentTimeMillis() - item.getStoreTimestamp()) + "ms later");
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

批量消息

之前我们所有的例子的消息都是以循环的方式发送的,这样的效率不高。批量消息的发送能够提高小消息的传递能力。

注意:批量消息中的所有消息的Topic和waitStoreMsgOK必须是一样的,不能使用延时消息,并且这一批消息的大小不能超过1M。

生产者:对于小于1M的消息,只需要将多个Message放到集合里,一起发送即可。

List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic", "tag1", "hello1".getBytes()));
messages.add(new Message("BatchTopic", "tag1", "hello2".getBytes()));
messages.add(new Message("BatchTopic", "tag1", "hello3".getBytes()));
producer.send(messages);

对于超过了1M大小的一批消息,我们需要将其拆分为一批一批小于1M的消息来发送。具体实现是,循环累加集合内的消息大小,并与1M比较,来返回小于1M的部分。

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    /**
     * 用于返回不超过1M的消息集合
     */
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        // 用于存放消息的总长度
        int totalSize = 0;

        // 遍历以计算消息是否超过1M
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);

            // 取Topic和消息内容的大小
            int tmpSize = message.getTopic().length() + message.getBody().length;

            // 取额外属性的大小
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }

            // 增加日志的开销20字节
            tmpSize = tmpSize + 20;

            // 单条消息是否超过1M
            if (tmpSize > SIZE_LIMIT) {

                // 假如当前消息是next()方法的第一次遍历, 则单独返回此消息,否则返回之前已经过遍历的消息
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }

            // 目前遍历到的所有消息的总大小是否超过1M,超过就返回未超过部分,未超过就加上,进行下一次循环
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }

        // 返回的是不超过1M的消息集合
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

生产者只需要将消息集合传递给这个迭代器即可。

List<Message> messages = new ArrayList<>();
// 添加若干消息.....

// 把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        //处理error
    }
}

过滤消息

1、Tag过滤

在大多数情况下,我们都能使用Tag来过滤消息。

consumer.subscribe("BatchTopic", "tag1");
consumer.subscribe("BatchTopic", "tag1 || tag2");
consumer.subscribe("BatchTopic", "*");

通过不同消息的不同标签实现的过滤方便,但一个消息只能有一个标签,这意味着判断依据只有一个,这对于复杂场景显得有些力不从心。

2、SQL过滤

RocketMQ的SQL过滤功能,能够在发送消息时附带一些属性,在消费者获取时进行一些计算,来筛选消息。

RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息

RocketMQ只定义了一些基础语法来支持这个特性,你也可以很容易的扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

支持的常量类型:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

生产者:调用putUserProperty添加属性

for (int i = 0; i < 10; i++) {
    Message message = new Message("Filter", "tag1", ("hello" + i).getBytes());
    // 添加一个属性
    message.putUserProperty("key", String.valueOf(i));
    producer.send(message);
}

消费者:在订阅消息时,设置过滤条件

// 只获取key在0-4的消息
consumer.subscribe("Filter", MessageSelector.bySql("key between 0 and 4"));

可以看到,消费者只接收了5个消息(key:0-4)

RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息

事务消息

1、事务流程

RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

事务消息发送及提交

  1. 发送消息(half消息)
  2.  服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态

2、代码

生产者需要使用TransactionMQProducer来创建生产者,然后设置本地事务的监听器用于处理本地事务,发送消息时使用sendMessageInTransaction方法。消费者不需要变动。

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.1.1:9876");

        // 设置事务消息的监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 执行本地事务
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 根据不同tag做不同操作
                if ("TagA".equals(message.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if ("TagB".equals(message.getTags())) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if ("TagC".equals(message.getTags())) {
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 本地事务的回查,UNKNOW状态的消息回调这个方法
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("MQ检查消息Tag【"+messageExt.getTags()+"】的本地事务执行结果");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        String[] tags = {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            String tag = tags[i % 3];
            Message message = new Message("Transaction", tag, (tag + "消息" + i).getBytes());
            // 此处的第二个参数会传递到executeLocalTransaction()方法的第二个参数去
            producer.sendMessageInTransaction(message, null);
        }

        // 不关闭生产者的原因是其要监听回传
//        TimeUnit.SECONDS.sleep(5);
//        producer.shutdown();
    }
}

我们可以看到,只有TagC的消息进入了本地事务的回查,消费者端,只接收了一开始被提交的TagA消息,和回查后提交的TagC消息。

RocketMQ学习(四):消息类型——延时消息,批量消息,过滤消息,事务消息