使用RxJava中遇到的一些”坑“
前言
大家越用rxjava,越觉得它好用,所以不知不觉地发现代码里到处都是rxjava的身影。然而,rxjava也不是银弹,其中仍然有很多问题需要解决。这里,我简单地总结一下自己遇到的一些“坑”,内容上可能会比较松散。
一、考虑主线程的切换
rxjava中一个常用的使用方法是——在其他线程中做处理,然后切换到ui线程中去更新页面。其中,线程切换就是使用了observeon()。后台下载文件,前台显示下载进度就可以使用这种方式完成。然而,实践发现这其中有坑。如果文件比较大,而下载包的粒度又比较小,这将导致很多通知积压下来,最终导致错误。
这种错误其实也是可以理解的,毕竟mainlooper是根据message来工作的,message过多必然会导致一些问题。当然,这还是比较想当然的想法,最终还是需要到源码中一探究竟。observeon的原理在前面关于rxjava的文章已经有过分析,这里还是简单列一下代码。其中的重点还是operatorobserveon的内部类——observeonsubscriber。其重要代码片段如下:
/** observe through individual queue per observer. */ static final class observeonsubscriber<t> extends subscriber<t> implements action0 { final subscriber<? super t> child; final scheduler.worker recursivescheduler; final notificationlite<t> on; final boolean delayerror; final queue<object> queue; /** the emission threshold that should trigger a replenishing request. */ final int limit; // the status of the current stream volatile boolean finished; final atomiclong requested = new atomiclong(); final atomiclong counter = new atomiclong(); /** * the single exception if not null, should be written before setting finished (release) and read after * reading finished (acquire). */ throwable error; /** remembers how many elements have been emitted before the requests run out. */ long emitted; // do not pass the subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the subscription is chained public observeonsubscriber(scheduler scheduler, subscriber<? super t> child, boolean delayerror, int buffersize) { this.child = child; this.recursivescheduler = scheduler.createworker(); this.delayerror = delayerror; this.on = notificationlite.instance(); int calculatedsize = (buffersize > 0) ? buffersize : rxringbuffer.size; // this formula calculates the 75% of the buffersize, rounded up to the next integer this.limit = calculatedsize - (calculatedsize >> 2); if (unsafeaccess.isunsafeavailable()) { queue = new spscarrayqueue<object>(calculatedsize); } else { queue = new spscatomicarrayqueue<object>(calculatedsize); } // signal that this is an async operator capable of receiving this many request(calculatedsize); } void init() { // don't want this code in the constructor because `this` can escape through the // setproducer call subscriber<? super t> localchild = child; localchild.setproducer(new producer() { @override public void request(long n) { if (n > 0l) { backpressureutils.getandaddrequest(requested, n); schedule(); } } }); localchild.add(recursivescheduler); localchild.add(this); } @override public void onnext(final t t) { if (isunsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onerror(new missingbackpressureexception()); return; } schedule(); } @override public void oncompleted() { if (isunsubscribed() || finished) { return; } finished = true; schedule(); } @override public void onerror(final throwable e) { if (isunsubscribed() || finished) { rxjavahooks.onerror(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getandincrement() == 0) { recursivescheduler.schedule(this); } } }
关键点就在于这个queue成员,这个队列存放了需要进行发送给下行线程的消息。对于主线程来说,符合其实是比较重的,从消息的生产者和消费者的模式讲,过多过快的消息会导致消息阻塞。甚至,都到不了阻塞的情况,因为queue的大小会有上限,在onnext()
方法中的queue.offer()
可能会产生异常,这取决于queue的实现方式。但无论如何都不可能无限大,所以无法保证绝对不出异常。
解决这个问题的方法其实也很简单,可以在生产者降低消息的产生频率。也可以在消息处理的时候先不进行线程切换,而是通过判断,在必要的时候进行线程切换,比如使用runonuithread()
。
二、rxjava避免内存泄漏
rxjava的响应式机制本质上还是回调实现的,因此内存泄漏也是会出现的。倘若不对subscription进行管理,内存泄漏会非常严重。对于subscription,其实有几个比较广泛使用的方法,比如rxlifecycle,以及简单的compositesubscription。至于它们的使用方法,其实都非常简单,这里就不赘述了。
说到内存泄漏,就谈点题外话,动画也可能导致内存泄漏。其原因仍然是一些回调函数,这些回调函数实现的view变化的功能,但是在被撤销以后,回调函数没有取消掉,同时view可能持有context信息,从而导致内存泄漏。最近才发现,loadtoastview这个开源库一直存在内存泄漏,其原因正如上文所说。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。