RabbitMQ简介
程序员文章站
2022-03-25 16:57:38
RabbitMQ简介 官网也讲述的清楚而且还有例子,只不过是英文的,很多人看到英文就不明白说什么了吧,即使有翻译成中文,总觉得哪里怪怪的,有些翻译并不流畅。我还是支持多看官网,官网:https://www.rabbitmq.com/ 。下面是自己做的一点下笔记,有参考其他文档。如有什么不对的地方,希 ......
rabbitmq简介
官网也讲述的清楚而且还有例子,只不过是英文的,很多人看到英文就不明白说什么了吧,即使有翻译成中文,总觉得哪里怪怪的,有些翻译并不流畅。我还是支持多看官网,官网: 。下面是自己做的一点下笔记,有参考其他文档。如有什么不对的地方,希望大家能够告诉我,通过留言板,像消息队列一样,你发送消息,我接收消息。
一、消息队列(mq)
- 什么是消息队列:即mq,message queue。是一种应用程序对应用对应用程序的通信方式。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来连接他们。消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断从队列中获取消息。【因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦】
-
amqp和jms
- amqp:即advanced message queuing protocol,一个提供统一消息服务的应用层标准(高级消息队列协议)。【限定了数据传输的格式和方式,跨语言跨平台,和http协议类似】。是应用层协议的一个开放标准,为面向消息的中间件设计。
- jms:java messageservice,实际上指jms api。jms是sun公司早期提出的消息标准,旨在为java应用提供统一的消息操作,包括create、send、recieve等。jms已经成为java enterprise edition的一部分。
-
两者间的区别和联系:
- jms是定义了统一的接口,来对消息操作进行统一;amqp是通过规定协议来统一数据交互的格式。
- jms限定了必须使用java语言;amqp只是协议,不规定实现方式,因此是跨语言的。
- jms规定了两种消息模型;而amqp的消息模型更加丰富。
- 常见mq产品:
- activemq:基于jms, apache旗下的
- rabbitmq:基于amqp协议,erlang(一种通用的面向并发的编程语言)语言开发,稳定性好
- rocketmq:基于jms,阿里巴巴产品,目前交由apache基金会
- kafka:分布式消息系统,高吞吐量
二、rabbitmq:
- 下载安装:先安装erlang,在安装rabbitmq。(这里不详细说明,如果有不是很清楚的,可以去搜索,相信很多伙伴已经分享过很多这样的博客啦)
- 启动步骤【window系统下的cmd命令行】:cd \rabbitmq server\rabbitmq_server-3.7.15\sbin(进入到你安装的rabbitmq目录的sbin目录) --> rabbitmq-plugins enable rabbitmq_management --> rabbitmq-server【浏览器访问窗口:】
-
五种消息模型:rabbitmq提供了6中消息模型,但是第6种rpc,并不是mq。其中3/4/5这三种属于订阅模式,只不过进行路由的方式不同。
-
基本消息模型:rabbitmq是一个消息的代理者(message broker):它接收消息并且传递消息。你可以认为它是一个邮局,当你投递邮件到一个邮箱,你很肯定邮递员终究会将邮件递交给你的收件人。与此类似,rabbitmq可以是一个邮箱、邮局、同时还是邮递员。不同之处在于:rabbitmq不是传递纸质邮件,而是二进制的数据。
- 问题:那么rabbitmq怎么知道消息被接收了呢?(转换思维,即如何避免消息的丢失?)【答:消费者的消息确认机制acknowlege,当消费者获取消息后,会向rabbitmq发送回执ack,告知消息已经被接收。不过这种回执ack分两种情况:① 自动ack:消息一旦被接收,消费者自动发送ack;② 手动ack:消息接收后,不会发送ack,需要手动调用。很好理解,望名知意嘛,嘿嘿嘿!!!】。
-
public class connectionutil { /** * 建立与rabbitmq的链接 */ public static connection getconnection() throws ioexception, timeoutexception { // 定义连接工厂 connectionfactory factory = new connectionfactory(); // 设置服务地址 factory.sethost("127.0.0.1"); // 端口 factory.setport(5672); // 设置账号信息,用户名、密码、vhost factory.setvirtualhost("/demo"); factory.setusername("guest"); factory.setpassword("guest"); // 通过工厂获取连接 connection connection = factory.newconnection(); return connection; } }
public class send { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到连接 connection connection = connectionutil.getconnection(); // 从连接中创建通道,使用通道才能完成消息相关的操作 channel channel = connection.createchannel(); // 声明(创建)队列 channel.queuedeclare(queue_name, false, false, false, null); // 消息内容 string message = "hello word!"; // 向指定的队列中发送消息 channel.basicpublish("", queue_name, null, message.getbytes()); system.out.println(" [x] sent '" + message + "'"); // 关闭通道和连接 channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); // 手动进行ack channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数false,手动进行ack channel.basicconsume(queue_name, false, consumer); } }
-
work消息模型:也称为task queue任务模型。当消息处理比较耗时的时候,可能产生消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用该模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
- 问题:又会存在疑惑,任务是如何分配的,可以和显示生活中任务的分配联想起来呦?【答案:① 默认是任务平分,一次分配完全(即公平分配);② channel.basicqos(int num); 设置每个消费者同时只能处理num条数据(即能者多劳,耗时小的多处理些,你懂滴)】。
-
public class send { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取到连接 connection connection = connectionutil.getconnection(); // 从连接中创建通道,使用通道才能完成消息相关的操作 final channel channel = connection.createchannel(); // 声明(创建)队列 channel.queuedeclare(queue_name, false, false, false, null); // 循环发布任务 for (int i=0; i<50; i++) { // 消息内容 string message = "task ... " + i; channel.basicpublish("", queue_name, null, message.getbytes()); system.out.println(" [x] sent '" + message + "'"); thread.sleep(i * 2); } // 关闭通道和连接 channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 设置每个消费同时只能处理一条消息 channel.basicqos(1); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); try { // 模拟完成任务的耗时:1000ms thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数:是否自动进行消息确认 channel.basicconsume(queue_name, false, consumer); } }
/* * 对比上个消费者:耗时小,完成任务多些 */ public class recv2 { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 final channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 设置每个消费同时只能处理一条消息 channel.basicqos(1); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); // 手动进行ack channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数false,手动进行ack channel.basicconsume(queue_name, false, consumer); } }
-
订阅模型分类:
-
订阅模型 - fanout:广播。一条消息,会被所有订阅的队列消费。
-
public class send { private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明exchange,指定类型为fanout channel.exchangedeclare(exchange_name, builtinexchangetype.fanout); // 消息内容 string message = "hello everyone"; // 发布消息到exchange channel.basicpublish(exchange_name, "", null, message.getbytes()); system.out.println(" [生产者] sent '" + message + "'"); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "fanout_exchange_queue_1"; private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机 channel.queuebind(queue_name, exchange_name, ""); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "fanout_exchange_queue_2"; private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机 channel.queuebind(queue_name, exchange_name, ""); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
-
-
订阅模型 - direct:不同的消息被不同的队列消费。在direct模型下:
- 队列与交换机的绑定,不能是任意绑定,而是要指定至少一个 routingkey(路由key);
- 消息的发送方向 exchange 发送消息时,也必须指定消息的 routingkey;
- exchange 不再把消息交给每一个绑定的队列,而是根据消息的 routingkey 进行判断,只有队列的routingkey与消息的routingkey完全一致,才会接收到消息。
-
public class send { private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明exchange,指定类型为direct channel.exchangedeclare(exchange_name, builtinexchangetype.direct); // 消息内容 string message = "商品增加了,id = 1002"; // 发布消息到exchange,并且指定routing key为:delete,代表删除商品 channel.basicpublish(exchange_name, "insert", null, message.getbytes()); system.out.println(" [商品服务] sent '" + message + "'"); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "direct_exchange_queue_1"; private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "update"); channel.queuebind(queue_name, exchange_name, "delete"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "direct_exchange_queue_2"; private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "update"); channel.queuebind(queue_name, exchange_name, "delete"); channel.queuebind(queue_name, exchange_name, "insert"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
-
订阅模型 - topic:可以根据routingkey把消息路由到不同的队列,topic类型exchange可以让队列在绑定routingkey的时候使用通配符。
- routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例:item.insert。
-
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好一个词
-
public class send { private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 开启生产者确认 // channel.confirmselect(); // 声明exchange,指定类型为topic, 并且设置durable为true,持久化 channel.exchangedeclare(exchange_name, builtinexchangetype.topic, true); // 消息内容 string message = "商品新增了,id = 1002"; // 发布消息到exchange,并且指定routing key,消息持久化 channel.basicpublish(exchange_name, "item.insert", messageproperties.persistent_text_plain, message.getbytes()); system.out.println(" [商品服务] sent '" + message + "'"); // 等待rabbitmq的确认消息,true为确认收到,false为发出有误 // channel.waitforconfirms(); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "topic_durable_exchange_queue_1"; private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列, 第二个参数:true代表声明为持久化 channel.queuedeclare(queue_name, true, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "item.update"); channel.queuebind(queue_name, exchange_name, "item.delete"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "topic_durable_exchange_queue_2"; private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, true, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "item.*"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
-
订阅模型 - fanout:广播。一条消息,会被所有订阅的队列消费。
-
基本消息模型:rabbitmq是一个消息的代理者(message broker):它接收消息并且传递消息。你可以认为它是一个邮局,当你投递邮件到一个邮箱,你很肯定邮递员终究会将邮件递交给你的收件人。与此类似,rabbitmq可以是一个邮箱、邮局、同时还是邮递员。不同之处在于:rabbitmq不是传递纸质邮件,而是二进制的数据。
-
当然啦,国内这么hot的rabbitmq自然也是有集成到了springboot中滴,开心。
-
maven坐标依赖:
<parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>2.0.4.release</version> </parent> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> </dependencies>
-
application.yml:
spring: rabbitmq: host: 127.0.0.1 username: guest password: guest virtual-host: /demo
-
@component public class listener { @rabbitlistener(bindings = @queuebinding( value = @queue(value = "spring.test.queue", durable = "true"), exchange = @exchange( value = "spring.test.exchange", type = exchangetypes.topic), key = {"#.#"})) public void listen(string msg) { system.out.println("接收到的消息: " + msg); } }
-
maven坐标依赖:
-
持久化:要将消息持久化,前提是:队列、exchange都持久化。
- 交换机持久化:channel.exchangedeclare(exchange_name, builtinexchangetype.topic, true); // 参数三:设置durable为true。
- 队列持久化:channel.queuedeclare(queue_name, true, false, false, null); // 参数二:设置为true,表示设置队列持久化。
- 消息持久化:channel.basicpublish(exchange_name, "item.insert", messageproperties.persistent_text_plain, message.getbytes());
-
解决消息丢失?
- ack(消费者确认,由消费者向mq发送,防止消息丢失于消费者)
- 持久化(防止rabbitmq把消息丢失)
- 生产者确认机制publisher confirm(由mq向生产者发送,有些mq包含,有些不包含,比如:activemq不包含该机制,rabbitmq包含该机制)
- 发送消息前,将消息持久化到数据库,并记录消息状态(可靠消息服务)
- 思考问题(这个问题就留给你们思考啦?冲啊):如何保证消息发送的重复性,如何保证接口的幂等性(同一接口被重复执行,其结果一致)?【提示:加标识 消息的重发要谨慎】
上一篇: 这样查生产问题,不做背锅侠