
Integrating RabbitMQ with Spring Boot (Dead-Letter Queues)

程序员文章站 2023-12-09 17:42:03

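Introduction

A dead-letter queue holds messages that could not be consumed normally. A message is dead-lettered for one of the following reasons:
1. The message is rejected (basic.reject / basic.nack) with requeue=false
2. The queue has reached its maximum length
3. The message's TTL has expired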
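
None of these conditions dead-letters a message unless the queue declares a dead-letter exchange. The following is a minimal sketch of the queue arguments involved, using only java.util. The argument keys are RabbitMQ's standard ones; the class name, exchange name, and routing key are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class DeadLetterArgs {

    /**
     * Builds the optional arguments for a queue whose rejected, overflowing,
     * or expired messages are republished to deadLetterExchange.
     */
    static Map<String, Object> build(String deadLetterExchange, String deadLetterRoutingKey,
                                     long ttlMillis, int maxLength) {
        Map<String, Object> args = new HashMap<>();
        // Where dead-lettered messages go (needed for all three causes).
        args.put("x-dead-letter-exchange", deadLetterExchange);
        // Optional: replace the original routing key when dead-lettering.
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        // Cause 3: messages older than this many milliseconds expire.
        args.put("x-message-ttl", ttlMillis);
        // Cause 2: the queue holds at most this many messages.
        args.put("x-max-length", maxLength);
        return args;
    }

    public static void main(String[] unused) {
        // "demo-dlx" and "demo-route" are placeholder names.
        System.out.println(build("demo-dlx", "demo-route", 5 * 60 * 1000L, 1000));
    }
}
```

Cause 1 needs no extra queue argument: the consumer triggers it by rejecting the message with requeue=false.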

Scenario


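1. A message enters the initial queue and, after waiting 30 minutes, moves to the 5-minute queue
2. After waiting 5 minutes, the message moves to the execution queue
3. If execution fails, the message returns to the 5-minute queue
4. After 5 failures, the message moves to the 2-hour queue
5. After waiting 2 hours, the message moves to the execution queue
6. After 5 more failures, the message is discarded or handled some other way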
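
The flow above can be read as a small state machine. The sketch below models only the routing and does not talk to a broker. The stage names are hypothetical labels for the queues in the list, and whether the retry budget is used up is passed in as a flag rather than counted here.

```java
class RetryFlow {

    /** Hypothetical labels for the queues in the scenario. */
    enum Stage { INITIAL, HIGH_BUFFERED, HIGH, LOW_BUFFERED, LOW, DISCARDED }

    /** Where a message is dead-lettered to when its TTL expires in a buffer queue. */
    static Stage onExpire(Stage stage) {
        switch (stage) {
            case INITIAL:       return Stage.HIGH_BUFFERED; // step 1: 30 minutes
            case HIGH_BUFFERED: return Stage.HIGH;          // step 2: 5 minutes
            case LOW_BUFFERED:  return Stage.LOW;           // step 5: 2 hours
            default:
                throw new IllegalArgumentException(stage + " is not a buffer queue");
        }
    }

    /** Where a message is republished after a failed execution attempt. */
    static Stage onFailure(Stage stage, boolean retriesExhausted) {
        switch (stage) {
            case HIGH: // steps 3 and 4
                return retriesExhausted ? Stage.LOW_BUFFERED : Stage.HIGH_BUFFERED;
            case LOW:  // step 6; a retry goes back through the 2-hour queue
                return retriesExhausted ? Stage.DISCARDED : Stage.LOW_BUFFERED;
            default:
                throw new IllegalArgumentException(stage + " is not an execution queue");
        }
    }

    public static void main(String[] unused) {
        Stage stage = onExpire(Stage.INITIAL);   // message waits in the 5-minute queue
        stage = onExpire(stage);                 // message reaches the execution queue
        System.out.println(onFailure(stage, false));
    }
}
```

Only the buffer queues carry a TTL and a dead-letter exchange. The two execution queues are ordinary queues, and the consumer republishes failed messages itself.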

Usage

Installing RabbitMQ

Install with Docker, choosing an image tag that includes the management plugin:

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

Open http://localhost:15672 and log in with the default credentials guest/guest.

Project configuration

(1) Create a Spring Boot project
(2) Configure the RabbitMQ connection in application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3) Queue configuration

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  // Delay of each buffer queue, in minutes.
  // The initial delay defaults to 30 to match the scenario (the original default was 120).
  @Value("${spring.df.buffered.min:30}")
  private int springDfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springDfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springDfLowBufferedTime;

  // 30-minute initial buffer queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springDfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springDfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springDfBufferedRouteKey;

  // 5-minute buffer queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springDfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springDfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springDfHighBufferedRouteKey;

  // High-priority execution queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springDfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springDfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springDfHighRouteKey;

  // 2-hour buffer queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springDfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springDfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springDfLowBufferedRouteKey;

  // Low-priority execution queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springDfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springDfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springDfLowRouteKey;


  // Beans are named explicitly; the autowire attribute of @Bean used originally
  // has been deprecated since Spring 5.1 and is not needed here.

  // Initial queue: expired messages are dead-lettered into the 5-minute buffer queue.
  @Bean("springDfBufferedQueue")
  Queue springDfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springDfBufferedTime;
    return createBufferedQueue(springDfBufferedQueue, springDfHighBufferedTopic, springDfHighBufferedRouteKey, bufferedTime);
  }

  // 5-minute buffer queue: expired messages are dead-lettered into the high-priority execution queue.
  @Bean("springDfHighBufferedQueue")
  Queue springDfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springDfHighBufferedTime;
    return createBufferedQueue(springDfHighBufferedQueue, springDfHighTopic, springDfHighRouteKey, highBufferedTime);
  }

  @Bean("springDfHighQueue")
  Queue springDfHighQueue() {
    return new Queue(springDfHighQueue, true);
  }

  // 2-hour buffer queue: expired messages are dead-lettered into the low-priority execution queue.
  @Bean("springDfLowBufferedQueue")
  Queue springDfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springDfLowBufferedTime;
    return createBufferedQueue(springDfLowBufferedQueue, springDfLowTopic, springDfLowRouteKey, lowBufferedTime);
  }

  @Bean("springDfLowQueue")
  Queue springDfLowQueue() {
    return new Queue(springDfLowQueue, true);
  }


  // One topic exchange and one binding per queue.
  // The Queue and TopicExchange parameters below are resolved by parameter name.

  @Bean("springDfBufferedTopic")
  TopicExchange springDfBufferedTopic() {
    return new TopicExchange(springDfBufferedTopic);
  }

  @Bean
  Binding springBufferedDf(Queue springDfBufferedQueue, TopicExchange springDfBufferedTopic) {
    return BindingBuilder.bind(springDfBufferedQueue).to(springDfBufferedTopic).with(springDfBufferedRouteKey);
  }


  @Bean("springDfHighBufferedTopic")
  TopicExchange springDfHighBufferedTopic() {
    return new TopicExchange(springDfHighBufferedTopic);
  }

  @Bean
  Binding springHighBufferedDf(Queue springDfHighBufferedQueue, TopicExchange springDfHighBufferedTopic) {
    return BindingBuilder.bind(springDfHighBufferedQueue).to(springDfHighBufferedTopic).with(springDfHighBufferedRouteKey);
  }

  @Bean("springDfHighTopic")
  TopicExchange springDfHighTopic() {
    return new TopicExchange(springDfHighTopic);
  }

  @Bean
  Binding springHighDf(Queue springDfHighQueue, TopicExchange springDfHighTopic) {
    return BindingBuilder.bind(springDfHighQueue).to(springDfHighTopic).with(springDfHighRouteKey);
  }

  @Bean("springDfLowBufferedTopic")
  TopicExchange springDfLowBufferedTopic() {
    return new TopicExchange(springDfLowBufferedTopic);
  }

  @Bean
  Binding springLowBufferedDf(Queue springDfLowBufferedQueue, TopicExchange springDfLowBufferedTopic) {
    return BindingBuilder.bind(springDfLowBufferedQueue).to(springDfLowBufferedTopic).with(springDfLowBufferedRouteKey);
  }

  @Bean("springDfLowTopic")
  TopicExchange springDfLowTopic() {
    return new TopicExchange(springDfLowTopic);
  }

  @Bean
  Binding springLowDf(Queue springDfLowQueue, TopicExchange springDfLowTopic) {
    return BindingBuilder.bind(springDfLowQueue).to(springDfLowTopic).with(springDfLowRouteKey);
  }


  // Only the two execution queues are consumed; the buffer queues have no
  // consumers, so their messages always wait out the TTL.
  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springDfHighQueue, springDfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  // MqReceiver is the consumer class from the "consumer configuration" section.
  // The original referenced a class and method names that do not appear in this
  // article; they are aligned here with MqReceiver's methods.
  @Bean
  MessageListenerAdapter listenerAdapter(MqReceiver receiver) {
    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    // Route each queue to its own handler method on the receiver.
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springDfHighQueue, "highReceiver");
    queueOrTagToMethodName.put(springDfLowQueue, "lowReceiver");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;
  }


  // Creates a queue whose messages expire after bufferedTime milliseconds and
  // are then dead-lettered to the given exchange with the given routing key.
  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // Whether the queue survives a broker restart
    boolean durable = true;
    // Whether the queue is private to the declaring connection and deleted when it closes
    boolean exclusive = false;
    // Whether the queue is deleted once all consumers have disconnected
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}
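
The configuration above computes each TTL as 1000 * 60 * minutes in int arithmetic, which silently overflows once a delay exceeds 35,791 minutes (about 24.8 days). This is not an issue for the delays in this article, but a configurable value could reach it. Below is a small sketch of a conversion that fails loudly instead. The class and method names are hypothetical and not part of the original code.

```java
import java.time.Duration;

class TtlMillis {

    /**
     * Converts a delay in minutes to milliseconds as an int.
     * Throws ArithmeticException instead of wrapping around on overflow.
     */
    static int fromMinutes(int minutes) {
        if (minutes < 0) {
            throw new IllegalArgumentException("minutes must not be negative: " + minutes);
        }
        // Duration does the multiplication in long arithmetic;
        // toIntExact rejects results that do not fit in an int.
        return Math.toIntExact(Duration.ofMinutes(minutes).toMillis());
    }

    public static void main(String[] unused) {
        System.out.println(fromMinutes(5));   // the 5-minute buffer queue
        System.out.println(fromMinutes(120)); // the 2-hour buffer queue
    }
}
```

In the configuration class this would replace expressions such as 1000 * 60 * springDfHighBufferedTime with TtlMillis.fromMinutes(springDfHighBufferedTime).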

Consumer configuration

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;

// Registered as a bean so that it can be injected into the listener adapter.
@Component
public class MqReceiver {

  private static final Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springDfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springDfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springDfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springDfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;

  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  // Default handler for queues without a dedicated method.
  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * Handles messages from the high-priority execution queue. On failure the
   * message goes back to the 5-minute buffer queue; once the retry limit is
   * reached it moves to the 2-hour buffer queue instead.
   * @param message the message body, expected to be convertible to a Map
   */
  @SuppressWarnings("unchecked")
  public void highReceiver(Object message) {
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> msg = mapper.convertValue(message, Map.class);

    try {
      logger.info("process the message here...");
    } catch (Exception e) {
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      // Send msg, not message, so that the updated counter travels with the message.
      if (times < highRetry) {
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springDfHighBufferedTopic, springDfHighBufferedRouteKey, msg);
      } else {
        // Reset the counter for the low-priority stage.
        msg.put("times", 0);
        rabbitTemplate.convertAndSend(springDfLowBufferedTopic, springDfLowBufferedRouteKey, msg);
      }
    }
  }

  /**
   * Handles messages from the low-priority execution queue. On failure the
   * message goes back to the 2-hour buffer queue; once the retry limit is
   * reached it is given up on.
   * @param message the message body, expected to be convertible to a Map
   */
  @SuppressWarnings("unchecked")
  public void lowReceiver(Object message) {
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> msg = mapper.convertValue(message, Map.class);

    try {
      logger.info("process the message here...");
    } catch (Exception e) {
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowRetry) {
        // The original never incremented the counter here, so the retry limit was never reached.
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springDfLowBufferedTopic, springDfLowBufferedRouteKey, msg);
      } else {
        logger.info("message could not be consumed...");
      }
    }
  }
}
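
The retry bookkeeping in the high-priority receiver's catch branch can be pulled out into a pure function, which makes it testable without a broker. The sketch below assumes the message body is a Map carrying a times counter, as in the receiver above. The class, enum, and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class RetryDecision {

    /** Hypothetical labels for the two exchanges a failed message can be republished to. */
    enum Target { HIGH_BUFFERED, LOW_BUFFERED }

    /**
     * Updates the "times" counter in msg after a failed attempt and returns
     * the buffer queue the message should be republished to.
     */
    static Target afterHighFailure(Map<String, Object> msg, int highRetry) {
        Object raw = msg.get("times");
        // Accept any numeric type, since deserialized counters are not always Integer.
        int times = raw == null ? 0 : ((Number) raw).intValue();
        if (times < highRetry) {
            msg.put("times", times + 1);
            return Target.HIGH_BUFFERED;
        }
        // Retry budget used up: reset the counter for the low-priority stage.
        msg.put("times", 0);
        return Target.LOW_BUFFERED;
    }

    public static void main(String[] unused) {
        Map<String, Object> msg = new HashMap<>();
        for (int failure = 1; failure <= 6; failure++) {
            System.out.println("failure " + failure + " -> " + afterHighFailure(msg, 5));
        }
    }
}
```

With a limit of 5 and a counter starting at 0, the first five failures send the message back through the 5-minute queue, and the sixth moves it to the 2-hour queue.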
