欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ流量控制机制简单分析

程序员文章站 2022-07-13 15:11:46
...

 

在RabbitMQ中,消息可能被存储在多个不同的队列,消息越早被消费,那么消息经过的队列层次越少,则平均每个消息处理的开销就越小。但若接收消息的速率过快,MQ来不及处理,这些消息就可能进入很深层次的队列,大大增加平均每个消息的处理开销,进一步使得处理新消息和发送旧消息的能力减弱,更多的消息会进入很深的队列,循环往复,整个系统的性能就会极大的降低。另外若接收消息的速率过快还会实现某些进程的mailbox过大,可能会产生很严重的后果。为此RabbitMQ设计了一套流控机制,本文从以下三个方面去阐述该流控机制是如何工作的。

1.如何开关闸门

RabbitMQ使用TCP长连接进行通讯,接收数据的起点进程为rabbit_reader。首先分析它的接收loop

 

recvloop(Deb, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, State); 
recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
  when BufLen < RecvLen ->
    ok = rabbit_net:setopts(Sock, [{active, once}]), 
    mainloop(Deb, State#v1{pending_recv = true});
recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
    {Data, Rest} = split_binary(case Buf of
                                    [B] -> B;
                                    _   -> list_to_binary(lists:reverse(Buf))
                                end, RecvLen),
    recvloop(Deb, handle_input(State#v1.callback, Data,
                               State#v1{buf = [Rest],
                                        buf_len = BufLen - RecvLen})).
 

从上面代码可以看出,rabbit_reader每接收到一个包,就设置套接字属性为{active, onece},若当前连接被blocked时则不设置{active,once},这个接收进程就阻塞在receive方法上。通过这种方式来实现闸门的开关。

2.何时关闭闸门

RabbitMQ是用erlang/OTP开发的,一个消息从被接收到被发送给订阅者,必然要在多个进程间的转发,从接收到被消费,一个消息所走过的所有进程自然形成一条消息链,RabbitMQ通过监控这条链上每个节点“mailbox”中未被接收的消息数量,决定何时关闭闸门。实现机制如下所述:


RabbitMQ流量控制机制简单分析
            
    
    博客分类: rabbitmq rabbitmqerkangamqp

如图所示,进程A、B、C连成一条消息链,每个进程字典中有一对关于收发消息的credit值,以进程B为例,{{credit_from, C}, Value},表示能发多少条消息给C,每发一条消息该值减1,当为0时,本进程阻塞住不再往下游进程发消息也不再接收上游的消息;{{credit_to, A}, Value}表示再接收多少个消息就向上游进程发增加credit值的消息{bump_credit, { self(), Quantity}},在上游进程接收到该消息后,就增加{credit_from, pid}值,这样上游进程就能持续发消息。但当上游发送速率高于下游接收速率,credit值会逐渐被耗光这时进程就会被阻塞,阻塞的情况会一直传递到最上游

Rabbit_reader,这时rabbit_reader就关闭闸门。

3.何时开启闸门

当上游进程收到来自下游进程的bump_credit消息时,若此时上游进程处于block状态则解除block状态,开始接收更上游进程的消息,一个个的传导最终能够解除rabbit_reader的block状态。


 

  • RabbitMQ流量控制机制简单分析
            
    
    博客分类: rabbitmq rabbitmqerkangamqp
  • 大小: 29 KB