关于RxJS Subject的学习笔记
observer pattern
观察者模式定义
观察者模式又叫发布订阅模式(publish/subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。
我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:
- 期刊出版方 - 负责期刊的出版和发行工作
- 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知
在观察者模式中也有两个主要角色:subject (主题) 和 observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。
观察者模式结构
观察者模式实战
subject 类定义
class subject { constructor() { this.observercollection = []; } addobserver(observer) { // 添加观察者 this.observercollection.push(observer); } deleteobserver(observer) { // 移除观察者 let index = this.observercollection.indexof(observer); if(index >= 0) this.observercollection.splice(index, 1); } notifyobservers() { // 通知观察者 this.observercollection.foreach((observer)=>observer.notify()); } }
observer 类定义
class observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }
使用示例
let subject = new subject(); // 创建主题对象 let observer1 = new observer('semlinker'); // 创建观察者a - 'semlinker' let observer2 = new observer('lolo'); // 创建观察者b - 'lolo' subject.addobserver(observer1); // 注册观察者a subject.addobserver(observer2); // 注册观察者b subject.notifyobservers(); // 通知观察者 subject.deleteobserver(observer1); // 移除观察者a subject.notifyobservers(); // 验证是否成功移除
以上代码成功运行后控制台的输出结果:
semlinker has been notified.
lolo has been notified.
lolo has been notified.
observable subscribe
在介绍rxjs - subject 之前,我们先来看个示例:
const interval$ = rx.observable.interval(1000).take(3); interval$.subscribe({ next: value => console.log('observer a get value: ' + value); }); settimeout(() => { interval$.subscribe({ next: value => console.log('observer b get value: ' + value); }); }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 0
observer a get value: 1
observer b get value: 0
observer a get value: 2
observer b get value: 1
observer b get value: 2
通过以上示例,我们可以得出以下结论:
- observable 对象可以被重复订阅
- observable 对象每次被订阅后,都会重新执行
上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:
function interval() { setinterval(() => console.log('..'), 1000); } interval(); settimeout(() => { interval(); }, 1000);
observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 subject 来实现上述功能。
自定义 subject
subject 类定义
class subject { constructor() { this.observers = []; } addobserver(observer) { this.observers.push(observer); } next(value) { this.observers.foreach(o => o.next(value)); } error(error){ this.observers.foreach(o => o.error(error)); } complete() { this.observers.foreach(o => o.complete()); } }
使用示例
const interval$ = rx.observable.interval(1000).take(3); let subject = new subject(); let observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.addobserver(observera); // 添加观察者a interval$.subscribe(subject); // 订阅interval$对象 settimeout(() => { subject.addobserver(observerb); // 添加观察者b }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 0
observer a get value: 1
observer b get value: 1
observer a get value: 2
observer b get value: 2
observer a complete!
observer b complete!
通过自定义 subject,我们实现了前面提到的功能。接下来我们进入正题 - rxjs subject。
rxjs subject
首先我们通过 rxjs subject 来重写一下上面的示例:
const interval$ = rx.observable.interval(1000).take(3); let subject = new rx.subject(); let observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.subscribe(observera); // 添加观察者a interval$.subscribe(subject); // 订阅interval$对象 settimeout(() => { subject.subscribe(observerb); // 添加观察者b }, 1000);
rxjs subject 源码片段
/** * suject继承于observable */ export class subject extends observable { constructor() { super(); this.observers = []; // 观察者列表 this.closed = false; this.isstopped = false; this.haserror = false; this.thrownerror = null; } next(value) { if (this.closed) { throw new objectunsubscribederror(); } if (!this.isstopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者 copy[i].next(value); } } } error(err) { if (this.closed) { throw new objectunsubscribederror(); } this.haserror = true; this.thrownerror = err; this.isstopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者error方法 copy[i].error(err); } this.observers.length = 0; } complete() { if (this.closed) { throw new objectunsubscribederror(); } this.isstopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者complete方法 copy[i].complete(); } this.observers.length = 0; // 清空内部观察者列表 } }
通过 rxjs subject 示例和源码片段,对于 subject 我们可以得出以下结论:
- subject 既是 observable 对象,又是 observer 对象
- 当有新消息时,subject 会对内部的 observers 列表进行组播 (multicast)
angular 2 rxjs subject 应用
在 angular 2 中,我们可以利用 rxjs subject 来实现组件通信,具体示例如下:
message.service.ts
import { injectable } from '@angular/core'; import {observable} from 'rxjs/observable'; import { subject } from 'rxjs/subject'; @injectable() export class messageservice { private subject = new subject<any>(); sendmessage(message: string) { this.subject.next({ text: message }); } clearmessage() { this.subject.next(); } getmessage(): observable<any> { return this.subject.asobservable(); } }
home.component.ts
import { component } from '@angular/core'; import { messageservice } from '../_services/index'; @component({ moduleid: module.id, templateurl: 'home.component.html' }) export class homecomponent { constructor(private messageservice: messageservice) {} sendmessage(): void { // 发送消息 this.messageservice.sendmessage('message from home component to app component!'); } clearmessage(): void { // 清除消息 this.messageservice.clearmessage(); } }
app.component.ts
import { component, ondestroy } from '@angular/core'; import { subscription } from 'rxjs/subscription'; import { messageservice } from './_services/index'; @component({ moduleid: module.id, selector: 'app', templateurl: 'app.component.html' }) export class appcomponent implements ondestroy { message: any; subscription: subscription; constructor(private messageservice: messageservice) { this.subscription = this.messageservice.getmessage() .subscribe(message => { this.message = message; }); } ngondestroy() { this.subscription.unsubscribe(); } }
以上示例实现的功能是组件之间消息通信,即 homecomponent 子组件,向 appcomponent 父组件发送消息。代码运行后,浏览器的显示结果如下:
subject 存在的问题
因为 subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next
方法,具体如下:
next(value) { if (this.closed) { throw new objectunsubscribederror(); } if (!this.isstopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者 copy[i].next(value); } } }
这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:
const source = rx.observable.interval(1000); const subject = new rx.subject(); const example = subject.map(x => { if (x === 1) { throw new error('oops'); } return x; }); subject.subscribe(x => console.log('a', x)); example.subscribe(x => console.log('b', x)); subject.subscribe(x => console.log('c', x)); source.subscribe(subject);
以上代码运行后,控制台的输出结果:
a 0
b 0
c 0
a 1
rx.min.js:74 uncaught error: oops
在代码运行前,大家会认为观察者b 会在接收到 1
值时抛出异常,观察者 a 和 c 仍会正常运行。但实际上,在当前的 rxjs 版本中若观察者 b 报错,观察者 a 和 c 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:
const source = rx.observable.interval(1000); const subject = new rx.subject(); const example = subject.map(x => { if (x === 1) { throw new error('oops'); } return x; }); subject.subscribe( x => console.log('a', x), error => console.log('a error:' + error) ); example.subscribe( x => console.log('b', x), error => console.log('b error:' + error) ); subject.subscribe( x => console.log('c', x), error => console.log('c error:' + error) ); source.subscribe(subject);
jsbin - rxjs subject problem solved demo
rxjs subject & observable
subject 其实是观察者模式的实现,所以当观察者订阅 subject 对象时,subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next()
方法,把值一一送出。
subject 之所以具有 observable 中的所有方法,是因为 subject 类继承了 observable 类,在 subject 类中有五个重要的方法:
- next - 每当 subject 对象接收到新值的时候,next 方法会被调用
- error - 运行中出现异常,error 方法会被调用
- complete - subject 订阅的 observable 对象结束后,complete 方法会被调用
- subscribe - 添加观察者
- unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)
behaviorsubject
behaviorsubject 定义
export class behaviorsubject extends subject { constructor(_value) { // 设置初始值 super(); this._value = _value; } get value() { // 获取当前值 return this.getvalue(); } _subscribe(subscriber) { const subscription = super._subscribe(subscriber); if (subscription && !subscription.closed) { subscriber.next(this._value); // 为新的订阅者发送当前最新的值 } return subscription; } getvalue() { if (this.haserror) { throw this.thrownerror; } else if (this.closed) { throw new objectunsubscribederror(); } else { return this._value; } } next(value) { // 调用父类subject的next方法,同时更新当前值 super.next(this._value = value); } }
behaviorsubject 应用
有些时候我们会希望 subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:
var subject = new rx.subject(); var observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.subscribe(observera); subject.next(1); subject.next(2); subject.next(3); settimeout(() => { subject.subscribe(observerb); // 1秒后订阅 }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 1
observer a get value: 2
observer a get value: 3
通过输出结果,我们发现在 observerb 订阅 subject 对象后,它再也没有收到任何值了。因为 subject 对象没有再调用 next()
方法。但很多时候我们会希望 subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 behaviorsubject。
behaviorsubject 跟 subject 最大的不同就是 behaviorsubject 是用来保存当前最新的值,而不是单纯的发送事件。behaviorsubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 behaviorsubject 重新一下上面的示例:
var subject = new rx.behaviorsubject(0); // 设定初始值 var observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.subscribe(observera); subject.next(1); subject.next(2); subject.next(3); settimeout(() => { subject.subscribe(observerb); // 1秒后订阅 }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 0
observer a get value: 1
observer a get value: 2
observer a get value: 3
observer b get value: 3
replaysubject
replaysubject 定义
export class replaysubject extends subject { constructor(buffersize = number.positive_infinity, windowtime = number.positive_infinity, scheduler) { super(); this.scheduler = scheduler; this._events = []; // replayevent对象列表 this._buffersize = buffersize < 1 ? 1 : buffersize; // 设置缓冲区大小 this._windowtime = windowtime < 1 ? 1 : windowtime; } next(value) { const now = this._getnow(); this._events.push(new replayevent(now, value)); this._trimbufferthengetevents(); super.next(value); } _subscribe(subscriber) { const _events = this._trimbufferthengetevents(); // 过滤replayevent对象列表 let subscription; if (this.closed) { throw new objectunsubscribederror(); } ... else { this.observers.push(subscriber); subscription = new subjectsubscription(this, subscriber); } ... const len = _events.length; // 重新发送设定的最后buffersize个值 for (let i = 0; i < len && !subscriber.closed; i++) { subscriber.next(_events[i].value); } ... return subscription; } } class replayevent { constructor(time, value) { this.time = time; this.value = value; } }
replaysubject 应用
有些时候我们希望在 subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 replaysubject ,具体示例如下:
var subject = new rx.replaysubject(2); // 重新发送最后2个值 var observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.subscribe(observera); subject.next(1); subject.next(2); subject.next(3); settimeout(() => { subject.subscribe(observerb); // 1秒后订阅 }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 1
observer a get value: 2
observer a get value: 3
observer b get value: 2
observer b get value: 3
可能会有人认为 replaysubject(1)
是不是等同于 behaviorsubject,其实它们是不一样的。在创建behaviorsubject 对象时,是设置初始值,它用于表示 subject 对象当前的状态,而 replaysubject 只是事件的重放。
asyncsubject
asyncsubject 定义
export class asyncsubject extends subject { constructor() { super(...arguments); this.value = null; this.hasnext = false; this.hascompleted = false; // 标识是否已完成 } _subscribe(subscriber) { if (this.haserror) { subscriber.error(this.thrownerror); return subscription.empty; } else if (this.hascompleted && this.hasnext) { // 等到完成后,才发出最后的值 subscriber.next(this.value); subscriber.complete(); return subscription.empty; } return super._subscribe(subscriber); } next(value) { if (!this.hascompleted) { // 若未完成,保存当前的值 this.value = value; this.hasnext = true; } } }
asyncsubject 应用
asyncsubject 类似于 last
操作符,它会在 subject 结束后发出最后一个值,具体示例如下:
var subject = new rx.asyncsubject(); var observera = { next: value => console.log('observer a get value: ' + value), error: error => console.log('observer a error: ' + error), complete: () => console.log('observer a complete!') }; var observerb = { next: value => console.log('observer b get value: ' + value), error: error => console.log('observer b error: ' + error), complete: () => console.log('observer b complete!') }; subject.subscribe(observera); subject.next(1); subject.next(2); subject.next(3); subject.complete(); settimeout(() => { subject.subscribe(observerb); // 1秒后订阅 }, 1000);
以上代码运行后,控制台的输出结果:
observer a get value: 3
observer a complete!
observer b get value: 3
observer b complete!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: PHP中最低级别的错误类型总结
下一篇: php实现ffmpeg处理视频的实践