SpringBoot集成RabbitMQ的方法(死信队列)
程序员文章站
2023-12-09 17:42:03
介绍
死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeu...
介绍
死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息ttl过期
场景
1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理
使用
安装mq
使用docker方式安装,选择带mangement的版本
docker pull rabbitmq:management docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
访问 localhost: 15672,默认账号密码guest/guest
项目配置
(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
(3)队列配置
package com.df.ps.mq; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.queue; import org.springframework.amqp.core.topicexchange; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer; import org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter; import org.springframework.beans.factory.annotation.autowire; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.util.hashmap; import java.util.map; @configuration public class mqconfig { //time @value("${spring.df.buffered.min:120}") private int springdfbufferedtime; @value("${spring.df.high-buffered.min:5}") private int springdfhighbufferedtime; @value("${spring.df.low-buffered.min:120}") private int springdflowbufferedtime; // 30min buffered queue @value("${spring.df.queue:spring-df-buffered-queue}") private string springdfbufferedqueue; @value("${spring.df.topic:spring-df-buffered-topic}") private string springdfbufferedtopic; @value("${spring.df.route:spring-df-buffered-route}") private string springdfbufferedroutekey; // 5m buffered queue @value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}") private string springdfhighbufferedqueue; @value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private string springdfhighbufferedtopic; @value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private string springdfhighbufferedroutekey; // high queue @value("${spring.df.high.queue:spring-df-high-queue}") private string springdfhighqueue; @value("${spring.df.high.topic:spring-df-high-topic}") private string springdfhightopic; @value("${spring.df.high.route:spring-df-high-route}") private string springdfhighroutekey; // 2h low buffered queue @value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}") private string springdflowbufferedqueue; @value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private string springdflowbufferedtopic; @value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private string springdflowbufferedroutekey; // low queue @value("${spring.df.low.queue:spring-df-low-queue}") private string springdflowqueue; @value("${spring.df.low.topic:spring-df-low-topic}") private string springdflowtopic; @value("${spring.df.low.route:spring-df-low-route}") private string springdflowroutekey; @bean(autowire = autowire.by_name, value = "springdfbufferedqueue") queue springdfbufferedqueue() { int bufferedtime = 1000 * 60 * springdfbufferedtime; return createbufferedqueue(springdfbufferedqueue, springdfhighbufferedtopic, springdfhighbufferedroutekey, bufferedtime); } @bean(autowire = autowire.by_name, value = "springdfhighbufferedqueue") queue springdfhighbufferedqueue() { int highbufferedtime = 1000 * 60 * springdfhighbufferedtime; return createbufferedqueue(springdfhighbufferedqueue, springdfhightopic, springdfhighroutekey, highbufferedtime); } @bean(autowire = autowire.by_name, value = "springdfhighqueue") queue springdfhighqueue() { return new queue(springdfhighqueue, true); } @bean(autowire = autowire.by_name, value = "springdflowbufferedqueue") queue springdflowbufferedqueue() { int lowbufferedtime = 1000 * 60 * springdflowbufferedtime; return createbufferedqueue(springdflowbufferedqueue, springdflowtopic, springdflowroutekey, lowbufferedtime); } @bean(autowire = autowire.by_name, value = "springdflowqueue") queue springdflowqueue() { return new queue(springdflowqueue, true); } @bean(autowire = autowire.by_name, value = "springdfbufferedtopic") topicexchange springdfbufferedtopic() { return new topicexchange(springdfbufferedtopic); } @bean binding springbuffereddf(queue springdfbufferedqueue, topicexchange springdfbufferedtopic) { return bindingbuilder.bind(springdfbufferedqueue).to(springdfbufferedtopic).with(springdfbufferedroutekey); } @bean(autowire = autowire.by_name, value = "springdfhighbufferedtopic") topicexchange springdfhighbufferedtopic() { return new topicexchange(springdfhighbufferedtopic); } @bean binding springhighbuffereddf(queue springdfhighbufferedqueue, topicexchange springdfhighbufferedtopic) { return bindingbuilder.bind(springdfhighbufferedqueue).to(springdfhighbufferedtopic).with(springdfhighbufferedroutekey); } @bean(autowire = autowire.by_name, value = "springdfhightopic") topicexchange springdfhightopic() { return new topicexchange(springdfhightopic); } @bean binding springhighdf(queue springdfhighqueue, topicexchange springdfhightopic) { return bindingbuilder.bind(springdfhighqueue).to(springdfhightopic).with(springdfhighroutekey); } @bean(autowire = autowire.by_name, value = "springdflowbufferedtopic") topicexchange springdflowbufferedtopic() { return new topicexchange(springdflowbufferedtopic); } @bean binding springlowbuffereddf(queue springdflowbufferedqueue, topicexchange springdflowbufferedtopic) { return bindingbuilder.bind(springdflowbufferedqueue).to(springdflowbufferedtopic).with(springdflowbufferedroutekey); } @bean(autowire = autowire.by_name, value = "springdflowtopic") topicexchange springdflowtopic() { return new topicexchange(springdflowtopic); } @bean binding springlowdf(queue springdflowqueue, topicexchange springdflowtopic) { return bindingbuilder.bind(springdflowqueue).to(springdflowtopic).with(springdflowroutekey); } @bean simplemessagelistenercontainer container(connectionfactory connectionfactory, messagelisteneradapter listeneradapter) { simplemessagelistenercontainer container = new simplemessagelistenercontainer(); container.setconnectionfactory(connectionfactory); container.setqueuenames(springdfhighqueue, springdflowqueue); container.setmessagelistener(listeneradapter); return container; } @bean messagelisteneradapter listeneradapter(integrationreceiver receiver) { messagelisteneradapter adapter = new messagelisteneradapter(receiver); adapter.setdefaultlistenermethod("receive"); map<string, string> queueortagtomethodname = new hashmap<>(); queueortagtomethodname.put(springdfhighqueue, "springdfhighreceive"); queueortagtomethodname.put(springdflowqueue, "springdflowreceive"); adapter.setqueueortagtomethodname(queueortagtomethodname); return adapter; } private queue createbufferedqueue(string queuename, string topic, string routekey, int bufferedtime) { map<string, object> args = new hashmap<>(); args.put("x-dead-letter-exchange", topic); args.put("x-dead-letter-routing-key", routekey); args.put("x-message-ttl", bufferedtime); // 是否持久化 boolean durable = true; // 仅创建者可以使用的私有队列,断开后自动删除 boolean exclusive = false; // 当所有消费客户端连接断开后,是否自动删除队列 boolean autodelete = false; return new queue(queuename, durable, exclusive, autodelete, args); } }
消费者配置
package com.df.ps.mq; import com.fasterxml.jackson.databind.objectmapper; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.value; import java.util.map; public class mqreceiver { private static logger logger = loggerfactory.getlogger(mqreceiver.class); @value("${high-retry:5}") private int highretry; @value("${low-retry:5}") private int lowretry; @value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private string springdfhighbufferedtopic; @value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private string springdfhighbufferedroutekey; @value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private string springdflowbufferedtopic; @value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private string springdflowbufferedroutekey; private final rabbittemplate rabbittemplate; @autowired public mqreceiver(rabbittemplate rabbittemplate) { this.rabbittemplate = rabbittemplate; } public void receive(object message) { if (logger.isinfoenabled()) { logger.info("default receiver: " + message); } } /** * 消息从初始队列进入5分钟的高速缓冲队列 * @param message */ public void highreceiver(object message){ objectmapper mapper = new objectmapper(); map msg = mapper.convertvalue(message, map.class); try{ logger.info("这里做消息处理..."); }catch (exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < highretry) { msg.put("times", times + 1); rabbittemplate.convertandsend(springdfhighbufferedtopic,springdfhighbufferedroutekey,message); } else { msg.put("times", 0); rabbittemplate.convertandsend(springdflowbufferedtopic,springdflowbufferedroutekey,message); } } } /** * 消息从5分钟缓冲队列进入2小时缓冲队列 * @param message */ public void lowreceiver(object message){ objectmapper mapper = new objectmapper(); map msg = mapper.convertvalue(message, map.class); try { logger.info("这里做消息处理..."); }catch (exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < lowretry) { rabbittemplate.convertandsend(springdflowbufferedtopic,springdflowbufferedroutekey,message); }else{ logger.info("消息无法被消费..."); } } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
SpringBoot集成RabbitMQ的方法(死信队列)
-
Springboot中集成Swagger2框架的方法
-
SpringBoot 集成 Memcached的方法示例
-
springboot集成mybatisplus的方法
-
springboot2.x集成swagger的方法示例
-
springboot集成mybatisplus的方法
-
使用PHP访问RabbitMQ消息队列的方法示例
-
springboot集成与使用Sentinel的方法
-
RabbitMQ与.net core(四) 消息的优先级 与 死信队列
-
Springboot 1.5.x 集成基于Centos7的RabbitMQ集群安装及配置