一个最简单的消息队列,带你理解 RabbitMQ!
作者:海向
来源:
rabbitmq 简述
rabbitmq是一个消息代理:它接受并转发消息。 您可以将其视为邮局:当您将要把寄发的邮件投递到邮箱中时,您可以确信postman 先生最终会将邮件发送给收件人。 在这个比喻中,rabbitmq是一个邮箱,邮局和邮递员,用来接受,存储和转发二进制数据块的消息。
队列就像是在rabbitmq中扮演邮箱的角色。 虽然消息经过rabbitmq和应用程序,但它们只能存储在队列中。 队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。 许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。
producer即为生产者,用来产生消息发送给队列。consumer是消费者,需要去读队列内的消息。producer,consumer和broker(rabbitmq server)不必驻留在同一个主机上;确实在大多数应用程序中它们是这样分布的。
简单队列
简单队列是最简单的一种模式,由生产者、队列、消费者组成。生产者将消息发送给队列,消费者从队列中读取消息完成消费。
在下图中,“p”是我们的生产者,“c”是我们的消费者。 中间的框是队列 - rabbitmq代表消费者的消息缓冲区。
java 方式
生产者
package com.anqi.mq.nat; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; public class myproducer { private static final string queue_name = "item_queue"; public static void main(string[] args) throws exception { //1. 创建一个 connectionfactory 并进行设置 connectionfactory factory = new connectionfactory(); factory.sethost("localhost"); factory.setvirtualhost("/"); factory.setusername("guest"); factory.setpassword("guest"); //2. 通过连接工厂来创建连接 connection connection = factory.newconnection(); //3. 通过 connection 来创建 channel channel channel = connection.createchannel(); //实际场景中,消息多为json格式的对象 string msg = "hello"; //4. 发送三条数据 for (int i = 1; i <= 3 ; i++) { channel.basicpublish("", queue_name, null, msg.getbytes()); system.out.println("send message" + i +" : " + msg); } //5. 关闭连接 channel.close(); connection.close(); } } /** * declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autodelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.ioexception if an error is encountered */ queue.declareok queuedeclare(string queue, boolean durable, boolean exclusive, boolean autodelete,map<string, object> arguments) throws ioexception; /** * publish a message * @see com.rabbitmq.client.amqp.basic.publish * @param exchange the exchange to publish the message to * @param routingkey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.ioexception if an error is encountered */ void basicpublish(string exchange, string routingkey, basicproperties props, byte[] body) throws ioexception; /** * start a non-nolocal, non-exclusive consumer, with * a server-generated consumertag. * @param queue the name of the queue * @param autoack true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumertag generated by the server * @throws java.io.ioexception if an error is encountered * @see com.rabbitmq.client.amqp.basic.consume * @see com.rabbitmq.client.amqp.basic.consumeok * @see #basicconsume(string, boolean, string, boolean, boolean, map, consumer) */ string basicconsume(string queue, boolean autoack, consumer callback) throws ioexception;
消费者
package com.anqi.mq.nat; import com.rabbitmq.client.*; import java.io.ioexception; public class myconsumer { private static final string queue_name = "item_queue"; public static void main(string[] args) throws exception { //1. 创建一个 connectionfactory 并进行设置 connectionfactory factory = new connectionfactory(); factory.sethost("localhost"); factory.setvirtualhost("/"); factory.setusername("guest"); factory.setpassword("guest"); //2. 通过连接工厂来创建连接 connection connection = factory.newconnection(); //3. 通过 connection 来创建 channel channel channel = connection.createchannel(); //4. 声明一个队列 channel.queuedeclare(queue_name, true, false, false, null); system.out.println(" [*] waiting for messages. to exit press ctrl+c"); /* true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费 false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一 直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息, 直到该消费者反馈。 */ //5. 创建消费者并接收消息 consumer consumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string message = new string(body, "utf-8"); system.out.println(" [x] received '" + message + "'"); } }; //6. 设置 channel 消费者绑定队列 channel.basicconsume(queue_name, true, consumer); } } send message1 : hello send message2 : hello send message3 : hello [*] waiting for messages. to exit press ctrl+c [x] received 'hello' [x] received 'hello' [x] received 'hello'
当我们启动生产者之后查看rabbitmq管理后台可以看到有一条消息正在等待被消费。
当我们启动消费者之后再次查看,可以看到积压的一条消息已经被消费。
总结
队列声明queuedeclare的参数:第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数。
basicconsume的第二个参数autoack: 应答模式,true:自动应答,即消费者获取到消息,该消息就会从队列中删除掉,false:手动应答,当从队列中取出消息后,需要程序员手动调用方法应答,如果没有应答,该消息还会再放进队列中,就会出现该消息一直没有被消费掉的现象。
这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (amqp default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同,相当于channel.queuebind(queue:"queue_name", exchange:"(amqp default)“, routingkey:"queue_name");
虽然实例没有显式声明交换机,但是当路由键和队列名称一样时,就会将消息发送到这个默认的交换机中。这种方式比较简单,但是无法满足复杂的业务需求,所以通常在生产环境中很少使用这种方式。
the default exchange is implicitly bound to every queue, with a routing key equal to the queue name. it is not possible to explicitly bind to, or unbind from the default exchange. it also cannot be deleted.默认交换机隐式绑定到每个队列,其中路由键等于队列名称。不可能显式绑定到,或从缺省交换中解除绑定。它也不能被删除。
——引自 rabbitmq 官方文档
spring-amqp方式
引入 maven 依赖
<dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp-client</artifactid> <version>5.6.0</version> </dependency> <dependency> <groupid>org.springframework.amqp</groupid> <artifactid>spring-rabbit</artifactid> <version>2.1.5.release</version> </dependency>
spring 配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemalocation="http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"> <rabbit:connection-factory id="connectionfactory" host="localhost" virtual-host="/" username="guest" password="guest"/> <rabbit:template id="amqptemplate" connection-factory="connectionfactory"/> <rabbit:admin connection-factory="connectionfactory"/> <rabbit:queue name="my-queue"/> </beans>
使用测试
import org.springframework.amqp.core.amqptemplate; import org.springframework.context.applicationcontext; import org.springframework.context.support.classpathxmlapplicationcontext; public class main { public static void main(string[] args) { applicationcontext app = new classpathxmlapplicationcontext("spring/rabbit-context.xml"); amqptemplate amqptemplate = app.getbean(amqptemplate.class); amqptemplate.convertandsend("my-queue", "item"); string msg = (string) amqptemplate.receiveandconvert("my-queue"); system.out.println(msg); } }
参考方法
/** * convert a java object to an amqp {@link message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingkey the routing key * @param message a message to send * @throws amqpexception if there is a problem */ void convertandsend(string exchange, string routingkey, object message) throws amqpexception; /** * receive a message if there is one from a specific queue and convert it to a java * object. returns immediately, possibly with a null value. * * @param queuename the name of the queue to poll * @return a message or null if there is none waiting * @throws amqpexception if there is a problem */ @nullable object receiveandconvert(string queuename) throws amqpexception;
近期热文推荐:
1.1,000+ 道 java面试题及答案整理(2021最新版)
2.终于靠开源项目弄到 intellij idea 激活码了,真香!
3.阿里 mock 工具正式开源,干掉市面上所有 mock 工具!
4.spring cloud 2020.0.0 正式发布,全新颠覆性版本!
觉得不错,别忘了随手点赞+转发哦!
推荐阅读
-
C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一)
-
一个最简单的消息队列,带你理解 RabbitMQ!
-
C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(二)
-
使用SpringBoot中提供的的RabbitMQ的API来编写一个消息队列的例子
-
一个最简单的屏幕触摸动作,理解事件分发的3个方法
-
C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(二)
-
C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一)
-
一个最简单的消息队列,带你理解 RabbitMQ!