响应式编程 RxJava
编写不易,转载请注明(http://shihlei.iteye.com/blog/2426891)!
一 概述
最近使用Hystrix,看文档Hystrix底层基于Rxjava实现,很好奇,就一起研究了下,做个总结
二 响应式编程(Reactive Programming)
定义:一种基于异步数据流概念的编程模式。
核心:事件,可以被触发,传播,订阅
特点:易于并发,易于编写按条件执行代码,避免大量回调
最常见的使用场景:发送网络请求,获得结果,提交事件,更新
注:主要是基于观察者模式,关于观察者模式可以参考我之前的文章:《观察者模式及Guava EventBus》
三 RxJava
1)概述
Rx: ReactiveX, 官方定义是基于“观察者模式”,实现基于事件流异步处理的lib,是用于实现响应式编程的框架。
特点:
(a)链式调用,异步处理复杂问题,
(b)使用推的方式,有数据主动推给消费者
主要类:
1)被观察者:Observable
(1)注册观察者 subscribe()
(2)事件回调
(3)增加如下方法
onComplated():用于生产者没有更多数据可用时发出通知
onError():生产者发生错误时能够发出通知
(4)Observables 能够组合而不是嵌套,避免发生大量嵌套回调
2)观察者:Observer
(1)事件消费 consumer()
(2)提供如下回调可以可以模块处理各种情况
2)简单demo
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>
(2)代码
就是个简单的“ 观察者模式 ” ,看看怎么做:
package x.rx.rxjava.demo; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; /** * RxJava Demo * <p> * * @author shilei */ public class RxJavaDemo { public static void main(String[] args) { RxJavaDemo rxJavaDemo = new RxJavaDemo(); rxJavaDemo.runObservable(); } /** * 1)创建一个可观察对象,不断提供事件数据 * 2)创建一个观察者,提供事件处理方法 */ public void runObservable() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }).subscribe(new Consumer<String>() { //接受事件处理 @Override public void accept(String s) throws Exception { System.out.println("handle Event : " + s); } }, new Consumer<Throwable>() { // 事件产生过程产生error时 @Override public void accept(Throwable throwable) throws Exception { System.out.println("Observable : onError " + throwable.getMessage()); } }, new Action() { // 收到complete通知时 @Override public void run() throws Exception { System.out.println("Observable : onComplete"); } }); } }
3)源码分析:看看“观察者模式”怎么实现的
(1)创建一个被观察者:Observable.create()
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
其中主要的被观察者:ObservableCreate<T>(source) 对象,包装了产生事件的回调方法
(2)注册观察者:subscribe()方法,这里重要的是创建一个观察者:LambdaObserver,主要为支持流式开发,会将方法拆成函数接口填充。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //省略:。。。。。。 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
继续调用重载方法
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //省略:。。。。。。 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { //省略:。。。。。。 } }
找到最终的核心:回到subscribeActual(observer); 方法subscribeActual(observer) 是一个抽象方法,具体取决于我们创建Observable,Demo中,就是(1)中create的 new ObservableCreate<T>(source);看看他的subscribeActual 实现:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
主要是创建一个CreateEmitter<T> 用于调用推送数据的回调。并关联了刚才的“观察者”,上面的 LambdaObserver
这里最重要的一步,启动推动数据:source.subscribe(parent); 这步就是我们demo中提供的回调方法,还记得吗,会放下
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }
传进来的就是刚才源码中的内部类CreateEmitter<T>,所以调用 onNext,onComplete,onError 等都是调用 CreateEmitter<T> 的,其底层完成 LambdaObserver 的调用,及执行我们通过subscribe()提供的各种绑定方法
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } //省略 。。。。。。 }
至此流程调用链得以执行,绕了这么大圈好坏,我还要想想。
4)Observable(被观察者)常用操作
(1)创建:
Observable.from() :遍历集合或数组,逐个推送其中的数据。
Observable.just(): 推送一个普通的Java函数的返回值:非常有用,我们可以将远程调用的结果,推送到处理类中。
Observable .just():可以接受多个参数进行推送
Observable.timer()、Observable.interval(): 定时发送,可以用于代替定时任务
(2)预处理:
map() 在“观察者”消费前,将处理逻辑作用于每个推送的数据
(3)过滤:
filter() 从推送的数据中过滤出想要的进行处理
distinct() 排重,用于推送是出错引起的重复推送的情况(推送的数据一般实现Comparable接口)
distinctUnitChange() 观察者可能一直有数据产生,这里要求直到有数据变化,观察者才能接收到
(4)组合:同时处理多个来源的事件
marge() 将多个Observable 的数据合并到一起处理
concat() 顺序执行,特别适合使用队列的情况
四 RxJava 应用场景举例
1)场景一:异步网络请求
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.0.6.RELEASE</version> </dependency>
(2)代码
package x.rx.rxjava.demo; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.web.client.RestTemplate; /** * RxJava 异步网络请求 * * @author shilei */ public class WeatherRxJavaDemo { private static final String WATHER_OPEN_API_URL = "http://www.weather.com.cn/data/cityinfo/101010100.html"; public static void main(String[] args) throws Exception { new WeatherRxJavaDemo().run(); //等待异步处理结束 TimeUnit.SECONDS.sleep(1L); } private void run() { Observable .create(emitter -> { try { emitter.onNext(getWatherResult()); emitter.onComplete(); }catch (Throwable e){ emitter.onError(e); } }) .subscribeOn(Schedulers.newThread())//异步 .subscribe(weatherResult -> { //正常获得结果 System.out.println("weather : " + weatherResult); }, throwable -> { //异常记录日志 System.out.println("weather error : " + throwable.getMessage()); }); } private String getWatherResult() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8)); return restTemplate.getForObject(WATHER_OPEN_API_URL, String.class); } }
2)场景二:Observable 处理链
使用concat 连接Observable ,任何一个满足要求推送数据,后面的就不会执行。
这里实现cache的逐级读取模拟。
package x.rx.rxjava.demo; import java.util.Objects; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.schedulers.Schedulers; /** * rxjava demo: 实现数据逐级缓存异步读取 * <p> * * @author shilei */ public class ObservableConcatDemo { public static void main(String[] args) throws Exception { new ObservableConcatDemo().run(); } private void run() throws Exception { //任务1 Observable<String> memeryCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "memery data"; // 模拟从内存查询数据,这里模拟查不到 if (Objects.nonNull(data)) { // 推送从memery查到了,推动结果,同时整个流程 emitter.onNext(data); } else { //查差不到,结束 System.out.println("get date from memery : null"); emitter.onComplete(); } } }); Observable<String> redisCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = null; //模拟从redis 查询数据,这里模拟查不到 if (Objects.nonNull(data)) { //从redis 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from redis : null"); emitter.onComplete(); } } }); Observable<String> mysqlJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "db data"; //模拟从db 查询数据,查询成功 if (Objects.nonNull(data)) { //db 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from db : null"); emitter.onComplete(); } } }); // 使用concat 任何一个满足要求推送数据,后面的就不会执行。 Observable.concat(memeryCacheJob, redisCacheJob, mysqlJob) .subscribeOn(Schedulers.newThread()) //异步 .subscribe(data -> { System.out.println("data result : " + data); }); //避免主线程退出 TimeUnit.SECONDS.sleep(1L); } }
上一篇: Java常用类和异常
下一篇: 观察者模式及Guava EventBus