RabbitMQ源码分析 - 队列机制
(注:分析代码基于RabbitMQ 2.8.2)
当rabbit无法直接将消息投递到消费者时,需要暂时将消息入队列,以便重新投递。但是,将消息入队列并不意味着,就是将消息扔在一个队列中就不管了,rabbit提供了一套状态转换的机制来管理消息。
在rabbit中,队列中的消息可能会处理以下四种状态:
alpha:消息内容以及消息在队列中的位置(消息索引)都保存在内存中;
beta:消息内容只在磁盘上,消息索引只在内存中;
gamma:消息内容只在磁盘上,消息索引在内存和磁盘上都存在;
delta:消息的内容和索引都在磁盘上。
注意:对于持久化消息,消息内容和消息索引都必须先保存在磁盘上,然后才处于上述状态中的一种。其中gamma很少被用到,只有持久化的消息才会出现这种状态。
rabbit提供这种分类的主要作用是满足不同的内存和CPU需求。alpha最耗内存,但很少消耗CPU;delta基本不消耗内存,但是要消耗更多的CPU以及磁盘I/O操作(delta需要两次I/O操作,一次读索引,一次读消息内容;delta及gamma只需要一次I/O操作来读取消息内容)。
rabbit在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果内存中的消息数量大于这个数量,就会引起消息的状态转换,转换主要两种:alpha -> beta, beta -> delta。alpha -> beta的转换会将消息的内容写到磁盘(如果是持久化消息,在这一步转换后,消息将会处于gamma状态),beta -> delta的转换会更进一步减少内存消耗,将消息索引也写到磁盘。
运行时怎么体现消息的各种状态呢?在队列的状态vqstate(参见[$RABBIT_SRC/src/rabbit_variable_queue.erl])结构中,存在q1,q2,delta,q3,q4五个队列(使用q1~q4使用Erlang的queue模块,delta存储结构依赖于消息索引的实现),其中q1,q4只包含alpha状态的消息,q2,q3只包含beta和gamma状态的消息,delta包含delta状态的消息。
一般情况下消息会以p1->p2->delta->q3->q4的顺序进行状态变换:消息进入队列时,处于alpha状态并保存在内存中(q1或q4),然后某个时刻发现内存不足,被转换到beta状态(q2,q3)(这时候其实有两个转换q1->q2,q4->q3),如果还是内存不足,被转换到delta状态(delta)(q2->delta,q3->delta);当从队列中消费消息时,会先从处于alpha状态的内存队列(q4)中获取消息,如果q4为空,则从beta状态的队列(q3)中获取消息,如果q3也为空,则会从delta状态的消息队列中读取消息,并将之转移到q3。
上述步骤只是个一般步骤,实际运行时,中间的步骤都是可以跳过的,比如消息可能在一开始被放在q4队列中、q1队列中的元素会直接跳到q4队列中(这两种情况下,q3,delta,q2队列都为空);当delta队列为空时,q2队列可以直接跳到q3队列,q1队列也可以直接跳到q3。
上述这些状态转换主要发生的两个地方为:从队列中获取消息时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> queue_out/1])以及减少内存使用量时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> reduce_memery_use/1])。下面详细说明下这两个操作的逻辑。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> queue_out/1 queue_out(State = #vqstate { q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> fetch_from_q3/1 fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, MsgStatus}, Q3a} -> State1 = State #vqstate { q3 = Q3a }, State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> true = ?QUEUE:is_empty(Q2), %% ASSERTION true = ?QUEUE:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; // A {true, false} -> maybe_deltas_to_betas(State1); // B {false, _} -> State1 end, {loaded, {MsgStatus, State2}} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> maybe_deltas_to_betas/1 maybe_deltas_to_betas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState, pending_ack = PA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case ?QUEUE:len(Q3a) of 0 -> maybe_deltas_to_betas( State1 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, q3 = ?QUEUE:join(Q3b, Q2) }; // C N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), State1 #vqstate { delta = Delta1, q3 = Q3b } end end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> reduce_memory_use/1 reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, State), State1 reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, rates = #rates { avg_ingress = AvgIngress, avg_egress = AvgEgress }, ack_rates = #rates { avg_ingress = AvgAckIngress, avg_egress = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; S1 -> {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, case (AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress) of true -> [AckFun, AlphaBetaFun]; false -> [AlphaBetaFun, AckFun] end), {true, State2} end, case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; _ -> {Reduce, State1} end chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
$RABBIT_SRC/src/rabbit_variable_queue.erl -> push_alphas_to_betas/2 push_alphas_to_betas(Quota, State) -> {Quota1, State1} = push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, State #vqstate.q1, State), {Quota2, State2} = push_alphas_to_betas( fun ?QUEUE:out_r/1, fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}.
推荐阅读
-
rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)
-
Qt事件分发机制源码分析之QApplication对象构建过程
-
[五]类加载机制双亲委派机制 底层代码实现原理 源码分析 java类加载双亲委派机制是如何实现的
-
dubbo源码分析01:SPI机制
-
Zookeeper-watcher机制源码分析(一)
-
【SpringBoot】默认错误处理机制的源码大白话分析,以及自定义配置(遇坑)
-
jQuery源码分析之异步队列 Deferred 使用介绍
-
并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
-
分布式消息队列 RocketMQ 源码分析 —— 事务消息
-
Android消息机制三剑客之Handler、Looper、Message源码分析(一)