欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ死信机制实现延迟队列

程序员文章站 2022-05-17 08:33:43
...

延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)

Time To Live(TTL)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(死信)

死信队列

DLX(Dead Letter Exchange),死信交换器。当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信队列。
造成死信的原因:

信息被拒绝
信息超时
超过了队列的最大长度
可以通过设置x-dead-letter-exchange参数指定DLX,设置x-dead-letter-routing-key指定DLX使用的路由键。

RabbitMQ死信机制实现延迟队列

  • 生产者
package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class ProducerDLX {

	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		// 3.声明交换器
		channel.exchangeDeclare("exchange.dlx", "direct", true);
		channel.exchangeDeclare("exchange.normal", "fanout", true);
		
		//4. 声明队列
		Map<String, Object> arg = new HashMap<String, Object>();
		// 设置DLX
		arg.put("x-dead-letter-exchange", "exchange.dlx");
		arg.put("x-dead-letter-routing-key", "routingkey.dlx");
		// 设置消息过期时间,消息过期后,会重新发布到DLX
		arg.put("x-message-ttl", 10000);
		
		channel.queueDeclare("queue.normal", true, false, false, arg);
		//	死信队列
		channel.queueDeclare("queue.dlx", true, false, false, null);
	
		//4. 绑定队列
		channel.queueBind("queue.normal", "exchange.normal", "");
		//这里的routingkey与上面设置的一样
		channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
		
		
		byte[] body = "死信测试  ".getBytes();;
		
		channel.basicPublish("exchange.normal", "", MessageProperties.PERSISTENT_TEXT_PLAIN, body );
		
	
		TimeUnit.SECONDS.sleep(10);
		channel.close();
		conn.close();
	}

}

  • 消费者
package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DLXConsumer {


	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		channel.exchangeDeclare("exchange.dlx", "direct", true);
		channel.queueDeclare("queue.dlx", true, false, false, null);
		
		channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
		
		channel.basicConsume("queue.dlx", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				
				System.err.println(" 死信 "+consumerTag);
				System.err.println(" 死信  body"+new String(body));
				
				channel.basicAck(envelope.getDeliveryTag(), false);
				
			}
			
		});
		
		
		
		TimeUnit.SECONDS.sleep(100);
		channel.close();
		conn.close();
	}

}

延迟队列

延迟队列存储的是延迟消息,延迟消息指的是,当消息被发发布出去之后,并不立即投递给消费者,而是在指定时间之后投递。如:在订单系统中,订单有30秒的付款时间,在订单超时之后在投递给消费者处理超时订单。
rabbitMq没有直接支持延迟队列,可以通过死信队列实现。在死信队列中,可以为普通交换器绑定多个消息队列,假设绑定过期时间为5分钟,10分钟和30分钟,3个消息队列,然后为每个消息队列设置DLX,为每个DLX关联一个死信队列。当消息过期之后,被转存到对应的死信队列中,然后投递给指定的消费者消费。

RabbitMQ死信机制实现延迟队列

RabbitMQ死信机制实现延迟队列

生产者

package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class DelayProducer {

	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		
		channel.exchangeDeclare("exchange.delay", "direct", true, false,
				false, null);
		// 创建dlx,用于将过期的message路由到不同的队列
		channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false,
				false, null);
		// 创建死信队列,接收过期的message
		channel.queueDeclare("queue-delay-5s", true, false, false, null);

		// 创建两个消息过期队列,并设置dlx
		Map<String, Object> arg = new HashMap<String, Object>();
		arg.put("x-dead-letter-exchange", "exchange.dlx-5s");
		arg.put("x-message-ttl", 5000);
		channel.queueDeclare("queue-5s", true, false, false, arg);

		// 队列与交换器绑定
		channel.queueBind("queue-5s", "exchange.delay", "routingkey-5s");
		channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");

		// 发布消息
		channel.basicPublish("exchange.delay", "routingkey-5s",
				MessageProperties.PERSISTENT_TEXT_PLAIN,
				("Message-5s"+new Date()).getBytes());
		//释放资源
		
		channel.close();
		
		conn.close();
		
	}

}

消费者

package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Queue5s {
	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();

		// 创建dlx,用于将过期的message路由到不同的队列
		channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false, false, null);
		// 创建死信队列,接收过期的message
		channel.queueDeclare("queue-delay-5s", true, false, false, null);

		channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");
		
		
		
		channel.basicConsume("queue-delay-5s", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				
				System.err.println("queue-delay-5s  "+new String(body)+"  接收时间  "+ new Date());
				channel.basicAck(envelope.getDeliveryTag(), false);
				System.out.println("============确认完成 =============");
			}
			
		});

		TimeUnit.SECONDS.sleep(100);
		channel.close();
		conn.close();
	}

}

RabbitMQ死信机制实现延迟队列

  • x-dead-letter-exchange", “exchange.dlx-5s”); 声明死信交换器
  • “x-message-ttl”, 5000); 过期时间
  • 建立将死信交换器设置为fanout
  • routingKey会默认与原来的一样
queue-delay-5s  Message-5sThu May 30 17:56:00 CST 2019  接收时间  Thu May 30 17:56:05 CST 2019============确认完成 =============

envelope Envelope(deliveryTag=1, redeliver=false, exchange=exchange.dlx-5s, routingKey=routingkey-5s)
properties #contentHeader<basic>(content-type=text/plain, content-encoding=null, headers={x-first-death-exchange=exchange.delay, x-death=[{reason=expired, count=1, exchange=exchange.delay, time=Thu May 30 17:56:05 CST 2019, routing-keys=[routingkey-5s], queue=queue-5s}], x-first-death-reason=expired, x-first-death-queue=queue-5s}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

相关标签: RabbitMQ