SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ
前言
RabbitMQ 官网 文档 提供7种消息队列模式,如下图所示
依赖导入
<dependency>
<groupId>com.github.luues</groupId>
<artifactId>spring-boot-starter-rabbitmq</artifactId>
<version>1.2.9.1.RELEASE</version>
</dependency>
普通消息队列
也就是一个生产者和一个消费者模式
生产者
public class Send {
public static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
Channel channel = null;
Connection connection = null;
factory.setHost("localhost");
//factory.setPort(5671);
try {
connection = factory.newConnection();
channel = connection.createChannel();
//创建生命队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World! 111 ";
//发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
factory.clone();
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
发送消息
消费者
public class Receiving {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
while (true){
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//true 监听队列 有消息就获取 没有就阻塞
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
接受消息
可以看到接受完消息后main方法没有停止,而是在监听消息处于阻塞状态。
work模式
一个生产者 多个消费者,只能有一个消费者接受到一个消息。
生产者
public static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME, false, false,
false, null
);
//发送消息
for (int i = 0; i < 50; i++) {
String message = "message: " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(message);
try {
Thread.sleep(i * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
消费者1 消费者2 代码一样 这里只写一个
public static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//同一时刻只能发送一个消息给消费者
channel.basicQos(1);
channel.queueDeclare(
QUEUE_NAME, false, false,
false, null
);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
需要注意的是
//同一时刻只能发送一个消息给消费者 那个消费者早消费完 早可以拿消息进行消费 能者多劳
channel.basicQos(1);
消息的确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
- 模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
channel.basicConsume(QUEUE_NAME, true, consumer);
只需再监听的时候设置为true就可以了,只要消费者拿到消息就会确认。
- 模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
channel.basicConsume(QUEUE_NAME, false, consumer);
false 表示不自动确认消息
手动提交确认消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动进行确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列 不自动提交确认
channel.basicConsume(QUEUE_NAME, false, consumer);
订阅模式
一个生产者 多个消费者订阅 然后同时接受到消息
X 表示交换机
红色两个队列绑定到X交换机
订阅者模式:
- 1、1个生产者,多个消费者
- 2、每一个消费者都有自己的一一个队列
- 3、生产者没有将消息直接发送到队列,而是发送到了交换机
- 4、每个队列都要绑定到交换机
- 5、生产者发送的消息,经过交换机,到达队列,实现,-一个消息被多个消费者获取的目的
发布者
//交换机
private static final String EXCHANGE_NAME = "exchange_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//消息
String msg = "订单更新..... ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("发送的消息 : " + msg);
channel.close();
connection.close();
}
两个订阅者
队列为 test_queue1_name 的订阅者
package com.example.demo.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @description:
* @author: Administrator
* @create: 2020-07-12 17:55
**/
public class Rece1 {
//交换机
private static final String EXCHANGE_NAME = "exchange_logs";
//队列
public static final String QUEUE_NAME = "test_queue1_name";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received1 '" + message + "'");
};
//自动确认消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
队列为 test_queue2_name 的订阅者
package com.example.demo.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @description:
* @author: Administrator
* @create: 2020-07-12 17:55
**/
public class Rece2 {
//交换机
private static final String EXCHANGE_NAME = "exchange_logs";
//队列
public static final String QUEUE_NAME = "test_queue2_name";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received2 '" + message + "'");
};
//自动确认消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
控制台中可以看到绑定的两个队列
发布的消息
订阅的消息
路由模式
x表示交换器
type表示交换器要发送消息的类型key
不同的key发送到不同的队列中去,可以将数据类型分开进行数据订阅,从而进行不同的数据处理。
路由流程图
根据不同的key进行订阅
如:前台新增一个商品数据,此时我们不需要写到redis中
前台删除一个商品数据,我们需要从redis中删除这条数据,也需要从数据库删除这条数据。
在发布订阅的基础代码上更改
但是必须设置事件类型为
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
路由发布者
//消息
String msg = "新增商品..... ";
channel.basicPublish(EXCHANGE_NAME,"add",null,msg.getBytes());
System.out.println("发送的消息 : " + msg);
删除redis缓存发布者
//消息
String msg = "删除商品..... ";
channel.basicPublish(EXCHANGE_NAME,"del",null,msg.getBytes());
System.out.println("发送的消息 : " + msg);
路由订阅者
增加订阅者
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
//同时订阅多个路由类型可以多写一行
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
删除订阅者
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "del");
通配符匹配模式(Topic)
通配符匹配模式是对路由模式的
大白话来说:路由模式是全匹配key必须一样,通配符是模糊匹配。
通配符匹配模式图
匹配符
*
表示匹配一个#
表示匹配多个
例如:
设置事件类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
接收iteme.add消息
//交换机
private static final String EXCHANGE_NAME = "exchange_topic";
//队列
public static final String QUEUE_NAME = "router_queue_topic_add";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.add");
//同一时刻只接受一条消息
channel.basicQos(1);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 新增商品 '" + message + "'");
};
//自动确认消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
接收iteme.del删除消息
//交换机
private static final String EXCHANGE_NAME = "exchange_topic";
//队列
public static final String QUEUE_NAME = "router_queue_topic_del";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//绑定队列到交换机 绑定del路由 key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.del");
//同一时刻只接受一条消息
channel.basicQos(1);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 删除redis缓存 '" + message + "'");
};
//自动确认消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
接收所有消息
//交换机
private static final String EXCHANGE_NAME = "exchange_topic";
//队列
public static final String QUEUE_NAME = "router_queue_topic_all";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//绑定队列到交换机 绑定del路由 key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.#");
//同一时刻只接受一条消息
channel.basicQos(1);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 获取所有消息 '" + message + "'");
};
//自动确认消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
发送消息
//交换机
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//消息
String msg = "新增消息接收..... ";
channel.basicPublish(EXCHANGE_NAME,"iteme.add",null,msg.getBytes());
//String msg = "删除消息接收..... ";
//channel.basicPublish(EXCHANGE_NAME,"iteme.del",null,msg.getBytes());
System.out.println("发送的消息 : " + msg);
channel.close();
connection.close();
}
topic消息队列功能更强大,可以兼容路由和发布订阅模式的消息功能。
官网文档地址 https://www.rabbitmq.com/getstarted.html
环境安装 https://blog.csdn.net/weixin_38361347/article/details/107292227