关于RocketMQ你需要掌握哪些点?
程序员文章站
2022-04-22 13:05:01
...
核心组件
- Producer,消息生产者
- Consumer,消息消费者,负责消费消息,服务端向消费者推送消息或者消费者向服务端定时拉取消息。包括ConsumerGroup,逻辑概念通常消费一类的消息,且消费逻辑一致
- NameServer,元数据信息存储,相当于注册中心,集群架构中的组织协调员
- Broker,是RocketMq核心是真正干活的,专门负责消息的发送,接收,默认每10秒同步一次自身情况到Nameserver,否者超过两分钟认为broker失效
- Topic,逻辑概念,不同类型的消息以不同的Topic进行区分,消息队列用于存储消息
消息的发送与接受
- 发送消息可分为同步发送与异步发送
- 消息有多种订阅模式, 例如需要接收某个topic中add类型的消息,
生产者设置:Message message = newMessage("my-topic", "delete", msg.getBytes("UTF-8"));
消费者设置:consumer.subscribe("my-topic", "add || update");
- 消息过滤
根据用户自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b =‘abc’
RocketMQ之顺序消息
一个topic可以有多个消息队列,RocketMQ可以实现消息放到一个队列里面
RocketMQ之事务消息
- 生产者首先发送消息到MQ服务端,但是此消息被标记位“暂不能投递”状态,也就是半消息状态。
- 等到MQ服务端接收到生产者提交的二次确认消息的时候,可能是commit或者Rollback,只有commit才会进行向消费者投递。
- 如果异常原因导致二次消息投递失败,MQ服务会主动询问生成者消息的最终状态,即消息回查。
- 消费者接收到消息执行逻辑后commit_success后流程结束,否者重试;重试参考消费者重试策略;
消费者获取消息原理
消费者获取消息是通过客户端轮询的方式来获取的,但这种轮询是长轮询机制,没有消息的时候会服务端阻塞请求,不会立即返回,等到有数据的时候才会返回给客户端,然后关闭连接。客户端响应完消息之后再次向服务端发送新的请求,进入下一个循环周期。
消息模式
消费者通过DefaultMQPushConsumer将多个consumer组合在一起,消费发送到这个组分为集群模式与广播模式:
- 集群模式、是消费者负载均衡的策略,同一个ConsumerGroup所有的consumer组合起来才是Topic内容的整体,每个consumer消费的只是一部分消息
-
广播模式、 同一个ConsumerGroup里的每个Consumer,都能消费订阅到Topic的全部消息。同一个消息会被多次分发
集群模式设置:
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式设置:
consumer.setMessageModel(MessageModel.BROADCASTING);
RocketMQ消息存储
- RocketMQ中的消息数据存储,采用了零拷贝技术。
- RocketMQ写入消息到磁盘的时候尽可能的保证顺序写入,顺序写入比随机写入效率高。
- RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,CommitLog是真正存储数据的文件,ConsumeQueue是索引文件,存储指向真正数据的物理地址
存储关系截图:
存储关系流程图:
RocketMQ写入消息到磁盘的模式
RocketMQ尽可能的顺序写入消息到磁盘,消息通过produer写入消息到磁盘的时候有两种写磁盘的方式,一种是同步复制一种是异步刷盘:
- 同步复制、消息在写入内存中后,会立即写入磁盘、写完磁盘完成之后才会返回消息写成功状态
-
异步刷盘、在消息返回写成功之后,消息只是写入到内存中,当内存中消息积累到一定程度,统一触发写入磁盘
同步刷盘适合写入高可靠性消息,异步刷盘适合高吞吐量不太重要的消息,可以通过broker配置文件中指定刷盘方式:
flushDiskType=ASYNC_FLUSH #异步
flushDiskType=SYNC_FLUSH #同步
消息重试机制
producer与consumer都可能发生重试,都可以自定义重试次数
- producer端重试只有在同步发送的情况下才会重试,而且是某些特定的异常才会触发重试
- consumer端重试分为两种、一个是exception,一个是timeout。消费者默认重试频率5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h。异常重试需要返回ConsumeConcurrentlyStatus.RECONSUME_LATER状态,返回这个状态的时候将会触发重试。而超时重试的定义为,MQ服务端没有接到消息发反馈,既不是成功也不是失败,这个时候就会定义超时。
集群模式
- 多master模式、无slave,单台宕机期间,这台机器未被消费的消息不可订阅,消息实时性收到影响
- 多master多slave模式,异步复制、master宕机消费者仍然可以通过slave消费,缺点是master宕机,磁盘损坏会有少量消息丢失
- 多master多slave模式,同步双写,优点master宕机,slave仍然可以提供消费,数据的可用性可靠性很高,缺点是性能比异步复制略低。