欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitmq实现延时队列(死信队列)

程序员文章站 2022-10-04 17:57:30
基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间 Per Queue Message TTL: 通过设置队列的x message ttl参数来设置指定队列上消息 ......

基于队列和基于消息的ttl

ttl是time to live 的简称,顾名思义指的是消息的存活时间。rabbitmq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-per-queue message ttl: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

死信交换机dlx

队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的过期时间到期了; (3)队列长度限制超过了。 当队列中的消息成为死信以后,如果队列设置了dlx那么消息会被发送到dlx。通过x-dead-letter-exchange设置dlx,通过这个x-dead-letter-routing-key设置消息发送到dlx所用的routing-key,如果不设置默认使用消息本身的routing-key.

 @bean
  public queue lindqueue() {
    return queuebuilder.durable(lind_queue)
        .withargument("x-dead-letter-exchange", lind_dl_exchange)//设置死信交换机
        .withargument("x-message-ttl", makecallexpire)
        .withargument("x-dead-letter-routing-key", lind_dead_queue)//设置死信routingkey
        .build();
  }

实现的过程

graph td publisher-->正常queue 正常queue-->ttl ttl-->dead.queue dead.queue-->subscriber

完整的代码

@component
public class amqpconfig {
  /**
   * 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列,
   * 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息.
   */
  public static final string lind_exchange = "lind.exchange";
  public static final string lind_dl_exchange = "lind.dl.exchange";
  public static final string lind_queue = "lind.queue";
  public static final string lind_dead_queue = "lind.queue.dead";

  public static final string lind_fanout_exchange = "lindfanoutexchange";
  /**
   * 单位为微秒.
   */
  @value("${tq.makecall.expire:60000}")
  private long makecallexpire;

  /**
   * 创建普通交换机.
   */
  @bean
  public topicexchange lindexchange() {
    return (topicexchange) exchangebuilder.topicexchange(lind_exchange).durable(true)
        .build();
  }

  /**
   * 创建死信交换机.
   */
  @bean
  public topicexchange lindexchangedl() {
    return (topicexchange) exchangebuilder.topicexchange(lind_dl_exchange).durable(true)
        .build();
  }

  /**
   * 创建普通队列.
   */
  @bean
  public queue lindqueue() {
    return queuebuilder.durable(lind_queue)
        .withargument("x-dead-letter-exchange", lind_dl_exchange)//设置死信交换机
        .withargument("x-message-ttl", makecallexpire)
        .withargument("x-dead-letter-routing-key", lind_dead_queue)//设置死信routingkey
        .build();
  }

  /**
   * 创建死信队列.
   */
  @bean
  public queue linddelayqueue() {
    return queuebuilder.durable(lind_dead_queue).build();
  }

  /**
   * 绑定死信队列.
   */
  @bean
  public binding binddeadbuilders() {
    return bindingbuilder.bind(linddelayqueue())
        .to(lindexchangedl())
        .with(lind_dead_queue);
  }

  /**
   * 绑定普通队列.
   *
   * @return
   */
  @bean
  public binding bindbuilders() {
    return bindingbuilder.bind(lindqueue())
        .to(lindexchange())
        .with(lind_queue);
  }

  /**
   * 广播交换机.
   *
   * @return
   */
  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(lind_fanout_exchange);
  }
}


//-----------------

@component
public class publisher {
  @autowired
  private rabbittemplate rabbittemplate;


  public void publish(string message) {
    try {
      rabbittemplate
          .convertandsend(amqpconfig.lind_exchange, amqpconfig.lind_delay_queue,
              message);
    } catch (exception e) {
      e.printstacktrace();
    }
  }
}

//-----------------

@component
@slf4j
public class subscriber {
  @rabbitlistener(queues = amqpconfig.lind_queue)
  public void customersign(string data) {
    try {

      log.info("从队列拿到数据 :{}", data);

    } catch (exception ex) {
          e.printstacktrace();
    }
  }
}