分布式消息通信ActiveMQ
课程目标
- 持久化消息和非持久化消息的发送策略
- 消息的持久化方案及实践
- 消费端消费消息的原理
- 关于PrefetchSize的优化
1、持久化消息和非持久化消息的发送策略
1.1、消息同步发送和异步发送
ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。
同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机
制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能
异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所
以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。
默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。
但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消
息的时候,尽量去开启事务会话。
除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送
1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.12.121:8161? jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
1.2、消息的发送原理分析图解
1.2.1、ProducerWindowSize的含义
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的
确认,才能继续发送。
代码在:ActiveMQSession的1957行
主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消
息之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺
寸减少(producerAck.size,此size表示先前发送消息的大小)。
可以通过如下2种方式设置:
Ø 在brokerUrl中设置: “tcp://localhost:61616?jms.producerWindowSize=1048576”,这种设置将会对所有的
producer生效。
Ø 在destinationUri中设置: “test-queue?producer.windowSize=1048576”,此参数只会对使用此Destination实例
的producer失效,将会覆盖brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的内存就越大。
1.2.2消息发送的源码分析
以producer.send为入口
ActiveMQSession的send方法
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
ActiveMQConnection. doAsyncSendPacket
private void doAsyncSendPacket(Command command) throws JMSException { try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
同步发送和异步发送的区别
public Object request(Object command, int timeout) throws IOException { FutureResponse response = asyncRequest(command, null); return response.getResult(timeout); // 从future方法阻塞等待返回 }
在ResponseCorrelator的request方法中,需要通过response.getResult去获得broker的反馈,否则会阻塞
持久化消息和非持久化消息的存储原理
正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在
${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点
SystemUsage配置设置了一些系统内存和硬盘容量
从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀
值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重
启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除
消息的持久化策略分析
消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者
发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去
后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息
从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式
持久化存储支持类型
ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是
一致的。
Ø KahaDB存储(默认存储方式)
Ø JDBC存储
Ø Memory存储
Ø LevelDB存储
Ø JDBC With ActiveMQ Journal
消费端消费消息的原理
有两种方法可以接收消息,一种是使用同步阻塞的MessageConsumer#receive方
法。另一种是使用消息监听器MessageListener。这里需要注意的是,在同一个session下,这两者不能同时工
作,也就是说不能针对不同消息采用不同的接收方式。否则会抛出异常。
至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控
消费端的 PrefetchSize
消息消费流程图
activemq 的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口
大小。不同的类型的队列,prefetchSize 的默认值也是不一样的
持久化队列和非持久化队列的默认值为 1000
持久化 topic 默认值为 100
非持久化队列的默认值为 Short.MAX_VALUE-1
消费端会根据
prefetchSize 的大小批量获取数据,比如默认值是 1000,那么消费端会预先加
载 1000 条数据到本地的内存中
prefetchSize 的设置方法
在 createQueue 中添加 consumer.prefetchSize,就可以看到效果
Destination
destination=session.createQueue("myQueue?consumer.prefetchSize=10");
optimizeAcknowledge
ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化
ACK”,只有在为 true 的情况下,prefetchSize 以及
optimizeAcknowledgeTimeout 参数才会有意义
优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开
销,另一方面由于延迟了确认(默认 ack 了 0.65*prefetchSize 个消息才确
认),broker 再次发送消息时又可以批量发送
如果只是开启了 prefetchSize,每条消息都去确认的话,broker 在收到确认后
也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK
来手动延迟确认, 我们需要在 brokerUrl 指定 optimizeACK 选项
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory ("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&jms.optimiz
eAcknowledgeTimeOut=10000");
Ø 注意,如果 optimizeAcknowledge 为 true,那么 prefetchSize 必须大于 0.
当 prefetchSize=0 的时候,表示 consumer 通过 PULL 方式从 broker 获取消
息
optimizeAcknowledge 和 prefetchSize 的作用,两
者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消
费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消
息时的网络通信开销
Ø 需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提
升 consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大
的 prefetchSize 反而不能有效的达到提升消费性能的目的。因为过大的
prefetchSize 不利于 consumer 端消息的负载均衡。因为通常情况下,我们都
会部署多个 consumer 节点来提升消费端的消费性能。
这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确
认时,client 端发生故障,那么这些消息就有可能会被重新发送给其他
consumer,那么这种风险就需要 client 端能够容忍“重复”消息。
消息的确认过程
ACK_MODE
消息确认有四种 ACK_MODE,分别是
AUTO_ACKNOWLEDGE = 1 自动确认
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
SESSION_TRANSACTED = 0 事务提交并确认
虽然 Client 端指定了 ACK 模式,但是在 Client 与 broker 在交换 ACK 指令的时
候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的
ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进
行不同的操作
ACK_TYPE
DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
STANDARD_ACK_TYPE = 2 “标准"类型,通常表示为消息"处理成功”,broker 端
可以删除消息了
POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多
次后,都无法正确处理时,消息将会被删除或者 DLQ(死信队列)
REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出
了异常,broker 稍后会重新发送此消息
INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何 ACK_MODE 下
UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”
时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅
者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。
Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK
Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定
此消息的后续操作
消息的重发机制原理
在正常情况下,有几中情况会导致消息重新发送
Ø 在事务性会话中,没有调用 session.commit 确认消息或者调用
session.rollback 方法回滚消息
Ø 在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有
调用 acknowledge 或者调用了 recover 方法;
一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会
给 broker 发送一个”poison ack”(ActiveMQMessageConsumer#dispatch:
1460 行),表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会
把这个消息放到 DLQ(死信队列)。
ActiveMQ 的优缺点
优点:
ActiveMQ 采用消息推送方式,所以最适合的场景是默认消息都可在短时间内
被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反
比。
缺点:
1.吞吐量低。由于 ActiveMQ 需要建立索引,导致吞吐量下降。这是无法克服
的缺点,只要使用完全符合 JMS 规范的消息中间件,就要接受这个级别的
TPS。
2.无分片功能。这是一个功能缺失,JMS 并没有规定消息中间件的集群、分片
机制。而由于 ActiveMQ 是伟企业级开发设计的消息中间件,初衷并不是为了
处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向
拆分。ActiveMQ 官方不提供分片机制,需要自己实现。
适用场景
- 对 TPS 要求比较低的系统,可以使用 ActiveMQ 来实现,一方面比较简单,能
够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面
不适用的场景
1.消息量巨大的场景。ActiveMQ 不支持消息自动分片机制,如果消息量巨大,
导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。
推荐阅读
-
(八)分布式通信----主机Host
-
(七)分布式通信----Netty实现NIO通信
-
activemq消息持久化方式(activemq和kafka区别)
-
PHP下操作Linux消息队列完成进程间通信的方法
-
activemq消息持久化方式(activemq和kafka区别)
-
Python进程间通信Queue消息队列用法分析
-
(一)ASP.Net Core 分布式通信----序列化
-
浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
-
JAVAEE——宜立方商城08:Zookeeper+SolrCloud集群搭建、搜索功能切换到集群版、Activemq消息队列搭建与使用
-
分布式事务之解决方案(可靠消息最终一致性)