消息队列与AMQP协议
程序员文章站
2022-05-14 19:31:10
...
一、 消息队列
消息队列(Message Queue,简称MQ)提供异步通信协议,可以实现进程间通信或同一进程不同线程间的通信。其中‘消息’是指包含必要信息的数据。消息的发送者发送完数据后,立即返回,消息被存储在消息队列当中,对这个消息感兴趣的消费者会订阅消息并接收并处理它。
使用消息队列的好处如下:
1、 应用解耦
消息是与平台和语言无关的,消息队列可以应对多变的产品变更。
2、 异步通信
可以缩短请求等待的时间,使用专门处理请求的消费者来执行,提高WEB页面的吞吐量,
尤其是瞬间发生的高流量情况,消息队列非常有助于顶住访问的压力
3、数据持久化
未完成的消息不会因为某些故障而丢失。
4、 送达保证
消息队列提供的冗余机制保证了消息确实能被处理,除非消费者明确表示已经出来完这个消息,否则这个消息可以被放回队列中以备其他消费者出来。
二 、AMQP协议
1、概述
1.1 什么是AMQP
AMQP,即Advanced Message Queuing Portocol,高级消息队列协议。
该协议使得遵从该规范的客户端应用和消息中间服务器的全功能互操作成为可能,
它的设计初衷是为了拜托商业MQ高额费用和不同MQ供应商的接口不统一的问题。
消息发送与接收的双方遵守这个协议可以实现异步通信,这个协议约定了消息的格式和工作方式
1.2 为什么使用AMQP
在分布式的系统中,子系统如果使用socket进行通信,有很多问题需要解决
1)消息的发送者和接收者如何维持这个连接,如果一方中断,这期间的数据如何防止丢失?
2)如何降低发送者和接收者的耦合
3)如何让优先级高的接收者接收消息
4)如何做到负载均衡
5)如何将消息发送给相关的接收者,如果接收者订阅了不同的数据,如何正确的分发到接受者。
6)如何保证接收者接到了正确的或者是有序的数据。
7)如何做到可扩展,将通信模块发到集群上去。
AMQP可以解决这些问题。
2 、AMQP的模型与原理
2.1 AMQP中包含的主要元素
生产者(producer):向Exchage发布消息的应用
消费者(Consumer):从消息队列中消费消息的应用。
消息队列(Message Queue):服务器组件,用于保存消息,知道发送给消费者
消息(Message):传输的内容,消息实际上包含了两部分的内容
1.有效载荷(Payload),也就是要传输的数据,数据类型可以纯文本,也可以是JSON
2.标签(Lable),它包含交换机的名字和可选的主题(topic)标记等。
交换器(Exchage):路由组件,接收生产者发送的消息,并将消息路由转发给消息队列
虚拟主机(Virtual Host):一批交换器,消息队列和相关对象。虚拟主机共享相同身份认证和加密环境独立的独立服务器域。
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
信道(Channel):多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。
绑定器(Binding):消息队列和交换器直接的关联。
2.2 AMQP的工作流程
2.3 AMQP是如何建立通信的
1)建立连接Connection。由生产者和消费者创建连接,连接到broker(消息代理)的物理节点上。
2)建立Channel。Channel是建立在Connection之上的,一个Connection可以建立多个 Channel,
producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
3)发送消息。由Producer发送消息到Broker中的exchange中。
4) 路由转发。exchange收到消息后,根据一定的路由策略,将消息转发到相应的queue中去。
5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,
queue就将消息发送给Consumer端。
6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。
Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;
如果在对应的Channel断开后,Queue没有收到这条消息的ACK(消息回应)信息,
该消息将被发送给另外的Channel。
至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
2.4 技术术语
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端(Client):AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务器(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
端点(Peer):AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)
搭档(Partner):当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。
比如我们定义端点A和端点B,当它们进行通信时,端点B是端点A的搭档,端点A是端点B的搭档。
片段集(Assembly):段的有序集合,形成一个逻辑工作单元。
段(Segment):帧的有序集合,形成片段集中一个完整子单元。
帧(Frame):AMQP传输的一个原子单元。一个帧是一个段中的任意分片。
控制(Control):单向指令,AMQP规范假设这些指令的传输是不可靠的。
命令(Command):需要确认的指令,AMQP规范规定这些指令的传输是可靠的。
异常(Exception):在执行一个或者多个命令时可能发生的错误状态
类(Class):一批用来描述某种特定功能的AMQP命令或者控制。
消息头(Header):描述消息数据属性的一种特殊段。
消息体(Body):包含应用程序数据的一种特殊段。消息体段对于服务器来说完全透明——服务器不能查看或者修改消息体。
消息内容(Content):包含在消息体段中的的消息数据。
交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
交换器类型(Exchange Type):基于不同路由语义的交换器类。
消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
绑定器(Binding):消息队列和交换器之间的关联。
绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
路由关键字(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
虚拟主机(Virtual Host):一批交换器、消息队列和相关对象。
客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。
2.5 应用场景
AQMP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:
1. 异步处理
比如公司新入职一个员工,需要开通系统账户,有几件事情要做,开通系统账户,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行,也可以并行,并行的好处就是可以提高效率。这时候可以使用MQ来实现运行。
2. 应用解耦
在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行。
3. 流量缓冲
在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。
4. 日志处理
将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。
推荐阅读
-
PHP PDO和消息队列的个人理解与应用实例分析
-
PHP高级编程之消息队列原理与实现方法详解
-
深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议
-
[视频教程] 基于redis的消息队列实现与思考
-
PHP PDO和消息队列的个人理解与应用实例分析
-
JAVAEE——宜立方商城08:Zookeeper+SolrCloud集群搭建、搜索功能切换到集群版、Activemq消息队列搭建与使用
-
PHP Beanstalkd消息队列的安装与使用方法实例详解
-
Apache RocketMQ 消息队列部署与可视化界面安装
-
RabbitMQ与.net core(四) 消息的优先级 与 死信队列
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间