RabbitMQ死信机制实现延迟队列
程序员文章站
2022-05-17 08:33:43
...
延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
Time To Live(TTL)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(死信)
死信队列
DLX(Dead Letter Exchange),死信交换器。当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信队列。
造成死信的原因:
信息被拒绝
信息超时
超过了队列的最大长度
可以通过设置x-dead-letter-exchange参数指定DLX,设置x-dead-letter-routing-key指定DLX使用的路由键。
- 生产者
package com.ghgcn.mq.test02;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class ProducerDLX {
private static String userName = "root";
private static String password = "root123";
private static String host = "localhost";
private static int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
// 1.建立连接
Connection conn = factory.newConnection();
// 2.建立信道
Channel channel = conn.createChannel();
// 3.声明交换器
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
//4. 声明队列
Map<String, Object> arg = new HashMap<String, Object>();
// 设置DLX
arg.put("x-dead-letter-exchange", "exchange.dlx");
arg.put("x-dead-letter-routing-key", "routingkey.dlx");
// 设置消息过期时间,消息过期后,会重新发布到DLX
arg.put("x-message-ttl", 10000);
channel.queueDeclare("queue.normal", true, false, false, arg);
// 死信队列
channel.queueDeclare("queue.dlx", true, false, false, null);
//4. 绑定队列
channel.queueBind("queue.normal", "exchange.normal", "");
//这里的routingkey与上面设置的一样
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
byte[] body = "死信测试 ".getBytes();;
channel.basicPublish("exchange.normal", "", MessageProperties.PERSISTENT_TEXT_PLAIN, body );
TimeUnit.SECONDS.sleep(10);
channel.close();
conn.close();
}
}
- 消费者
package com.ghgcn.mq.test02;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DLXConsumer {
private static String userName = "root";
private static String password = "root123";
private static String host = "localhost";
private static int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
// 1.建立连接
Connection conn = factory.newConnection();
// 2.建立信道
Channel channel = conn.createChannel();
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
channel.basicConsume("queue.dlx", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.err.println(" 死信 "+consumerTag);
System.err.println(" 死信 body"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
TimeUnit.SECONDS.sleep(100);
channel.close();
conn.close();
}
}
延迟队列
延迟队列存储的是延迟消息,延迟消息指的是,当消息被发发布出去之后,并不立即投递给消费者,而是在指定时间之后投递。如:在订单系统中,订单有30秒的付款时间,在订单超时之后在投递给消费者处理超时订单。
rabbitMq没有直接支持延迟队列,可以通过死信队列实现。在死信队列中,可以为普通交换器绑定多个消息队列,假设绑定过期时间为5分钟,10分钟和30分钟,3个消息队列,然后为每个消息队列设置DLX,为每个DLX关联一个死信队列。当消息过期之后,被转存到对应的死信队列中,然后投递给指定的消费者消费。
生产者
package com.ghgcn.mq.test02;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class DelayProducer {
private static String userName = "root";
private static String password = "root123";
private static String host = "localhost";
private static int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
// 1.建立连接
Connection conn = factory.newConnection();
// 2.建立信道
Channel channel = conn.createChannel();
channel.exchangeDeclare("exchange.delay", "direct", true, false,
false, null);
// 创建dlx,用于将过期的message路由到不同的队列
channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false,
false, null);
// 创建死信队列,接收过期的message
channel.queueDeclare("queue-delay-5s", true, false, false, null);
// 创建两个消息过期队列,并设置dlx
Map<String, Object> arg = new HashMap<String, Object>();
arg.put("x-dead-letter-exchange", "exchange.dlx-5s");
arg.put("x-message-ttl", 5000);
channel.queueDeclare("queue-5s", true, false, false, arg);
// 队列与交换器绑定
channel.queueBind("queue-5s", "exchange.delay", "routingkey-5s");
channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");
// 发布消息
channel.basicPublish("exchange.delay", "routingkey-5s",
MessageProperties.PERSISTENT_TEXT_PLAIN,
("Message-5s"+new Date()).getBytes());
//释放资源
channel.close();
conn.close();
}
}
消费者
package com.ghgcn.mq.test02;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Queue5s {
private static String userName = "root";
private static String password = "root123";
private static String host = "localhost";
private static int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
// 1.建立连接
Connection conn = factory.newConnection();
// 2.建立信道
Channel channel = conn.createChannel();
// 创建dlx,用于将过期的message路由到不同的队列
channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false, false, null);
// 创建死信队列,接收过期的message
channel.queueDeclare("queue-delay-5s", true, false, false, null);
channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");
channel.basicConsume("queue-delay-5s", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.err.println("queue-delay-5s "+new String(body)+" 接收时间 "+ new Date());
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("============确认完成 =============");
}
});
TimeUnit.SECONDS.sleep(100);
channel.close();
conn.close();
}
}
- x-dead-letter-exchange", “exchange.dlx-5s”); 声明死信交换器
- “x-message-ttl”, 5000); 过期时间
- 建立将死信交换器设置为fanout
- routingKey会默认与原来的一样
queue-delay-5s Message-5sThu May 30 17:56:00 CST 2019 接收时间 Thu May 30 17:56:05 CST 2019============确认完成 =============
envelope Envelope(deliveryTag=1, redeliver=false, exchange=exchange.dlx-5s, routingKey=routingkey-5s)
properties #contentHeader<basic>(content-type=text/plain, content-encoding=null, headers={x-first-death-exchange=exchange.delay, x-death=[{reason=expired, count=1, exchange=exchange.delay, time=Thu May 30 17:56:05 CST 2019, routing-keys=[routingkey-5s], queue=queue-5s}], x-first-death-reason=expired, x-first-death-queue=queue-5s}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)