欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

大话设计模式—观察者模式

程序员文章站 2022-06-19 10:14:28
观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知...

观察者模式

  • 观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)
  • 在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知

实现方式

  • 观察者模式。根据应用场景的不同,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式

同步阻塞

  • 同步阻塞是最经典的实现方式,主要是为了代码解耦;

异步非阻塞

  • 异步非阻塞除了能实现代码解耦之外,还能提高代码的执行效率;

进程间

  • 进程间的观察者模式解耦更加彻底,一般是基于消息队列来实现,用来实现不同进程间的被观察者和观察者之间的交互

代码

  • 被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式
  • 观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别,比如 register 函数还可以叫作 attach,remove 函数还可以叫作 detach 等等。不过,万变不离其宗,设计思路都是差不多的。

应用场景

  • 观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式
  • 发布-订阅模型,是一对多的关系,可以以同步的方式实现,也可以以异步的方式实现。
  • 生产-消费模型,是多对多的关系,一般以异步的方式实现

P2P((进程内同步)

public interface RegObserver {
  void handleRegSuccess(long userId);
}

public class RegPromotionObserver implements RegObserver {
  private PromotionService promotionService; // 依赖注入

  @Override
  public void handleRegSuccess(long userId) {
    promotionService.issueNewUserExperienceCash(userId);
  }
}

public class RegNotificationObserver implements RegObserver {
  private NotificationService notificationService;

  @Override
  public void handleRegSuccess(long userId) {
    notificationService.sendInboxMessage(userId, "Welcome...");
  }
}

public class UserController {
  private UserService userService; // 依赖注入
  private List<RegObserver> regObservers = new ArrayList<>();

  // 一次性设置好,之后也不可能动态的修改
  public void setRegObservers(List<RegObserver> observers) {
    regObservers.addAll(observers);
  }

  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);

    for (RegObserver observer : regObservers) {
      observer.handleRegSuccess(userId);
    }

    return userId;
  }
}
  • 它是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。
  • 如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间。具体来讲,当 userService.register() 函数执行完成之后,我们启动一个新的线程来执行观察者的 handleRegSuccess() 函数,这样 userController.register() 函数就不需要等到所有的 handleRegSuccess() 函数都执行完成之后才返回结果给客户端
  • userController.register() 函数从执行 3 个 SQL 语句才返回,减少到只需要执行 1 个 SQL 语句就返回,响应时间粗略来讲减少为原来的 1/3。

P2P(进程内异步)

  • 简单一点的做法是,在每个handleRegSuccess()函数中,创建一个新的线程执行代码
  • 也可以用一个线程统一处理多个observer-handler或者在UserController 的 register() 函数中使用线程池来执行每个观察者的 handleRegSuccess() 函数
  • 还有更加优雅的实现方式,那就是基于 EventBus 来实现

每个观察者内使用线程

// 第一种实现方式,其他类代码不变,就没有再重复罗列
public class RegPromotionObserver implements RegObserver {
  private PromotionService promotionService; // 依赖注入

  @Override
  public void handleRegSuccess(long userId) {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        promotionService.issueNewUserExperienceCash(userId);
      }
    });
    thread.start();
  }
}
  • 频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出

使用线程池处理每个观察者

  • 也就是在主函数内,使用线程池中的线程去处理每个observer
  • 尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本
// 第二种实现方式,其他类代码不变,就没有再重复罗列
public class UserController {
  private UserService userService; // 依赖注入
  private List<RegObserver> regObservers = new ArrayList<>();
  private Executor executor;

  public UserController(Executor executor) {
    this.executor = executor;
  }

  public void setRegObservers(List<RegObserver> observers) {
    regObservers.addAll(observers);
  }

  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);

    for (RegObserver observer : regObservers) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          observer.handleRegSuccess(userId);
        }
      });
    }

    return userId;
  }
}

P2P(进程间同步)

  • 管是同步阻塞实现方式还是异步非阻塞实现方式,都是进程内的实现方式。如果用户注册成功之后,我们需要发送用户信息给大数据征信系统,而大数据征信系统是一个独立的系统,跟它之间的交互是跨不同进程的,那如何实现一个跨进程的观察者模式呢
  • 大数据征信系统提供了发送用户注册信息的 RPC 接口,我们仍然可以沿用之前的实现思路,在 handleRegSuccess() 函数中调用 RPC接口来发送数据。但是,我们还有更加优雅、更加常用的一种实现方式,那就是基于消息队列(Message Queue,比如 ActiveMQ)来实现。

RPC 调用

消息系统

  • 引入一个新的系统(消息队列),增加了维护成本。不过,它的好处也非常明显。
  • 在原来的实现方式中,观察者需要注册到被观察者中,被观察者需要依次遍历观察者来发送消息
  • 而基于消息队列的实现方式,被观察者和观察者解耦更加彻底,两部分的耦合更小。
  • 被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑

EventBus

  • EventBus 翻译为“事件总线”,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。
  • 其中,Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式

细节

实现流程
  • 利用 EventBus 框架实现的观察者模式,跟从零开始编写的观察者模式相比,从大的流程上来说,实现思路大致一样
  • 都需要定义 Observer,并且通过 register() 函数注册 Observer
  • 都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)
observer 的定义
  • 基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
EventBus、AsyncEventBus
EventBus eventBus = new EventBus(); // 同步阻塞模式
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));// 异步阻塞模式
register
  • EventBus 类提供了 register() 函数用来注册观察者。它可以接受任何类型(Object)的观察者。而在经典的观察者模式的实现中,register() 函数必须接受实现了同一 Observer 接口的类对象
public void register(Object object);
post() 函数
  • EventBus 类提供了 post() 函数,用来给观察者发送消息
  • 当我们调用 post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post 函数定义中的 event)类型的父类
event 类型
  • AObserver 能接收的消息类型是 XMsg,BObserver 能接收的消息类型是 YMsg,CObserver 能接收的消息类型是 ZMsg。其中,XMsg 是 YMsg 的父类
XMsg xMsg = new XMsg();
YMsg yMsg = new YMsg();
ZMsg zMsg = new ZMsg();
post(xMsg); => AObserver接收到消息
post(yMsg); => AObserver、BObserver接收到消息
post(zMsg); => CObserver接收到消息
@Subscribe 注解
  • EventBus 通过 @Subscribe 注解来标明,某个函数能接收消息。

代码实现

public class UserControllerEventBus {
    private UserService userService; // 依赖注入
    private EventBus eventBus;
    private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
    private List<RegObserver> regObservers = new ArrayList<>();

    public UserControllerEventBus(){
        eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式
    }
    public void setRegObservers(List<RegObserver> observers) {
        eventBus.register(observers);
    }

    public Long register(String telephone, String password) {
        //省略输入参数的校验代码
        //省略userService.register()异常的try-catch代码
        long userId = userService.register(telephone, password);

       eventBus.post(userId);

        return userId;
    }
}

本文地址:https://blog.csdn.net/king14bhhb/article/details/110577505

相关标签: 设计模式 java