Introduction to the AMQP 0-9-1 Model

程序员文章站 2022-07-13 17:00:54

About This Guide

This guide explains the AMQP 0-9-1 model used by RabbitMQ. The original version was written and kindly contributed by Michael Klishin and edited by Chris Duncan.

High-level Overview of AMQP 0-9-1 and the AMQP Model

What is AMQP?

AMQP (Advanced Message Queuing Protocol) is a networking protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Brokers and Their Role

Messaging brokers receive messages from producers (applications that publish them) and route them to consumers (applications that process them).

AMQP 0-9-1 Model in Brief

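The AMQP 0-9-1 Model has the following view of the world: messages are published by producers to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute messages to queues using rules called bindings. After that, either the AMQP broker delivers the messages to consumers subscribed to the queues, or consumers fetch messages from the queues on demand.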


When publishing a message, producers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker; the rest of it is completely opaque to the broker and is only used by applications that receive the message.

 

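Since networks are unreliable, applications may fail to process messages. For this reason the AMQP model has a notion of message acknowledgements: when a message is delivered to a consumer, the consumer notifies the broker, either automatically or at a point chosen by the application developer. When acknowledgements are in use, the broker only removes a message from a queue after it has received the acknowledgement for that message.

In certain situations, for example when a message cannot be routed because no routing rule matches it, the message may be returned to the producer, dropped, or placed into a so-called "dead letter queue". Producers choose how such situations are handled by setting certain parameters when publishing messages.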

 

Queues, exchanges and bindings are collectively referred to as AMQP entities.

 

AMQP is a Programmable Protocol

AMQP 0-9-1 is a programmable protocol in the sense that AMQP entities and routing schemes are defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.

 

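This gives application developers a lot of freedom, but it also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and usually indicate a misconfiguration. Applications declare the AMQP entities they need, define the necessary routing schemes, and may delete these entities when they are no longer needed.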

 

Exchanges and Exchange Types

 

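Exchanges are AMQP entities to which messages are sent. An exchange takes a message and routes it to zero or more queues. The routing algorithm used depends on the exchange type and on rules called bindings. AMQP 0-9-1 provides four exchange types: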

 

 

Name              Default pre-declared names
Direct exchange   (Empty string) and amq.direct
Fanout exchange   amq.fanout
Topic exchange    amq.topic
Headers exchange  amq.match (and amq.headers in RabbitMQ)

 

Besides the exchange type, exchanges are declared with a number of attributes, the most important of which are:

  • Name
  • Durability (exchanges survive broker restart)
  • Auto-delete (exchange is deleted when all queues have finished using it)
  • Arguments (these are broker-dependent)

 

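Exchanges can be durable or transient. Durable exchanges survive a broker restart, whereas transient exchanges are gone once the broker restarts. Not all scenarios and use cases require exchanges to be durable.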

 

Default Exchange

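The default exchange is a direct exchange with no name (empty string) that is pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it, with a routing key that is the same as the queue name.

For example, when you declare a queue named "search-indexing-online", the broker binds it to the default exchange using "search-indexing-online" as the routing key. Therefore, a message published to the default exchange with the routing key "search-indexing-online" is routed to that queue. In other words, the default exchange makes it look as if messages can be delivered directly to queues, even though that is not technically what happens (an automatic binding does the work).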
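The automatic binding can be illustrated with a toy in-memory model. This is only a sketch of the idea, not a client library or broker code; the class and method names (DefaultExchangeBroker, declare_queue, publish_default) are made up for illustration.

```python
class DefaultExchangeBroker:
    """Toy model of the default exchange (hypothetical, in-memory only)."""

    def __init__(self):
        self.queues = {}            # queue name -> list of message bodies
        self.default_bindings = {}  # routing key -> queue name

    def declare_queue(self, name):
        self.queues.setdefault(name, [])
        # The step the text describes: declaring a queue also binds it to
        # the default exchange, using the queue name as the routing key.
        self.default_bindings[name] = name

    def publish_default(self, routing_key, body):
        queue = self.default_bindings.get(routing_key)
        if queue is None:
            return False  # no binding matched, the message is unroutable
        self.queues[queue].append(body)
        return True


broker = DefaultExchangeBroker()
broker.declare_queue("search-indexing-online")
broker.publish_default("search-indexing-online", b"reindex page 42")
```

The publisher only ever names an exchange (here the default one) and a routing key; the queue receives the message because of the binding, not because it was addressed directly.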

 

 

Direct Exchange

A direct exchange delivers messages to queues based on the message routing key. A direct exchange is ideal for the unicast routing of messages (although they can be used for multicast routing as well). Here is how it works:

  • A queue binds to the exchange with a routing key K
  • When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R

 

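Direct exchanges are often used to distribute tasks between multiple workers in a round robin manner. When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.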
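The two ideas above (routing when K = R, and round robin between consumers of one queue) can be sketched in a few lines. This is a simplified in-memory illustration; the function names and the binding representation are assumptions, not part of any AMQP client.

```python
from itertools import cycle


def route_direct(bindings, routing_key):
    """Return the queues whose binding key K equals the routing key R.

    bindings is a list of (binding_key, queue_name) pairs.
    """
    return [queue for key, queue in bindings if key == routing_key]


def dispatch_round_robin(messages, consumers):
    """Hand the messages of ONE queue to its consumers in turn."""
    assignment = {consumer: [] for consumer in consumers}
    for message, consumer in zip(messages, cycle(consumers)):
        assignment[consumer].append(message)
    return assignment


bindings = [("resize", "images"), ("resize", "audit"), ("crop", "crops")]
# Both queues bound with "resize" receive the message (multicast is possible).
matched = route_direct(bindings, "resize")
# Within a single queue, consumers share the work.
shares = dispatch_round_robin(["m1", "m2", "m3"], ["worker-1", "worker-2"])
```

Note the difference: every matching queue gets the message, while within one queue each message goes to only one consumer.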

 

A direct exchange can be represented graphically as follows:

[Figure: direct exchange routing]

Fanout Exchange

 

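A fanout exchange routes messages to all of the queues that are bound to it, and the routing key is ignored entirely. If N queues are bound to a fanout exchange, then when a message is published to that exchange a copy of the message is delivered to each of the N queues. Fanout exchanges are ideal for the broadcast routing of messages.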
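As a minimal sketch of this behaviour, assuming queues are modelled as plain Python lists (the function name publish_fanout is hypothetical):

```python
import copy


def publish_fanout(bound_queues, message, routing_key=""):
    """Deliver one copy of the message to every bound queue.

    bound_queues maps queue name -> list of messages. The routing key is
    accepted but deliberately never looked at.
    """
    for queue in bound_queues.values():
        queue.append(copy.deepcopy(message))
    return len(bound_queues)  # number of copies made


queues = {"mobile": [], "web": [], "archive": []}
copies = publish_fanout(queues, {"score": "2:1"}, routing_key="ignored")
```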

 

Because a fanout exchange delivers a copy of a message to every queue bound to it, its use cases are quite similar:

  • Massively multi-player online (MMO) games can use it for leaderboard updates or other global events
  • Sport news sites can use fanout exchanges for distributing score updates to mobile clients in near real-time
  • Distributed systems can broadcast various state and configuration updates
  • Group chats can distribute messages between participants using a fanout exchange (although AMQP does not have a built-in concept of presence, so XMPP may be a better choice)

 

A fanout exchange can be represented graphically as follows:

[Figure: fanout exchange routing]

Topic Exchange

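Topic exchanges route messages to one or more queues based on matching between the message routing key and the pattern that was used to bind a queue to the exchange. They are well suited to the multicast routing of messages.

 

Topic exchanges have a very broad set of use cases. Whenever a problem involves multiple consumers or applications that selectively choose which messages they want to receive, the use of a topic exchange should be considered.

Example uses: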

  • Distributing data relevant to specific geographic location, for example, points of sale
  • Background task processing done by multiple workers, each capable of handling specific set of tasks
  • Stocks price updates (and updates on other kinds of financial data)
  • News updates that involve categorization or tagging (for example, only for a particular sport or team)
  • Orchestration of services of different kinds in the cloud
  • Distributed architecture/OS-specific software builds or packaging where each builder can handle only one architecture or OS
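The text above does not spell out how binding patterns are matched. In the AMQP 0-9-1 specification, routing keys for topic exchanges are words separated by dots, "*" in a pattern stands for exactly one word and "#" for zero or more words. A small matcher under those rules might look like the sketch below; the function name is made up and real brokers use more efficient data structures.

```python
def topic_matches(pattern, routing_key):
    """Check a dot-separated routing key against a topic binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i, j):
        if i == len(p):
            return j == len(k)  # pattern used up: key must be used up too
        if p[i] == "#":
            # '#' may absorb zero or more words of the routing key
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)  # '*' or a literal word consumes one word
        return False

    return match(0, 0)


# A queue bound with "stock.*.nyse" receives only NYSE price updates.
nyse_only = topic_matches("stock.*.nyse", "stock.ibm.nyse")
```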

 

Headers Exchange

A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.

It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.

Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.
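The "any" versus "all" rule can be expressed as a short predicate. This is a sketch, not broker code: the function name is hypothetical, and treating a missing "x-match" argument as "all" is an assumption made here for illustration.

```python
_MISSING = object()  # sentinel so that a header set to None is not confused with "absent"


def headers_match(binding_args, message_headers):
    """Decide whether a message matches a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")  # assumed default
    expected = {k: v for k, v in binding_args.items() if k != "x-match"}
    hits = [message_headers.get(k, _MISSING) == v for k, v in expected.items()]
    return any(hits) if mode == "any" else all(hits)


binding = {"x-match": "any", "format": "pdf", "type": "report"}
matched = headers_match(binding, {"format": "pdf", "type": "log"})
```

Because values are compared with ordinary equality, they do not have to be strings: a binding such as {"x-match": "all", "size": 42} matches a message whose "size" header is the integer 42.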

 

Queues

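Queues in the AMQP model are very similar to queues in other message- and task-queueing systems: they store messages that are eventually consumed by applications. Queues share some properties with exchanges, but also have some additional properties: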

 

  • Name
  • Durable (the queue will survive a broker restart)
  • Exclusive (used by only one connection and the queue will be deleted when that connection closes)
  • Auto-delete (queue is deleted when last consumer unsubscribes)
  • Arguments (some brokers use it to implement additional features like message TTL)

 

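Before a queue can be used it has to be declared. Declaring a queue causes it to be created if it does not already exist. If a queue with the same name and identical attributes already exists, the declaration has no effect. If a queue with the same name exists but its attributes differ from those in the declaration, a channel-level exception with code 406 (PRECONDITION_FAILED) is raised.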
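The declaration rules can be modelled roughly as follows. The registry class and the exception type are invented for this sketch; a real client library would surface the 406 error in its own way.

```python
class PreconditionFailed(Exception):
    """Stands in for a channel-level exception with reply code 406."""
    reply_code = 406


class QueueRegistry:
    """Toy model of how a broker treats repeated queue declarations."""

    def __init__(self):
        self._queues = {}  # queue name -> attributes used at creation

    def declare(self, name, durable=False, exclusive=False, auto_delete=False):
        requested = {"durable": durable, "exclusive": exclusive,
                     "auto_delete": auto_delete}
        existing = self._queues.get(name)
        if existing is None:
            self._queues[name] = requested
            return "created"
        if existing == requested:
            return "unchanged"  # same name, same attributes: no effect
        raise PreconditionFailed(
            f"queue {name!r} already exists with different attributes")


registry = QueueRegistry()
registry.declare("jobs", durable=True)  # creates the queue
registry.declare("jobs", durable=True)  # no effect
```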

 

Queue Names

Applications may pick queue names or ask the broker to generate a name for them. Queue names may be up to 255 bytes of UTF-8 characters. To ask an AMQP broker to generate a unique queue name for you, pass an empty string as the queue name argument. The same generated name may be obtained by subsequent methods in the same channel by using the empty string where a queue name is expected. This works because the channel remembers the last server-generated queue name.

Queue names starting with "amq." are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 (ACCESS_REFUSED).
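The naming rules above can be sketched with a toy channel object. Everything here is illustrative: the class, its methods and the "gen-" name format are assumptions, and a real broker chooses its own format for generated names.

```python
import uuid


class AccessRefused(Exception):
    """Stands in for a channel-level exception with reply code 403."""
    reply_code = 403


class ToyChannel:
    """Toy model of queue naming on a single channel."""

    def __init__(self):
        self.last_generated = None

    def declare_queue(self, name=""):
        if name.startswith("amq."):
            raise AccessRefused("names starting with 'amq.' are reserved")
        if len(name.encode("utf-8")) > 255:
            raise ValueError("queue names may be up to 255 bytes of UTF-8")
        if name == "":
            # Empty string: the "server" picks a unique name (made-up format)
            name = "gen-" + uuid.uuid4().hex
            self.last_generated = name  # remembered per channel
        return name

    def resolve(self, name):
        """Map the empty string to the last server-generated name."""
        if name == "":
            if self.last_generated is None:
                raise LookupError("no server-generated queue on this channel")
            return self.last_generated
        return name


channel = ToyChannel()
generated = channel.declare_queue("")
same_queue = channel.resolve("")  # refers to the queue declared above
```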

Queue Durability

Durable queues are persisted to disk and thus survive broker restarts. Queues that are not durable are called transient. Not all scenarios and use cases mandate queues to be durable.

Note that only durable queues can be bound to durable exchanges. This guarantees that it is possible to restore bindings on broker restart.

Durability of a queue does not make messages that are routed to that queue durable. If the broker is taken down and then brought back up, durable queues will be re-declared during broker startup; however, only persistent messages will be recovered.
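The interaction between queue durability and message persistence can be made concrete with a small simulation of a restart. The data layout and the function name are assumptions made for this sketch.

```python
def restart(queues):
    """Return what is left after a simulated broker restart.

    queues maps queue name -> {"durable": bool,
                               "messages": [(body, persistent), ...]}
    """
    survivors = {}
    for name, queue in queues.items():
        if not queue["durable"]:
            continue  # transient queues disappear with all their messages
        survivors[name] = {
            "durable": True,
            # Inside a durable queue, only persistent messages are recovered.
            "messages": [(body, persistent)
                         for body, persistent in queue["messages"] if persistent],
        }
    return survivors


before = {
    "tasks": {"durable": True, "messages": [("a", True), ("b", False)]},
    "scratch": {"durable": False, "messages": [("c", True)]},
}
after = restart(before)
```

In this example only message "a" survives: "b" was not persistent, and "c" sat in a transient queue.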

Bindings

Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types. The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter.

To draw an analogy:

  • Queue is like your destination in New York city
  • Exchange is like JFK airport
  • Bindings are routes from JFK to your destination. There can be zero or many ways to reach it

Having this layer of indirection enables routing scenarios that are impossible or very hard to implement when publishing directly to queues, and also eliminates a certain amount of duplicated work application developers have to do.

If an AMQP message cannot be routed to any queue (for example, because there are no bindings for the exchange it was published to), it is either dropped or returned to the publisher, depending on the message attributes the publisher has set.

Consumers

 

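Storing messages in queues is useless unless applications can consume them. AMQP offers two ways for applications to do this:

  • Have messages delivered to them ("push API")
  • Fetch messages as needed ("pull API")

 

With the "push API", applications have to indicate which queue they want to consume messages from. When they do so, we say that they register a consumer on the queue or, simply put, subscribe to the queue. It is possible to register more than one consumer per queue, or to register an exclusive consumer (which excludes all other consumers from the queue while it is consuming).

Each consumer has a unique identifier called a consumer tag, which can be used to unsubscribe from messages. Consumer tags are just strings.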

Message Acknowledgements

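Consumers may occasionally fail to process individual messages, or may sometimes just crash. Network failures can cause problems as well. This raises a question: when can the broker safely remove a message from a queue? The AMQP specification offers two choices:

  • After the broker sends a message to an application (using either the basic.deliver or the basic.get-ok method)
  • After the consumer sends back an acknowledgement (using the basic.ack AMQP method)

 

The former is called the automatic acknowledgement model, the latter the explicit acknowledgement model. With the explicit model, the consumer decides when to send the acknowledgement. It can be right after receiving a message, after persisting it to a data store but before processing it, or after processing has finished.

 

If a consumer dies without sending an acknowledgement, the broker redelivers the message to another consumer or, if none is available at the time, waits until a consumer registers before attempting redelivery.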
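The difference between the two models, and what happens when a consumer disappears, can be sketched with an in-memory queue. This is a simplified model under assumed names (AckQueue, deliver, consumer_gone); it does not mirror any particular client API.

```python
from collections import deque


class AckQueue:
    """Toy queue that tracks deliveries awaiting acknowledgement."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}   # delivery tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer, auto_ack=False):
        if not self.ready:
            return None
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not auto_ack:
            # Explicit model: keep the message until the consumer acks it.
            self.unacked[tag] = (consumer, message)
        # Automatic model: the message is forgotten as soon as it is sent.
        return tag, message

    def ack(self, tag):
        del self.unacked[tag]

    def consumer_gone(self, consumer):
        """Put the consumer's unacknowledged messages back at the front."""
        lost = [tag for tag, (c, _) in self.unacked.items() if c == consumer]
        for tag in sorted(lost, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag)[1])


queue = AckQueue(["a", "b"])
tag_a, _ = queue.deliver("consumer-1")
tag_b, _ = queue.deliver("consumer-1")
queue.ack(tag_a)                   # "a" is now removed for good
queue.consumer_gone("consumer-1")  # "b" becomes available again
```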

Rejecting Messages

When a consumer application receives a message, processing of that message may or may not succeed. An application can indicate to the broker that message processing has failed (or cannot be accomplished at the time) by rejecting a message. When rejecting a message, an application can ask the broker to discard or requeue it. When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.
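One way to avoid the infinite loop mentioned above is for the application to cap the number of attempts itself. The sketch below simulates that policy in memory; the attempt counter is an application-level assumption, not something the text attributes to the protocol, and the function name is hypothetical.

```python
from collections import deque


def consume_with_retry_limit(messages, handler, max_attempts=3):
    """Process messages, requeueing failures up to max_attempts times."""
    queue = deque((message, 0) for message in messages)
    done, discarded = [], []
    while queue:
        message, attempts = queue.popleft()
        try:
            handler(message)
            done.append(message)
        except Exception:
            if attempts + 1 < max_attempts:
                # corresponds to rejecting with requeue
                queue.append((message, attempts + 1))
            else:
                # corresponds to rejecting without requeue (discard)
                discarded.append(message)
    return done, discarded


def handler(message):
    if message == "bad":
        raise ValueError("cannot process")


done, discarded = consume_with_retry_limit(["ok", "bad"], handler)
```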

Negative Acknowledgements

Messages are rejected with the basic.reject AMQP method. There is one limitation that basic.reject has: there is no way to reject multiple messages as you can do with acknowledgements. However, if you are using RabbitMQ, then there is a solution. RabbitMQ provides an AMQP 0-9-1 extension known as negative acknowledgements or nacks. For more information, please refer to the Rabbit-specific extensions.

Prefetching Messages

For cases when multiple consumers share a queue, it is useful to be able to specify how many messages each consumer can be sent at once before sending the next acknowledgement. This can be used as a simple load balancing technique or to improve throughput if messages tend to be published in batches, for example, when a producing application sends messages every minute because of the nature of the work it is doing.

Note that RabbitMQ only supports channel-level prefetch-count, not connection or size based prefetching.
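The effect of a prefetch count can be sketched as a delivery window: the broker stops sending once the number of unacknowledged messages reaches the limit. The function below is a toy model with assumed names, using plain lists for the ready and in-flight messages.

```python
def deliver_with_prefetch(ready, in_flight, prefetch_count):
    """Move messages from ready to in_flight until the window is full.

    Both arguments are lists and are modified in place; the messages
    delivered by this call are returned.
    """
    delivered = []
    while ready and len(in_flight) < prefetch_count:
        message = ready.pop(0)
        in_flight.append(message)
        delivered.append(message)
    return delivered


ready, in_flight = [1, 2, 3, 4, 5], []
first = deliver_with_prefetch(ready, in_flight, prefetch_count=2)   # two messages
second = deliver_with_prefetch(ready, in_flight, prefetch_count=2)  # window full
in_flight.remove(1)                                                 # consumer acks message 1
third = deliver_with_prefetch(ready, in_flight, prefetch_count=2)   # one more message
```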

Message Attributes and Payload

Messages in the AMQP model have attributes. Some attributes are so common that the AMQP 0-9-1 specification defines them and application developers do not have to think about the exact attribute name. Some examples are:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode (persistent or not)
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Producer application id

Some attributes are used by AMQP brokers, but most are open to interpretation by applications that receive them. Some attributes are optional and known as headers. They are similar to X-Headers in HTTP. Message attributes are set when a message is published.

AMQP messages also have a payload (the data that they carry), which AMQP brokers treat as an opaque byte array. The broker will not inspect or modify the payload. It is possible for messages to contain only attributes and no payload. It is common to use serialisation formats like JSON, Thrift, Protocol Buffers and MessagePack to serialize structured data in order to publish it as the message payload. AMQP peers typically use the "content-type" and "content-encoding" fields to communicate this information, but this is by convention only.
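The convention described above might look like this on the application side. The sketch uses only the standard library; the property names are written in a Python-friendly form (content_type, content_encoding) and the helper functions are hypothetical.

```python
import json


def build_message(data):
    """Serialise structured data into an opaque byte payload plus attributes."""
    body = json.dumps(data).encode("utf-8")
    properties = {"content_type": "application/json",
                  "content_encoding": "utf-8"}
    return properties, body


def read_message(properties, body):
    """Decode a payload, relying on the attributes set by the publisher."""
    if properties.get("content_type") != "application/json":
        raise ValueError("unsupported content type")
    encoding = properties.get("content_encoding", "utf-8")
    return json.loads(body.decode(encoding))


properties, body = build_message({"order_id": 7, "status": "paid"})
restored = read_message(properties, body)
```

The broker never looks inside the body; only the publishing and consuming applications agree on what "application/json" means.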

Messages may be published as persistent, which makes the AMQP broker persist them to disk. If the server is restarted, the system ensures that received persistent messages are not lost. Simply publishing a message to a durable exchange, or the fact that the queue(s) it is routed to are durable, does not make a message persistent: it all depends on the persistence mode of the message itself. Publishing messages as persistent affects performance (just like with data stores, durability comes at a certain cost in performance).

Message Acknowledgements

Since networks are unreliable and applications fail, it is often necessary to have some kind of processing acknowledgement. Sometimes it is only necessary to acknowledge the fact that a message has been received. Sometimes acknowledgements mean that a message was validated and processed by a consumer, for example, verified as having mandatory data and persisted to a data store or indexed.

This situation is very common, so AMQP 0-9-1 has a built-in feature called message acknowledgements (sometimes referred to as acks) that consumers use to confirm message delivery and/or processing. If an application crashes (the AMQP broker notices this when the connection is closed) while an acknowledgement for a message was expected but not yet received by the broker, the message is re-queued (and possibly immediately delivered to another consumer, if any exists).

Having acknowledgements built into the protocol helps developers to build more robust software.

AMQP 0-9-1 Methods

 

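AMQP 0-9-1 is structured as a number of methods. Methods are operations (like HTTP methods) and have nothing to do with methods in object-oriented programming. Methods are grouped into classes. Classes are just logical groupings of methods.

Let us take a look at the exchange class, a group of methods related to operations on exchanges. It includes the following operations: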

 

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

 

 

(Note that RabbitMQ defines its own extension methods for the exchange class; they are not discussed in this guide.)

 

The operations above form logical pairs: exchange.declare and exchange.declare-ok, exchange.delete and exchange.delete-ok. These operations are "requests" (sent by clients) and "responses" (sent by brokers in response to the aforementioned "requests").

As an example, the client asks the broker to declare a new exchange using the exchange.declare method:

[Figure: the client sends exchange.declare to the broker]

As shown on the diagram above, exchange.declare carries several parameters. They enable the client to specify exchange name, type, durability flag and so on.

If the operation succeeds, the broker responds with the exchange.declare-ok method:

[Figure: the broker responds with exchange.declare-ok]

exchange.declare-ok does not carry any parameters except for the channel number (channels will be described later in this guide).

The sequence of events is very similar for another method pair on the AMQP queue class: queue.declare and queue.declare-ok:

[Figure: the client sends queue.declare to the broker]

[Figure: the broker responds with queue.declare-ok]

Not all AMQP methods have counterparts. Some (basic.publish being the most widely used one) do not have corresponding "response" methods and some others (basic.get, for example) have more than one possible "response".
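The request/response pairing can be captured in a small lookup table. This sketch lists only the methods mentioned in this guide, plus basic.get-empty, which the AMQP 0-9-1 specification defines as the second possible reply to basic.get; the table and function names are made up for illustration.

```python
# Request method -> the responses a broker may send back.
RESPONSES = {
    "exchange.declare": ("exchange.declare-ok",),
    "exchange.delete": ("exchange.delete-ok",),
    "queue.declare": ("queue.declare-ok",),
    "basic.publish": (),                               # no response method
    "basic.get": ("basic.get-ok", "basic.get-empty"),  # more than one possibility
}


def is_valid_reply(request, response):
    """True if `response` is one of the replies defined for `request`."""
    return response in RESPONSES.get(request, ())


def expects_reply(request):
    """True if the client should wait for a response after sending `request`."""
    return bool(RESPONSES.get(request, ()))
```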

Connections

 

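AMQP connections are typically long-lived. AMQP is an application-level protocol that uses TCP for reliable delivery. AMQP connections require authentication and can be protected using TLS (SSL). When an application no longer needs to be connected to an AMQP broker, it should gracefully close the AMQP connection instead of abruptly closing the underlying TCP connection.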

 

Channels

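Some applications need multiple connections to a broker. However, keeping many TCP connections open at the same time is undesirable because doing so consumes system resources and makes firewall configuration more difficult. AMQP connections are multiplexed with channels, which can be thought of as "lightweight connections that share a single TCP connection".

For applications that use multiple threads or processes, it is very common to open a channel per thread or process rather than share channels between them.

Communication on a particular channel is completely separate from communication on any other channel. Therefore every AMQP method also carries a channel number, which identifies the channel the method applies to.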
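Multiplexing by channel number can be illustrated with a few lines that split one interleaved stream back into per-channel sequences. The frame representation, a (channel number, method name) pair, is a simplification assumed for this sketch.

```python
from collections import defaultdict


def demultiplex(frames):
    """Group methods arriving on one connection by their channel number.

    frames is an iterable of (channel_number, method_name) pairs in
    arrival order; order is preserved within each channel.
    """
    per_channel = defaultdict(list)
    for channel_number, method in frames:
        per_channel[channel_number].append(method)
    return dict(per_channel)


# Two channels share one TCP connection; their traffic is interleaved.
wire = [
    (1, "queue.declare"),
    (2, "basic.publish"),
    (1, "queue.declare-ok"),
    (2, "basic.publish"),
]
by_channel = demultiplex(wire)
```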

 

 

Virtual Hosts

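To make it possible for a single broker to host multiple isolated "environments" (groups of users, exchanges, queues and so on), AMQP includes the concept of virtual hosts. They are similar to the virtual hosts used by web servers and provide completely isolated environments in which AMQP entities live. AMQP clients specify which virtual host they want to use when establishing the AMQP connection.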

 

AMQP is Extensible

AMQP 0-9-1 has several extension points:

These features make the AMQP 0-9-1 Model even more flexible and applicable to a very broad range of problems.

AMQP 0-9-1 Clients Ecosystem

There are many AMQP 0-9-1 clients for many popular programming languages and platforms. Some of them follow AMQP terminology closely and only provide implementation of AMQP methods. Some others have additional features, convenience methods and abstractions. Some of the clients are asynchronous (non-blocking), some are synchronous (blocking), some support both models. Some clients support vendor-specific extensions (for example, RabbitMQ-specific extensions).

Because one of the main AMQP goals is interoperability, it is a good idea for developers to understand protocol operations and not limit themselves to terminology of a particular client library. This way communicating with developers using different libraries will be significantly easier.

 
