Rx.NET 简介
它支持基本所有的主流语言.
这里我简单介绍一下Rx.NET.
之前我写了几篇关于RxJS的文章, 概念性的东西推荐看这些:
http://www.cnblogs.com/cgzl/p/8641738.html
http://www.cnblogs.com/cgzl/p/8649477.html
http://www.cnblogs.com/cgzl/p/8662625.html
基本概念和RxJS是一样的.
下面开始切入正题.
Rx.NET总览
Rx.NET总体上看可以分为三个部分:
- 核心部分: Observables, Observers和Subjects
- LINQ和扩展, 用于查询和过滤Observables
- 并发和调度的支持
.NET Core的Events
.net core里面的event是通过委托对观察者模式的实现.
但是event在.net core里面并不是头等公民:
- 人们对它的语法+=评价是褒贬不一的.
- 很难进行传递和组合
- 很难进行event的连串(chaining)和错误处理(尤其是同一个event有多个handler的时候)
- event并没有历史记录
举个例子:
鼠标移动这个事件(event), 鼠标移动的时候会触发该事件, 这些事件会进入某个管道并记录该鼠标的坐标, 这样就会产生一个数据的集合/序列/流.
这里我们就是构建了一个基于时间线的鼠标坐标的序列, 每一次触发事件就会在这个管道上产生一个新的值. 在另一端, 一旦管道上有了新的值, 那么管道的观察者就会得到通知, 这些观察者通过提供回调函数的方式来注册到该管道上. 管道每次更新的时候, 这些回调函数就会被调用, 从而刷新了观察者的数据.
这个例子里, Observable就是管道, 一系列的值在这里被生成. Observer(观察者)在Observable有新的值的时候会被通知.
核心接口
IObservable:
- Subscribe(IObserver<T> observer)
IObserver
- void OnNext<T>(T value), 序列里有新的值的时候会调用这个
- void OnCompleted(), 序列结束的时候调用这个
- void OnError(Exception ex), 发生错误的时候调用这个
这个和RxJS基本是一样的.
Marble图
可以通过marble图来理解Rx
这图表示的是IObserver, 每当有新的值在Observable出现的时候, 传递到IObservable的Subscribe方法的参数IObserver的OnNext方法就会调用. 发生错误的话 OnError方法就会调用, 整个流也就结束了. 没有错误的话, 走到结束就会调用OnComplete方法. 不过有些Observable是不会结束的.
Observable.Subscribe()返回的Subscription对象被Dispose后, Observer就无法收到新的数据了.
创建Observable流/序列
创建流/序列的方式:
- 返回简单的值
- 包装现有的值
- 写一个生成函数
简单的Observables
- Observable.Empty 返回一个直接结束的Obsevable序列
- Observable.Never 返回一个没有值, 且永远不会结束的序列
- Observable.Throw(exception), 返回一个带有错误的序列
- Observable.Return(xxx) 返回单值的序列
包装Observables
可以包装下面这些来返回Observable:
- Action
- Observable.Start(() => 42) 返回一个含有42的序列, 并在Action结束的时候, OnComplete方法被调用.
- Task
- Task.ToObservable() 使用这个扩展方法进行包装, 当Task结束的时候, Observable推送新的数据, 然后结束
- IEnumerable
- ienumerable.ToObservable() 也是扩展方法, ienumerable的每个值都会作为新的值被推送到Observable上, 最后结束OnComplete
- Event
- Observable.FromEventPattern(obj, "xxChanged") 这是个工厂方法, 需要提供触发event的对象和event的名字.
生成函数
- Range
- Interval, Timer
- Create(低级), Generate
看图解释:
Observable.Range(1, 4):
Observable.Interval(200):
Observable.Timer(200, () => 42):
Observable.Create<int>(o => { o.OnNext(42); o.OnComplete(); return Disposable.Empty; });
Observable.Generate(1, value => value < 5, value => value + 1, value => value);
例子
using System; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { var sequence = GetTaskObservable(); sequence.Subscribe ( x => Console.WriteLine($"OnNext: {x}"), ex => Console.WriteLine($"OnError: {ex}"), () => Console.WriteLine("OnCompleted") ); Console.ReadKey(); } private static IObservable<int> GetSimpleObservable() { return Observable.Return(42); } private static IObservable<int> GetThrowObservable() { return Observable.Throw<int>(new ArgumentException("Error in observable")); } private static IObservable<int> GetEmptyObservable() { return Observable.Empty<int>(); } private static IObservable<int> GetTaskObservable() { return GetTask().ToObservable(); } private static async Task<int> GetTask() { return 42; } private static IObservable<int> GetRangeObservable() { return Observable.Range(2, 10); } private static IObservable<long> GetIntervalObservable() { return Observable.Interval(TimeSpan.FromMilliseconds(200)); } private static IObservable<int> GetCreateObservable() { return Observable.Create<int>(observer => { observer.OnNext(1); observer.OnNext(2); observer.OnNext(3); observer.OnNext(4); observer.OnCompleted(); return Disposable.Empty; }); } private static IObservable<int> GetGenerateObservable() { return Observable.Generate( 1, x => x < 5, x => x + 1, x => x ); } } }
请自行运行查看结果.
Cold 和 Hot Observable
Cold: Observable可以为每个Subscriber创建新的数据生产者
Hot: 每个Subscriber从订阅的时候开始在同一个数据生产者那里共享其余的数据.
从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.
举个例子:
Cold: 就相当于我在腾讯视频买体育视频会员, 可以从头看里面的足球比赛.
Hot: 就相当于看足球比赛的现场直播, 如果来晚了, 那么前面就看不到了.
把Cold 变 Hot, 使用.Publish()方法.
把Hot 变 Cold, 使用.Subscribe()方法把它变成Subject即可.
过滤和控制序列
LINQ操作符
操作符的类型:
- 过滤
- 合并
- 聚合
- 工具
过滤
sequence.Where(x => x % 2 == 0):
.OfType<Square>():
移除重复的:
.Distinct():
.DistinctUntilChanged():
过滤头尾元素:
.Take(2) .Skip(2):
.SkipLast(2) .TakeLast(2):
序列的阀:
a.TakeUnit(b)l a.SkipUntil(b):
实际例子: 把鼠标移动和点击转化为拖拽:
代码非常的简单:
var mouseDrags = mouseMoves.SkipUntil(mouseDowns).TakeUnit(mouseUps);
合并
a.Merge(b)
a.Amb(b), 其中的amb是ambiguous的缩写:
a.Concat(b):
为序列配对:
a.CombineLatest(b, (x, y) => x + y):
a.Zip(b, (x, y) => x + y):
序列的序列:
Merge()是可以达到这种效果的:
.Switch():
聚合
聚合就是指把序列聚合成一个值, 在序列结束后才能返回值
Count() Sum():
Aggregate():
Scan():
其他工具操作符
会有一些副作用
.Do(x => Log(x)): 但是记住不要改变序列的元素
.TimeStamp():
.Throttle(TimeSpan.FromSeconds(1)):
异步和多线程
异步就表示不一定按顺序执行, 但是它可以保证非阻塞, 通常会有回调函数(或者委托或者async await).
但是异步对于Rx来说就是它的本性
Rx的同步异步对比:
多线程
Rx不是多线程的, 但是它是线程*的(就是可以使用多个线程), 它被设计成只是用必须的线程而已.
多线程表示, 同时有多个线程在执行. 也可以称作并发. 它可以分担计算量. 但是据需要考虑线程安全了.
Rx已经做了一些抽象, 所以不必过多的考虑线程安全了.
例如:
Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(xxx):
UI的例子:
Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOn(SynchronizationContext.Current).Subscribe(t => searchBox.Text = t.ToString()):
如果计算量比较大的话:
Observable.Create(大量工作).Subscribe(xxx):
UI假死, 这就不好了.
应该这样:
Observable.Create(大量工作).SubscribeOn(NewThreadScheduler.Default).ObserveOn(SynchronizationContext.Current).Subscribe(xxx):
Schedulers
Scheduler可以在Rx里面安排执行动作. 它使用IScheduler接口.
现在就可以把Scheduler理解为是对未来执行的一个抽象.
它同时也负责着Rx所有的并发工作.
Rx提供了很多Scheduler.
下面是.net现有有很多种在未来执行动作的方法:
Rx里面就这个:
IScheduler接口:
基本上不用直接去使用IScheduler, 因为内置了很多现成的Schedulers了:
- Immediate, 这是唯一一个不是异步的Scheduler
- CurrentThread
- EventLoop
- Dispatcher
- NewThread
- TaskPool, ThreadPool
Schedulers实际上到处都使用着:
应该用哪个Scheduler?
Fake Scheduler:
用于测试
上一篇: 好奇心旺盛的喵星人