欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息队列和发布订阅

程序员文章站 2022-07-02 16:45:25
编程语言集成了发布订阅 很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用 实现订阅,使用 使用发布。这种系统集成的我们先叫它“集成组件” 与语言无关的消息队列 事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或 ......

编程语言集成了发布订阅

很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用eventlistener实现订阅,使用applicationeventpublisher使用发布。这种系统集成的我们先叫它“集成组件”

与语言无关的消息队列

事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为rabbitmq为例子。

共同点

  1. 代码解耦,发布者与订阅者可以互不关心
  2. 异步处理,集成组件有的是同步的,需要加@async注解
  3. 消息安全

不同点

  1. rabbitmq实现的是多服务之间的发布与订阅
  2. 集成组件实现的是一个服务内部的发布与订阅
  3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
  4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

实例

  • 集成组件的发布订阅

订阅

@getter
@builder(tobuilder = true)
@noargsconstructor
@allargsconstructor
public class createbookevent {
  private string address;
  private string title;
}

@component
public class emaileventlistener {
  @eventlistener
  @async
  public void handleevent(createbookevent event) throws exception {
    system.out.println("email消息:建立图书:" + event.gettitle());
  }
}

发布

@autowired
  private applicationeventpublisher applicationeventpublisher;
  public void publish(){
      applicationeventpublisher.publishevent(createbookevent.builder().address("system").title("新建图书").build());
}
  • rabbitmq的发布订阅

订阅

@slf4j
@component
public class distributorsubscriber {
  public static final string work_queue = "fx.activity.total";
  public static final string exchange = "fx.exchange";
  @autowired
  distributoractivitytotalrepository distributoractivitytotalrepository;
  @autowired
  objectmapper objectmapper;

  @bean
  public topicexchange phonetotalexchange() {
    return new topicexchange(exchange);
  }

  @bean
  public queue phonetotalqueue() {
    return new queue(work_queue);
  }
  
  @bean
  public binding bindsignqueue() {
    return bindingbuilder.bind(phonetotalqueue()).to(phonetotalexchange()).with(work_queue);
  }
   @rabbitlistener(queues = work_queue)
  public void phonetotalqueuelistener(string data) {
    try {
      logger.debug("fx.activity.total:{}", data);
      distributoractivitytotal entity =
          objectmapper.readvalue(data, distributoractivitytotal.class);
      distributoractivitytotalrepository.incupdate(entity);
    } catch (exception ex) {
      logger.error("fx.activity.total.error", ex);
    }
  }

发布

  @autowired
  private rabbittemplate rabbittemplate;
  
   public void modifysalesperson(salespersondto salespersondto) {
    try {
      rabbittemplate.convertandsend(
          "exchange",
          "mqname",
          objectmapper.writevalueasstring(salespersondto)
      );
      logger.debug("enter {},message:{}", "modifysalesperson", salespersondto.tostring());

    } catch (exception ex) {
      logger.error("mq.modifysalesperson.error", ex);
    }
  }