欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ入门-AMQP协议

程序员文章站 2022-07-15 08:09:41
...

   RabbitMQ和AMQP
   AMQP生产者流转过程
   AMQP消费者流转过程


   RabbitMQ和AMQP

     RabbitMQ遵从AMQP协议,AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定时的BindingKey相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。
     RabbitMQ中的交换器、交换器类型、队列、绑定、路由键等都是遵循AMQP协议中相应的概念。
     AMQP协议本身包括三层:

  1. Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。如客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。
  2. Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务器的相应返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  3. Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误监测和数据表示等。

     AMQP说到底还是一个通信协议,通信协议都会涉及报文交互,从low-level举例说,AMQP本身是应用层协议,其填充于TCP协议层的数据部分。从high-level来说,AMQP是通过协议命令来进行交互的。AMQP协议可以看作一系列结构化命令的集合。


   AMQP生产者流转过程

     生产者连接代码:

Connection connection = factory.newConnection();	//创建连接
Channel channel = connection.createChannel();	//创建信道
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

     流转过程图:
                                   RabbitMQ入门-AMQP协议
     步骤分析:

  1. 客户端与Broker建立连接,调用factory.newConnection方法,这个方法会进一步封装成Protocol Header 0-9-1(如果支持的协议是AMQP 0-9-1)的报文头发送给Broker,以此通知Broker本次交互采用的是AMQP 0-9-1协议,接着Broker返回Connection.Start来建立连接,在连接过程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-OK等6个命令交互。
  2. 当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-OK命令。
  3. 当客户端发送消息时,需要调用channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令其中还包含了ContentHeader和Content Body。Content Header里面包含的是消息体的属性,如投递模式、优先级等。而Content Body包含消息体本身。
  4. 当客户端发送完消息需要关闭资源时,涉及Channel.Close/.Close-OK与Connection.Close/.Close-Ok命令交互。

   AMQP消费者流转过程

     消费者连接代码:

Connection connection = factory.newConnection(address);	//创建连接
final Channel channel = connection.createChannel();	//创建信道
Consumer consumer = new DefaultConsumer(channel);	
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, consumer);
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();

     流转过程图:
                                   RabbitMQ入门-AMQP协议
     步骤分析:

  1. 消费者客户端与Broker建立连接,协议交互同样涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-OK等。
  2. 与生产者一样在Connection上建立信道Channel。
  3. 调用channel.basicQos(int perfetchCount)的方法设置消费者客户端最大能"保持"的未确认的消息数,协议涉及Basic.Qos/.Qos-OK这两个AMQP命令。
  4. 在真正消费之前,消费者客户端向Broker发送Basic.Consume命令(channel.basicConsume方法)将Channel设置为接受模式,之后Broker回执Basic.Consume-OK以告诉消费者客户端准备好消费消息。接着Broker向消费者推送消息,即Basic.deliver命令,该命令同样携带了Content Header和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令
  6. 在消费者停止消费的时候,主动关闭连接,涉及Channel.Close/.Close-OK与Connection.Close/.Close-Ok命令交互。