大话设计模式—观察者模式
程序员文章站
2022-06-19 10:14:28
观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知...
文章目录
观察者模式
- 观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)
- 在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知
实现方式
- 观察者模式。根据应用场景的不同,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式
同步阻塞
- 同步阻塞是最经典的实现方式,主要是为了代码解耦;
异步非阻塞
- 异步非阻塞除了能实现代码解耦之外,还能提高代码的执行效率;
进程间
- 进程间的观察者模式解耦更加彻底,一般是基于消息队列来实现,用来实现不同进程间的被观察者和观察者之间的交互
代码
- 被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式
- 观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别,比如 register 函数还可以叫作 attach,remove 函数还可以叫作 detach 等等。不过,万变不离其宗,设计思路都是差不多的。
应用场景
- 观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式
- 发布-订阅模型,是一对多的关系,可以以同步的方式实现,也可以以异步的方式实现。
- 生产-消费模型,是多对多的关系,一般以异步的方式实现
P2P((进程内同步)
public interface RegObserver {
void handleRegSuccess(long userId);
}
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService; // 依赖注入
@Override
public void handleRegSuccess(long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
public class RegNotificationObserver implements RegObserver {
private NotificationService notificationService;
@Override
public void handleRegSuccess(long userId) {
notificationService.sendInboxMessage(userId, "Welcome...");
}
}
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
// 一次性设置好,之后也不可能动态的修改
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
for (RegObserver observer : regObservers) {
observer.handleRegSuccess(userId);
}
return userId;
}
}
- 它是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。
- 如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间。具体来讲,当 userService.register() 函数执行完成之后,我们启动一个新的线程来执行观察者的 handleRegSuccess() 函数,这样 userController.register() 函数就不需要等到所有的 handleRegSuccess() 函数都执行完成之后才返回结果给客户端
- userController.register() 函数从执行 3 个 SQL 语句才返回,减少到只需要执行 1 个 SQL 语句就返回,响应时间粗略来讲减少为原来的 1/3。
P2P(进程内异步)
- 简单一点的做法是,在每个handleRegSuccess()函数中,创建一个新的线程执行代码
- 也可以用一个线程统一处理多个observer-handler或者在UserController 的 register() 函数中使用线程池来执行每个观察者的 handleRegSuccess() 函数
- 还有更加优雅的实现方式,那就是基于 EventBus 来实现
每个观察者内使用线程
// 第一种实现方式,其他类代码不变,就没有再重复罗列
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService; // 依赖注入
@Override
public void handleRegSuccess(long userId) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
promotionService.issueNewUserExperienceCash(userId);
}
});
thread.start();
}
}
- 频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出
使用线程池处理每个观察者
- 也就是在主函数内,使用线程池中的线程去处理每个observer
- 尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本
// 第二种实现方式,其他类代码不变,就没有再重复罗列
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
private Executor executor;
public UserController(Executor executor) {
this.executor = executor;
}
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
for (RegObserver observer : regObservers) {
executor.execute(new Runnable() {
@Override
public void run() {
observer.handleRegSuccess(userId);
}
});
}
return userId;
}
}
P2P(进程间同步)
- 管是同步阻塞实现方式还是异步非阻塞实现方式,都是进程内的实现方式。如果用户注册成功之后,我们需要发送用户信息给大数据征信系统,而大数据征信系统是一个独立的系统,跟它之间的交互是跨不同进程的,那如何实现一个跨进程的观察者模式呢
- 大数据征信系统提供了发送用户注册信息的 RPC 接口,我们仍然可以沿用之前的实现思路,在 handleRegSuccess() 函数中调用 RPC接口来发送数据。但是,我们还有更加优雅、更加常用的一种实现方式,那就是基于消息队列(Message Queue,比如 ActiveMQ)来实现。
RPC 调用
消息系统
- 引入一个新的系统(消息队列),增加了维护成本。不过,它的好处也非常明显。
- 在原来的实现方式中,观察者需要注册到被观察者中,被观察者需要依次遍历观察者来发送消息。
- 而基于消息队列的实现方式,被观察者和观察者解耦更加彻底,两部分的耦合更小。
- 被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑
EventBus
- EventBus 翻译为“事件总线”,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。
- 其中,Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式
细节
实现流程
- 利用 EventBus 框架实现的观察者模式,跟从零开始编写的观察者模式相比,从大的流程上来说,实现思路大致一样
- 都需要定义 Observer,并且通过 register() 函数注册 Observer
- 都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)
observer 的定义
- 基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
EventBus、AsyncEventBus
EventBus eventBus = new EventBus(); // 同步阻塞模式
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));// 异步阻塞模式
register
- EventBus 类提供了 register() 函数用来注册观察者。它可以接受任何类型(Object)的观察者。而在经典的观察者模式的实现中,register() 函数必须接受实现了同一 Observer 接口的类对象
public void register(Object object);
post() 函数
- EventBus 类提供了 post() 函数,用来给观察者发送消息
- 当我们调用 post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post 函数定义中的 event)类型的父类
event 类型
- AObserver 能接收的消息类型是 XMsg,BObserver 能接收的消息类型是 YMsg,CObserver 能接收的消息类型是 ZMsg。其中,XMsg 是 YMsg 的父类
XMsg xMsg = new XMsg();
YMsg yMsg = new YMsg();
ZMsg zMsg = new ZMsg();
post(xMsg); => AObserver接收到消息
post(yMsg); => AObserver、BObserver接收到消息
post(zMsg); => CObserver接收到消息
@Subscribe 注解
- EventBus 通过 @Subscribe 注解来标明,某个函数能接收消息。
代码实现
public class UserControllerEventBus {
private UserService userService; // 依赖注入
private EventBus eventBus;
private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
private List<RegObserver> regObservers = new ArrayList<>();
public UserControllerEventBus(){
eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式
}
public void setRegObservers(List<RegObserver> observers) {
eventBus.register(observers);
}
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
eventBus.post(userId);
return userId;
}
}
本文地址:https://blog.csdn.net/king14bhhb/article/details/110577505
上一篇: 微服务实战——SpringCloud与Feign集成
下一篇: golang实现常用集合原理介绍