欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ:队列优先级和消息优先级的介绍和使用

程序员文章站 2022-05-03 11:57:51
...

1.声明

当前内容主要用于本人学习和复习之用,当前内容包括如何创建优先级队列和使用消息优先级,以及不设定优先级和设定优先级队列之间的区别。包括探讨server如何是实现优先级的

当前内容来源:RabbitMQ官方文档

2.官方的Priority Queue Support介绍

RabbitMQ has priority queue implementation in the core as of version 3.5.0. Any queue can be turned into a priority one using client-provided optional arguments (but, unlike other features that use optional arguments, not policies). The implementation supports a limited number of priorities: 255. Values between 1 and 10 are recommended.

RabbitMQ在3.5.0版本的时候实现了优先级队列。任何一个队列都可以通过客户端配置参数方式设置一个优先级(但是不能使用策略的方式配置这个参数)。当前优先级的最大值为:255。这个值最好在1到10之间

3.官方的Using Client-provided Optional Arguments介绍

To declare a priority queue, use the x-max-priority optional queue argument. This argument should be a positive integer between 1 and 255, indicating the maximum priority the queue should support.

通过声明队列方式并使用参数x-max-priority指定当前的队列为优先级队列。这个优先级队列支持的参数必须是一个整数在1到255.

Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);

分析发现:队列的优先级设置只能通过声明方式设定,不能通过策略方式修改某个队列

Publishers then can publish prioritised messages using the priority field of basic.properties. Larger numbers indicate higher priority.

消息发布者可以发送一个优先级消息通过basic.properties.数字越大表示优先级越高

4.官方的Interaction with Consumers介绍

It’s important to understand how consumers work when working with priority queues. By default, consumers may be sent a large number of messages before they acknowledge any, limited only by network backpressure.

这是非常重要的知道这个消费者与优先级的队列如何一起工作的。可能会在消费者确认任何消息之前向消费者发送大量消息,这仅受网络背压的限制。

So if such a hungry consumer connects to an empty queue to which messages are subsequently published, the messages may not spend any time at all waiting in the queue. In this case the priority queue will not get any opportunity to prioritise them.

如此如果一个饥饿的消费者连接到一个空的队列,然后再发送一些消息,那么这些消息可能不会被在队列中等待。在这种情况下,这个优先级队列及那个不能将该消息排序

分析发现:所谓的优先级队列就是,当消费者阻塞的时候,对具有优先级的消息直接按照优先级排序操作,然后按照优先级在一个一个的发送给消费者,这里需要多个条件(优先级队列、优先级消息、消费者阻塞、并且server对消费者排序)

In most cases you will want to use the basic.qos method in manual acknowledgement mode on your consumers, to limit the number of messages that can be out for delivery at any time and thus allow messages to be prioritised.

在大多数情况下,您将需要在使用者的 手动确认模式下使用basic.qos方法,以限制可以随时传递的邮件数量,从而使邮件具有优先级

4.测试使用优先级队列和非优先级队列(使用优先级消息)

创建名称为test的交换机,并使用topic类型

1.创建优先级队列(将其和test绑定,路由为“”)

public class QueuePriorityTest {
	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory=new ConnectionFactory();
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		Map<String, Object> arguments = new HashMap<String, Object>();
		arguments.put("x-max-priority", 10);
	
		channel.queueDeclare("pirority-10-queue", true, false, false, arguments);
		channel.queueBind("pirority-10-queue", "test", "");
	}
}

RabbitMQ:队列优先级和消息优先级的介绍和使用

2.创建非优先级队列hello(将其和test绑定,路由为"")
RabbitMQ:队列优先级和消息优先级的介绍和使用

3.创建优先级消息的生产者

public class MsgPrioritySender {
	public static void main(String[] args) {
		RabbitMqUtils mqUtils=new RabbitMqUtils();
		Random random=new Random();			
		BasicProperties.Builder builder=new BasicProperties.Builder();
		builder.deliveryMode(2);
		for (int i = 0; i < 10; i++) {
			int nextInt = random.nextInt(10);
			builder.priority(nextInt);
			mqUtils.send("test", "", true, builder.build(), "你好【"+(i+1)+"】===>");
		}
		
	}
}

4.创建优先级队列的消费者

/**
 * @description 队列优先级的测试
 * @author hy
 * @date 2020-05-18
 */
public class QueuePriorityConsumerTest2 {
	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		channel.basicQos(1);
		channel.basicConsume("pirority-10-queue", false, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者开始消费pirority-10-queue队列开始处理消息===>" + new String(body, "utf-8") + ",优先级==>"
						+ properties.getPriority());
				channel.basicAck(envelope.getDeliveryTag(), false);
			}

		});
	}
}

5.创建非有衔接队列的消费者

/**
 * @description 队列优先级的测试
 * @author hy
 * @date 2020-05-18
 */
public class NoPriorityConsumerTest {
	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory=new ConnectionFactory();
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		channel.basicQos(1);
		channel.basicConsume("hello", false, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				//System.out.println(properties);
				System.out.println("消费者开始消费hello队列开始处理消息===>" + new String(body, "utf-8")+",优先级==>"+properties.getPriority());
				channel.basicAck(envelope.getDeliveryTag(), false);
			}

		});
	}
}

5.测试方式1(先启动消息生产者、然后启动消费者1(非优先级)和消费者2(优先级))

优先级消费者接收的消息是按照优先级排序后的消息
RabbitMQ:队列优先级和消息优先级的介绍和使用
非优先级队列接收的是原来发送消息的顺序
RabbitMQ:队列优先级和消息优先级的介绍和使用
6.测试方式2(先启动两个消费者,再启动消息生产者)观察发现
RabbitMQ:队列优先级和消息优先级的介绍和使用
RabbitMQ:队列优先级和消息优先级的介绍和使用
此时发现两个结果完全一致,原因就是前面的,当已近存在消费者了,那么在发送消息的时候,不会经过按照优先级排序,直接发送消息出去

5.分析上面优先级队列的问题

1.优先级队列和非优先级队列在消费者不忙的情况下发送消息的结果完全一致

2.优先级队列,其实就是就是在消息积压的时候进行按照优先级排序,它需要排序

6.实现每次接收优先级消息

只要打破这个消费者不忙,有多余的时间让server对消息按照优先级排序即可(直接让每次进行应答的时候休眠或者忙以下就可以了)

/**
 * @description 队列优先级的测试
 * @author hy
 * @date 2020-05-18
 */
public class QueuePriorityConsumerTest2 {
	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		channel.basicQos(1);
		channel.basicConsume("pirority-10-queue", false, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				
				// 直接休眠5秒,让当前的发布者进行重新排序
				try {
					Thread.sleep(2000l);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.out.println("消费者开始消费pirority-10-queue队列开始处理消息===>" + new String(body, "utf-8") + ",优先级==>"
						+ properties.getPriority());
				channel.basicAck(envelope.getDeliveryTag(), false);
			}

		});
	}
}

测试结果
RabbitMQ:队列优先级和消息优先级的介绍和使用
RabbitMQ:队列优先级和消息优先级的介绍和使用

7.总结

1.优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能

2.优先级队列必须在消费者繁忙的时候,才能对消息按照优先级排序

3.非优先级队列发送优先级消息是不会排序的,所以向非优先级队列发送优先级是没有任何作用的

以上纯属个人见解,如有问题请联本人!

相关标签: 消息队列