欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

Android异步框架 RxJava

程序员文章站 2024-01-20 17:33:40
观察者模式的概念 观察者A与被观察者B建立订阅关系,当被观察者B发生某种改变时,立即通知观察者A 添加依赖 基本模式 Observable被观察者 注意各地方添加泛型避免大片警告,onNext()是事件的回调,onComplete()是事件的结尾。onComplete()与onError互斥需要保持 ......

观察者模式的概念

观察者a与被观察者b建立订阅关系,当被观察者b发生某种改变时,立即通知观察者a

添加依赖

compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

基本模式

observable被观察者

注意各地方添加泛型避免大片警告,onnext()是事件的回调,oncomplete()是事件的结尾。oncomplete()与onerror互斥需要保持唯一性,并只能调用一次。

observable<string> observable= observable.create(new observableonsubscribe<string>() {
    @override
    public void subscribe(observableemitter<string> e) throws exception {
                e.onnext("消息1");
                e.onnext("消息2");
                e.onnext("消息3");
                e.oncomplete();
    }
});

observer观察者

创建观察者时回调的onsubscribe可以获取disposable对象,在合适的时候判断条件,调用dispose()即可接触订阅关系

observer<string> observer=new observer<string>() {
    @override
    public void onsubscribe(disposable d) {
        //通过判断解除订阅关系
         d.dispose();
    }

    @override
    public void onnext(string o) {
        //对应observable的onnext方法
    }

    @override
    public void onerror(throwable e) {
        //对应observable的onerror方法
    }

    @override
    public void oncomplete() {
        //对应observable的oncomplete方法
    }
};

建立订阅关系

observable.subscribeon(schedulers.io()) //指定事件生产在子线程
          .observeon(androidschedulers.mainthread()) //指定事件消费在ui线程
          .subscribe(observer);

observable被观察者的其他模式

//just模式,将自动发送onnext()事件
observable<string> observable = observable.just("发送消息");

//fromiterable模式,遍历集合,并自动发送onnext()事件
observable<string> observable = observable.fromiterable((iterable<string>) mlist);

//interval模式,定时自动发送整数序列,从0开始每隔2秒计数,
observable<long> observable = observable.interval(0,2, timeunit.seconds)

//range模式,自动发送特定的整数序列,0表示不发送,负数会抛异常,从1开始发送到20
observable<integer> observable = observable.range(1,20);

//timer模式,定时执行观察者的onnext()方法
observable<integer> observable = observable.timer(2, timeunit.seconds);

observable被观察者的更多创建方式以及操作符

如创建操作,数据过滤操作,条件操作,转载以下博客,很详细:

rxjava操作符大全

scheduler调度器

四种常见模式

schedulers.immediate() 默认模式,在当前线程运行

schedulers.newthread() 创建新的子线程运行

schedulers.io() 创建新的子线程运行,内部使用的是无上限的线程池,可重用空闲的线程,效率高

 androidschedulers.mainthread() 在ui主线程运行

订阅事件时的生产与消费线程

subscribeon() 指定observable(被观察者)所在的线程,或者叫做事件产生的线程

observeon() 指定 observer(观察者)所运行在的线程,或者叫做事件消费的线程

新的观察者模式

flowable被观察者

flowable<string> flowable = flowable.create(new flowableonsubscribe<string>() {
    @override
    public void subscribe(flowableemitter<string> e) throws exception {
                e.onnext("hello rxjava!");
                e.oncomplete();
    }
},backpressurestrategy.buffer);//增加背压模式

subscriber观察者

onsubscribe()会返回subscription对象,调用cancel()即可取消订阅关系,request()即可指定消费事件的数量 

subscriber<string> subscriber=new subscriber<string>() {
    @override
    public void onsubscribe(subscription s) {
         s.request(long.max_value);
    }

    @override
    public void onnext(string s) {
        log.i("rxjava", "onnext: "+s);
    }

    @override
    public void onerror(throwable t) {
        log.i("rxjava", "onerror");
    }

    @override
    public void oncomplete() {
        log.i("rxjava", "oncomplete");
    }
};
flowable.subscribe(subscriber);//建立订阅关系

backpressure背压模式

如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生backpressure问题。即异步情况下,backpressure问题才会存在。

buffer

所谓buffer就是把rxjava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生oom。

drop

当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉

latest

latest与drop功能基本一致,唯一的区别就是latest总能使消费者能够接收到生产者产生的最后一个事件

error

这种方式会在产生backpressure问题的时候直接抛出一个异常,这个异常就是著名的missingbackpressureexception