谈谈RxJava2中的异常及处理方法
前言
众所周知,rxjava2 中当链式调用中抛出异常时,如果没有对应的 consumer 去处理异常,则这个异常会被抛出到虚拟机中去,android 上的直接表现就是 crash,程序崩溃。
订阅方式
说异常处理前咱们先来看一下 rxjava2 中 observable 订阅方法 subscribe() 我们常用的几种订阅方式:
// 1 subscribe() // 2 disposable subscribe(consumer<? super t> onnext) // 3 disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror) // 4 disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror,action oncomplete) // 5 disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror,action oncomplete, consumer<? super disposable> onsubscribe) // 6 void subscribe(observer<? super t> observer)
无参和以 consumer为参数的几种方法内部都是以默认参数补齐的方式最终调用第 5 个方法,而方法 5 内部通过 lambdaobserver 将参数包装成 observer 再调用第 6 个方法
public final disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror, action oncomplete, consumer<? super disposable> onsubscribe) { objecthelper.requirenonnull(onnext, "onnext is null"); objecthelper.requirenonnull(onerror, "onerror is null"); objecthelper.requirenonnull(oncomplete, "oncomplete is null"); objecthelper.requirenonnull(onsubscribe, "onsubscribe is null"); lambdaobserver<t> ls = new lambdaobserver<t>(onnext, onerror, oncomplete, onsubscribe); subscribe(ls); return ls; }
所以使用 consumer 参数方式和 observer 参数方式进行订阅除了观察回调来源不一样其他没有任何差别。但就是因为这种差别,在异常情况发生时的处理结果上也会产生差别
异常处理
我们分别进行一下几种方式模拟异常:
1、observer onnext 中抛出异常(切换线程)
apiservice.newjsonkeydata() .doonsubscribe { t -> compositedisposable.add(t) } .compose(rxscheduler.sync()) // 封装的线程切换 .subscribe(object : observer<list<zoodata>> { override fun oncomplete() { } override fun onsubscribe(d: disposable) { } override fun onnext(t: list<zoodata>) { throw runtimeexception("runtime exception") } override fun onerror(e: throwable) { log.d("error", e.message) } })
结果:不会触发 onerror,app 崩溃
2、observer onnext 中抛出异常(未切换线程)
observable.create<string> { it.onnext("ssss") } .subscribe(object : observer<string> { override fun oncomplete() { } override fun onsubscribe(d: disposable) { } override fun onnext(t: string) { log.d("result::", t) throw runtimeexception("run llllll") } override fun onerror(e: throwable) { log.e("sss", "sss", e) } })
结果:会触发 onerror,app 未崩溃
3、observer map 操作符中抛出异常
apiservice.newjsonkeydata() .doonsubscribe { t -> compositedisposable.add(t) } .map { throw runtimeexception("runtime exception") } .compose(rxscheduler.sync()) .subscribe(object : observer<list<zoodata>> { override fun oncomplete() { } override fun onsubscribe(d: disposable) { } override fun onnext(t: list<zoodata>) { } override fun onerror(e: throwable) { log.d("error", e.message) } })
结果:会触发 observer 的 onerror,app 未崩溃
4、consumer onnext 中抛出异常
apiservice.newjsonkeydata() .doonsubscribe { t -> compositedisposable.add(t) } .compose(rxscheduler.sync()) .subscribe({ throw runtimeexception("messsasassssssssssssssssssssssssssssssssssssss") }, { log.d("error", it.message) })
结果 a:有 errorconsumer 触发 errorconsumer,app 未崩溃
apiservice.newjsonkeydata() .doonsubscribe { t -> compositedisposable.add(t) } .compose(rxscheduler.sync()) .subscribe { throw runtimeexception("messsasassssssssssssssssssssssssssssssssssssss") }
结果 b:无 errorconsumer,app 崩溃
那么为什么会出现这些不同情况呢?我们从源码中去一探究竟。
consumer 订阅方式的崩溃与不崩溃
subscribe() 传入 consumer 类型参数最终在 observable 中会将传入的参数转换为 lambdaobserver 再调用 subscribe(lambdaobserver)进行订阅。展开 lambdaobserver:(主要看 onnext 和 onerror 方法中的处理)
. . . @override public void onnext(t t) { if (!isdisposed()) { try { onnext.accept(t); } catch (throwable e) { exceptions.throwiffatal(e); get().dispose(); onerror(e); } } } @override public void onerror(throwable t) { if (!isdisposed()) { lazyset(disposablehelper.disposed); try { onerror.accept(t); } catch (throwable e) { exceptions.throwiffatal(e); rxjavaplugins.onerror(new compositeexception(t, e)); } } else { rxjavaplugins.onerror(t); } } . . .
onnext 中调用了对应 consumer 的 apply() 方法,并且进行了 try catch。因此我们在 consumer 中进行的工作抛出异常会被捕获触发 lambdaobserver 的 onerror。再看 onerror 中,如果订阅未取消且 errorconsumer 的 apply() 执行无异常则能正常走完事件流,否则会调用 rxjavaplugins.onerror(t)。看到这里应该就能明白了,当订阅时未传入 errorconsumer时 observable 会指定 onerrormissingconsumer 为默认的 errorconsumer,发生异常时抛出 onerrornotimplementedexception。
rxjavaplugins.onerror(t)
上面分析,发现异常最终会流向 rxjavaplugins.onerror(t)。这个方法为 rxjava2 提供的一个全局的静态方法。
public static void onerror(@nonnull throwable error) { consumer<? super throwable> f = errorhandler; if (error == null) { error = new nullpointerexception("onerror called with null. null values are generally not allowed in 2.x operators and sources."); } else { if (!isbug(error)) { error = new undeliverableexception(error); } } if (f != null) { try { f.accept(error); return; } catch (throwable e) { // exceptions.throwiffatal(e); todo decide e.printstacktrace(); // nopmd uncaught(e); } } error.printstacktrace(); // nopmd uncaught(error); }
查看其源码发现,当 errorhandler 不为空时异常将由其消耗掉,为空或者消耗过程产生新的异常则 rxjava 会将异常抛给虚拟机(可能导致程序崩溃)。 errorhandler本身是一个 consumer 对象,我们可以通过如下方式配置他:
rxjavaplugins.seterrorhandler(object : consumer1<throwable> { override fun accept(t: throwable?) { todo("not implemented") //to change body of created functions use file | settings | file templates. } })
数据操作符中抛出异常
以 map 操作符为例,map 操作符实际上 rxjava 是将事件流 hook 了另一个新的 observable observablemap
@checkreturnvalue @schedulersupport(schedulersupport.none) public final <r> observable<r> map(function<? super t, ? extends r> mapper) { objecthelper.requirenonnull(mapper, "mapper is null"); return rxjavaplugins.onassembly(new observablemap<t, r>(this, mapper)); }
进入 observablemap 类,发现内部订阅了一个内部静态类 mapobserver,重点看 mapobserver 的 onnext 方法
public void onnext(t t) { if (done) { return; } if (sourcemode != none) { downstream.onnext(null); return; } u v; try { v = objecthelper.requirenonnull(mapper.apply(t), "the mapper function returned a null value."); } catch (throwable ex) { fail(ex); return; } downstream.onnext(v); }
onnext 中 try catch 了 mapper.apply(),这个 apply 执行的就是我们在操作符中实现的 function 方法。因此在 map 之类数据变换操作符中产生异常能够自身捕获并发送给最终的 observer。如果此时的订阅对象中能消耗掉异常则事件流正常走 onerror() 结束,如果订阅方式为上以节中的 consumer,则崩溃情况为上一节中的分析结果。
observer 的 onnext 中抛出异常
上述的方式 1 为一次网络请求,里面涉及到线程的切换。方式 2 为直接 create 一个 observable 对象,不涉及线程切换,其结果为线程切换后,观察者 observer 的 onnext() 方法中抛出异常无法触发 onerror(),程序崩溃。
未切换线程的 observable.create
查看 create() 方法源码,发现内部创建了一个 observablecreate 对象,在调用订阅时会触发 subscribeactual() 方法。在 subscribeactual() 中再调用我们 create 时传入的 observableonsubscribe 对象的 subscribe() 方法来触发事件流。
@override protected void subscribeactual(observer<? super t> observer) { // 对我们的观察者使用 createemitter 进行包装,内部的触发方法是相对应的 createemitter<t> parent = new createemitter<t>(observer); observer.onsubscribe(parent); try { // source 为 create 时创建的 observableonsubscribe 匿名内部接口实现类 source.subscribe(parent); } catch (throwable ex) { exceptions.throwiffatal(ex); parent.onerror(ex); } }
上述代码中的订阅过程是使用 try catch 今夕包裹的。订阅及订阅触发后发送的事件流都在一个线程,所以能够捕获整个事件流中的异常。(ps : 大家可以尝试下使用 observeon() 切换事件发送线程。会发现异常不能再捕获,程序崩溃)
涉及线程变换时的异常处理
retrofit 进行网络请求返回的 observable 对象实质上是 rxjava2calladapter 中生成的 bodyobservable,期内部的 onnext 是没有进行异常捕获的。其实这里是否捕获并不是程序崩溃的根本原因,因为进行网络请求,必然是涉及到线程切换的。就算此处 try catch 处理了,也并不能捕获到事件流下游的异常。
@override public void onnext(response<r> response) { if (response.issuccessful()) { observer.onnext(response.body()); } else { terminated = true; throwable t = new httpexception(response); try { observer.onerror(t); } catch (throwable inner) { exceptions.throwiffatal(inner); rxjavaplugins.onerror(new compositeexception(t, inner)); } } }
以我们在最终的 observer 的 onnext 抛出异常为例,要捕获这次异常那么必须在最终的调用线程中去进行捕获。即 .observeon(androidschedulers.mainthread()) 切换过来的 android 主线程。与其他操作符一样,线程切换时产生了一组新的订阅关系,rxjava 内部会创建一个新的观察对象 observableobserveon。
@override public void onnext(t t) { if (done) { return; } if (sourcemode != queuedisposable.async) { queue.offer(t); } schedule(); } . . . void schedule() { if (getandincrement() == 0) { worker.schedule(this); // 执行 observableobserveon 的 run 方法 } } . . . @override public void run() { if (outputfused) { drainfused(); } else { drainnormal(); } }
而执行任务的 worker 即为对应线程 scheduler 的对应实现子类所创建的 worker,以 androidschedulers.mainthread() 为例,scheduler 实现类为 handlerscheduler,其对应 worker 为 handlerworker,最终任务交给 scheduledrunnable 来执行。
private static final class scheduledrunnable implements runnable, disposable { private final handler handler; private final runnable delegate; private volatile boolean disposed; // tracked solely for isdisposed(). scheduledrunnable(handler handler, runnable delegate) { this.handler = handler; this.delegate = delegate; } @override public void run() { try { delegate.run(); } catch (throwable t) { rxjavaplugins.onerror(t); } } @override public void dispose() { handler.removecallbacks(this); disposed = true; } @override public boolean isdisposed() { return disposed; } }
会发现,run 中 进行了 try catch。但 catch 内消化异常使用的是全局异常处理 rxjavaplugins.onerror(t);,而不是某一个观察者的 onerror。所以在经过切换线程操作符后,观察者 onnext 中抛出的异常,onerror 无法捕获。
处理方案
既然知道了问题所在,那么处理问题的方案也就十分清晰了。
1、注册全局的异常处理
rxjavaplugins.seterrorhandler(object : consumer<throwable> { override fun accept(t: throwable?) { // do something } })
2、consumer 作为观察者时,不完全确定没有异常一定要添加异常处理 consumer
apiservice.stringdata() .doonsubscribe { t -> compositedisposable.add(t) } .compose(rxscheduler.sync()) .subscribe(consumer<boolean>{ }, consumer<throwable> { })
3、observer 可以创建一个 baseobaerver 将 onnext 内部进行 try catch 人为的流转到 onerror 中,项目中的观察这都使用这个 baseobserver 的子类。
@override public void onnext(t t) { try { onsuccess(t); } catch (exception e) { onerror(e); } data = t; success = true; }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。