RxJS——主题(Subject)
主题(subjects)
什么是主题?rxjs 主题就是一个特性类型的 observable 对象,它允许值多路广播给观察者(observers)。当一个简单的 observable 是单播的(每个订阅的观察者它们自己都依赖 observable 的执行)时候,主题(subjects)就是多播的。
subjects 就像是一个 observable,但是它能多播到多个观察者(observers)。subjects 就像是事件发射器:它们维护众多侦听者的注册。
每一个 subject 都是一个 observable。给定一个 subject,你就能订阅它,提供一个 observer,开始正常接收值。从 observer 它的角度讲,它不知道 observable 的执行是否来自普通的单播 observable 或是 subject 。
在 subject 内部,subscribe
不会调用新的执行来发送值。它只是简单的在观察者列表中注册一个观察者,跟在其他库和语言中的 addlistener
的做法是很相似的。
每个 subject 也是一个 observer。它通过 next(v)
,error(e)
,complete()
是一个对象。为了给 subject 提供一个新值,只需要调用 next(thevalue)
,那么它将会多播给注册侦听到 subject 的观察者。
下面是一个例子,我们有附加了两个观察者对象,并且我们发送一些值给 subject:
import { subject } from 'rxjs'; const subject = new subject<number>(); subject.subscribe({ next: (v) => console.log(`observera: ${v}`) }); subject.subscribe({ next: (v) => console.log(`observerb: ${v}`) }); subject.next(1); subject.next(2); //logs: // observera: 1 // observerb: 1 // observera: 2 // observerb: 2
因为 subject 是一个观察者,也就是说你也许会提供一个 subject 作为参数给 subscribe
到任何 observable,就像下面这个例子:
import { subject, from } from 'rxjs'; const subject = new subject<number>(); subject.subscribe({ next: (v) => console.log(`observera: ${v}`) }) subject.subscribe({ next: (v) => console.log(`observerb: ${v}`) }); const observable = from([1, 2, 3]); observable.subscribe(subject); // 你可以订阅已经提供的 observable 对象 // logs: // observera: 1 // observerb: 1 // observera: 2 // observerb: 2 // observera: 3 // observerb: 3
通过上面的方法,本质上我们就仅仅只是通过 subject 把单播的可观察的执行转成了多播的。这个例子演示了主题如何让多个观察者共享 observable 的执行的唯一方法。
这里还有一些特殊的 subject 类型:behaviorsubject
,replaysubject
,asyncsubject
。
多播 observables
一个 “多播observable” 通过一个 subject 传递通知,它可能会有很多订阅者,而一个普通的 “单播 observable” 只会发送通知到单个观察者。
一个多播 observable 在后台(hood) 用一个 subject 让多个观察者都能看到相同的 observable 执行。
在后台,multicast
又是如何工作的呢:观察者订阅一个基础的 subject,并且这个 subject 订阅了源 observable。下面的例子跟上面的例子很相似,它使用了 observable.subscribe(subject)
:
import { from, subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = from([1, 2, 3]); const subject = new subject(); const multicasted = source.pipe(multicast(subject)); //这里在后台就是 `subject.subscribe({...})` multicasted.subscribe({ next: (v) => console.log(`observablea: ${v}`); }); multicasted.subscribe({ next: (v) => console.log(`observableb: ${v}`); }); //这个带后台就是 `source.subscribe(subject)` multicasted.connect();
multicast
返回一个看起来想平常使用的 observable,但是工作却像 subject,当它订阅的时候。multicast
返回的实际是 connectableobservable
,它只是一个使用 connect()
方法的 observable。
当那些共享的 observable 的执行开始执行的时候 connect()
方法明确执行是非常重要的。因为 connect()
会在后台执行 source.subscribe(subject)
,connect()
返回一个 subscription,它使你能够取消订阅,从而取消那些共享的 observable 的执行。
引用计数(reference counting)
手动调用 connect()
处理订阅(subscription)是很麻烦的。通常,我们想要当第一个观察者(observer)到达的时候自动连接,以及当最后一个观察者取消订阅的时候自动取消公共的执行。
考虑下面例子,它的订阅按此列表概述的发生:
- 第一个观察者订阅多播 observable
- 多播 observable 连接
-
next
发送值 0 给第一个观察者 - 第二个观察者订阅多播 observable
-
next
发送值 1 给第一个观察者 -
next
发送值 1 给第一个观察者 - 第一个观察者从多播 observable 取消订阅
-
next
发送值 2 给第二个观察者 - 第二个观察者从多播 observable 取消订阅
- 连接的多播 observable 取消订阅
为了达成上述过程,显示调用 connect()
,我们编写如下代码:
import { interval, subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = interval(500); const subject = new subject(); const multicasted = source.pipe(multicast(subject)); let subscription1, subscription2, subscriptionconnect; subscription1 = multicasted.subscribe({ next: (v) => console.log(`observablea: ${v}`) }); //这里应该调用 `connect()`,因为第一个订阅者订阅了 `multicasted`,它正在对消费的值感兴趣 subscriptionconnect = multicasted.connect(); settimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log(`observableb: ${v}`) }); },600); settimeout(() => { subscription1.unsubsribe(); },1200); //这里我们应该取消订阅公共的 observable 的执行 settimeout(() => { subscription2.unsubscribe(); subscriptionconnect.unsubscribe(); //这个是针对公共的 observable 的执行 }, 2000);
如果我们希望避免显示调用 connect()
,我们可以使用 connectableobservable.refcount()
(引用计数)方法,它返回一个 observable,而且它还是可以追踪它有的所有订阅者。当订阅者们的这个数字从 0 增到 1,它就会自动调用 connect()
,开始公共的执行。只有当计数从 1 到 0 时才会整个取消订阅,停止所有的执行。
refcount 使多播 observable 当第一个订阅者到达的时候自动开始执行,并且最后一个离开的时候停止执行。
下面是例子:
import { interval, subject } from 'rxjs'; import { multicast, refcount } from 'rxjs/operators'; const source = interval(500); const subject = new subject(); const refcounted = source.pipe(multicast(subject), refcount()); let subscription1, subscription2; //这里自动调用 `connect()`,因为第一个 ‘refcounted’ 的订阅者 console.log('observera subscribed'); subscription1 = refcounted.subscribe({ next: (v) => console.log(`observablea: ${v}`) }); suttimeout(() => { console.log('observerb subscribed'); subscription2 = refcounted.subscribe({ next: (v) => console.log(`observableb ${v}`) }); }, 600); settimeout(() => { console.log('observera unsubscribed'); subscription1.unsubscribe(); }, 1200); //这里公共的 observable 的执行将会停止,因为 'refcounted' 在这之后没有订阅者了 settimeout(() => { console.log('observerb unsubscribed'); subscription2.unsubscribe(); }, 2000); //logs // observera subscribed // observera: 0 // observerb subscribed // observera: 1 // observerb: 1 // observera unsubscribed // observerb: 2 // observerb unsubscribed
refcounted()
方法只存在于 connectableobservable
对象中,并且它返回的是一个 observable 而不是 connectableobservable
。
行为主题(behaviorsubject)
有一个 subjects 的变体就是 behaviorsubject
,它有一个 “当前值” 的概念。它会存储最近的发送给消费者的一个值,无论这个新的观察者是否订阅,它都将会立即从 behaviorsubject
接收这个 “当前值”。
behaviorsubject 对于表示 “过程值(values over time)” 是很有用的。例如一个表示生日的事件流是一个 subject,那么这个人的年龄的流将是一个 behaviorsubject
看下面的例子,behaviorsubject 初始化为 0,它在第一个观察者接收这个值的时候开始订阅。第二个观察者接收值 2,即使它在这个值 2 发送之后被订阅的。
import { behaviorsubject } from 'rxjs'; const subject = new behaviorsubject(0); subject.subject({ next: (v) => console.log(`observera: ${v}`) }); subject.next(1); subject.next(2); subject.subscribe({ next: v => console.log(`observerb: ${v}`) }); subject.next(3); // logs // observera: 0 // observera: 1 // observera: 2 // observerb: 2 // observera: 3 // observerb: 3
重播主题(replaysubject)
应答主题很像 behaviorsubject
,它能发送旧的值给新的订阅者,但是它也能记录部分 observable 的执行。
replaysubject 从 observable 的执行中记录多个值并且重新把这些值发送给新的订阅者
当创建一个 replaysubject
时,你可以指定这些如何重播:
import { replaysubject } from 'rxjs'; const subject = new replaysubject(3); // 缓冲 3 个值给新的订阅者 subject.subscribe({ next: v => console.log(`observera: ${v}`) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: v => console.log(`observerb: ${v}`) }); subject.next(5); // logs: // observera: 1 // observera: 2 // observera: 3 // observera: 4 // observerb: 2 // observerb: 3 // observerb: 4 // observera: 5 // observerb: 5
你也可以缓存大小里指定一个窗口时间,来确定记录那些值的年龄。在下面的代码中,我们使用 100 大小的缓冲,但是窗口时间参数是 500 毫秒。
import { replaysubject } from 'rxjs'; const subject = new replaysubject(100, 500 /* 窗口时间 */); subject.subscribe({ next: v => console.log(`observera: ${v}`) }); let i = 1; setinterval(() => subject.next(i++), 200); settimeout(() => { subject.subscribe({ next: v => console.log(`observerb: ${v}`) }) }, 1000); // logs // observera: 1 // observera: 2 // observera: 3 // observera: 4 // observera: 5 // observerb: 3 // observerb: 4 // observerb: 5 // observera: 6 // observerb: 6 // ...
异步主题(asyncsubject)
asyncsubject 是一个变体,它只会发送 observable 的执行的最后一个值给观察者们,并且只当执行完成的时候。
import { asyncsubject } from 'rxjs'; const subject = new asyncsubject(); subject.subscribe({ next: (v) => console.log(`observera: ${v}`) }) subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log(`observerb: ${v}`) }); subject.next(5); subject.complete(); // logs: // observera: 5 // observerb: 5
asyncsubject 跟 last()
操作符相似,它等待 complete
通知以便于发送一个值。
上一篇: Docker 容器命令大全