【RabbitMQ】——队列模式(1)
MQ:Message Queue,消息队列,是系统和系统之间的通信方法。
RabbitMQ是MQ的一种实现,较为常用的还有ActiveMQ,Kafka(分布式发布订阅消息系统)等。本篇博客是RabbitMQ的入门,简单的了解一些RabbitMQ的5中队列,分别为:简单队列、work模式、订阅模式、路由模式、通配符模式。
环境:windows上安装RabbitMQ
工具类:用来获取连接的,下面所有队列的中获取连接时使用,具体实现如下
public class ConnectionUtil {
publicstatic Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
简单队列:
P:消息生成者
C:消息消费者
红色:消息队列
- 最简单的模式,生产者将消息发送到队列,消费者从队列中获取消息。
- 一条消息只能被消费一次,消费完毕后消息被删除。
生产者代码:
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道,ConnectionUtil为封装的工具类
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null,message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
启动完毕后从服务上查询消息队列,如下:
有一个名为test_queue的队列,消息数量为1。
消费者代码:
public class Recv {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列,要创建的队列已经存在则不再创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message +"'");
}
}
}
work模式:
P:生产者
C1、C2:消费者
红色:消息队列
- 一条消息只能被一个消费者消费
生产者代码:
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送50条消息
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null,message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者1:获取一条消息休眠10ms后,返回确认状态
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message +"'");
//休眠
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:获取一条消息休眠1000ms后,返回确认状态
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message +"'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
运行结果:
消费者1和消费者2获得的消息数量相同
但是这样是不合理的,消费者2获得消息后休眠的时间长,所以它得到的消息数量应该少用消费者1。
work模式——能者多劳
将上面两个消费者中的注解代码 channel.basicQos(1) 解注,这就意味着同一时刻服务器只发一条消息给消费者,消费者返回确认状态后获得新的消息,这样消费者2获得的消息数量就少于消费者1.
消息确认模式:
自动确认:
消费者从队列中获取消息,无论消费者获取到消息后是否成功消费,服务端都会认为消息已经被成功消费。
如下代码中,消费者不需要向服务器反馈
手动确认:
消费者从队列获得消息,服务器会将该消息标记为不可用状态,等待消费者反馈。如果消费者一直没有反馈消息将一直处于不可用状态。
手动模式消费者需要向服务端反馈状态。
小结:
文章中的两种MQ模式组成包括生产者、消费者、队列,还有在代码中体现出来的Connection连接、Channel通道。MQ的使用和hibernate、mysql的流程及其相像。首先获得连接,通过连接获得通道,通过通道完成消息队列的声明、消息的发布,对消息的监听等系列工作。
上一篇: python3通过qq邮箱发送邮件