欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitmq实现向各服务广播消息

程序员文章站 2022-06-01 19:07:40
广播fanout 主要是将一个消息,分发到绑定了它的队列上,而这些队列如 自己去建立和绑定! 对生产者是解耦的 生产者不需要关心消费者有多少,消费者如果需要这种消息,只需要把队列绑定到exchange上即可 流程 1. 打开rabbitmq的ui 2. 建立两个队列fanout1,fanout2 3 ......

广播fanout

主要是将一个消息,分发到绑定了它的队列上,而这些队列如消费者自己去建立和绑定!

对生产者是解耦的

生产者不需要关心消费者有多少,消费者如果需要这种消息,只需要把队列绑定到exchange上即可

流程

  1. 打开rabbitmq的ui
  2. 建立两个队列fanout1,fanout2
  3. 打开exchange里的amqp.fanout类型
  4. 绑定上面的两个队列
  5. 向exchange里发消息
  6. 回到队列页面,这时可以看到每个队列都收到了消息

例子

@component
public class amqpconfig {
  public static final string lind_fanout_exchange = "lindfanoutexchange";

 /**
   * 广播交换机.
   *
   * @return
   */
  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(lind_fanout_exchange);
  }
}

生产者

  /**
   * 发布广播消息.
   *
   * @param message .
   */
  public void fanoutpublish(string message) {
    try {
      rabbittemplate.convertandsend(amqpconfig.lind_fanout_exchange, null, "广播消息");
    } catch (exception e) {
      e.printstacktrace();
    }
  }

消费者

@component
public class fanoutsubscriber {

  @autowired
  amqpconfig amqpconfig;

  @bean
  public queue product1queue() {
    return new queue("product1.queue");
  }

  @bean
  public queue product2queue() {
    return new queue("product2.queue");
  }

  @bean
  public binding product1queuebinding() {
    return bindingbuilder.bind(product1queue()).to(amqpconfig.fanoutexchange());
  }

  @bean
  public binding product2queuebinding() {
    return bindingbuilder.bind(product2queue()).to(amqpconfig.fanoutexchange());
  }

  @rabbitlistener(queues = "product1.queue")
  public void product1(string data) {
    system.out.println(data);
  }

  @rabbitlistener(queues = "product2.queue")
  public void product2(string data) {
    system.out.println(data);
  }
}