欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

异步编程RxJava-介绍

程序员文章站 2022-07-09 20:26:19
前言前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这 ......

前言
前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了future和completablefuture;之后有同学在下面留言提到了rxjava,刚好最近在看这本书,里面提到了响应式扩展(reactive extensions,rx),而rxjava是rx在jvm上的实现,所有打算对rxjava进一步了解。

rxjava简介
rxjava的官网地址:https://github.com/reactivex/rxjava
其中对rxjava进行了一句话描述:rxjava – reactive extensions for the jvm – a library for composing asynchronous and event-based programs using observable sequences for the java vm.
大意就是:一个在java vm上使用可观测的序列来组成异步的、基于事件的程序的库。
更详细的说明在netflix技术博客的一篇文章中描述了rxjava的主要特点:
1.易于并发从而更好的利用服务器的能力。
2.易于有条件的异步执行。
3.一种更好的方式来避免回调地狱。
4.一种响应式方法。

与completablefuture对比
之前提到completablefuture真正的实现了异步的编程模式,一个比较常见的使用场景:

completablefuture<integer> future = completablefuture.supplyasync(耗时函数);
future<integer> f = future.whencomplete((v, e) -> {
        system.out.println(v);
        system.out.println(e);
});
system.out.println("other...");

下面用一个简单的例子来看一下rxjava是如何实现异步的编程模式:

observable<long> observable = observable.just(1, 2)
        .subscribeon(schedulers.io()).map(new func1<integer, long>() {
            @override
            public long call(integer t) {
                try {
                    thread.sleep(1000); //耗时的操作
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
                return (long) (t * 2);
            }
        });
observable.subscribe(new subscriber<long>() {
    @override
    public void oncompleted() {
        system.out.println("oncompleted");
    }
    @override
    public void onerror(throwable e) {
        system.out.println("error" + e);
    }
    @override
    public void onnext(long result) {
        system.out.println("result = " + result);
    }
});
system.out.println("other...");

func1中以异步的方式执行了一个耗时的操作,subscriber(观察者)被订阅到observable(被观察者)中,当耗时操作执行完会回调subscriber中的onnext方法。
其中的异步方式是在subscribeon(schedulers.io())中指定的,schedulers.io()可以理解为每次执行耗时操作都启动一个新的线程。
结构上其实和completablefuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要rxjava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,rxjava还有很多好用的功能,在下面的内容会进行介绍。

异步观察者模式
上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者observable,然后把观察者subscriber添加到了被观察者列表中;
rxjava中一共提供了四种角色:observable、observer、subscriber、subjects
observables和subjects是两个被观察者,observers和subscribers是观察者;
当然我们也可以查看一下源码,看一下jdk中的observer和rxjava的observer
jdk中的observer:

public interface observer {
    void update(observable o, object arg);
}

rxjava的observer:

public interface observer<t> {
    void oncompleted();
    void onerror(throwable e);
    void onnext(t t);
}

同时可以发现subscriber是implements observer的:

public abstract class subscriber<t> implements observer<t>, subscription

可以发现rxjava中在observer中引入了2个新的方法:oncompleted()和onerror()
oncompleted():即通知观察者observable没有更多的数据,事件队列完结
onerror():在事件处理过程中出异常时,onerror()会被触发,同时队列自动终止,不允许再有事件发出。
正是因为rxjava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现rxjava的价值,所以这里给他命名为异步观察者模式

好了,下面正式介绍rxjava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍

maven引入

<dependency>
    <groupid>io.reactivex</groupid>
    <artifactid>rxjava</artifactid>
    <version>1.2.4</version>
</dependency>

创建observable
1.create()创建一个observable,并为它定义事件触发规则

observable<integer> observable = observable
            .create(new observable.onsubscribe<integer>() {
                @override
                public void call(subscriber<? super integer> observer) {
                    for (int i = 0; i < 5; i++) {
                        observer.onnext(i);
                    }
                    observer.oncompleted();
                }
            });
observable.subscribe(new observer<integer>() {...});

2.from()可以从一个列表中创建一个observable,observable将发射出列表中的每一个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items);
observable.subscribe(new observer<integer>() {...});

3.just()将传入的参数依次发送出来

observable<integer> observable = observable.just(1, 2, 3);
observable.subscribe(new observer<integer>() {...});

过滤observable
1.filter()来过滤我们观测序列中不想要的值

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).filter(
        new func1<integer, boolean>() {
            @override
            public boolean call(integer t) {
                return t == 1;
            }
        });
observable.subscribe(new observer<integer>() {...});

2.take()和tasklast()分别取前几个元素和后几个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).take(3);
observable.subscribe(new observer<integer>() {...});
observable<integer> observable = observable.from(items).takelast(2);

3.distinct()和distinctuntilchanged()
distinct()过滤掉重复的值

list<integer> items = new arraylist<integer>();
items.add(1);
items.add(10);
items.add(10);
observable<integer> observable = observable.from(items).distinct();
observable.subscribe(new observer<integer>() {...});

distinctuntilchanged()列发射一个不同于之前的一个新值时让我们得到通知

list<integer> items = new arraylist<integer>();
items.add(1);
items.add(100);
items.add(100);
items.add(200);
observable<integer> observable = observable.from(items).distinctuntilchanged();
observable.subscribe(new observer<integer>() {...});

4.first()和last()分别取第一个元素和最后一个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// observable<integer> observable = observable.from(items).first();
observable<integer> observable = observable.from(items).last();
observable.subscribe(new observer<integer>() {...});

5.skip()和skiplast()分别从前或者后跳过几个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// observable<integer> observable = observable.from(items).skip(2);
observable<integer> observable = observable.from(items).skiplast(2);
observable.subscribe(new observer<integer>() {...});

6.elementat()取第几个元素进行发射

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).elementat(2);
observable.subscribe(new observer<integer>() {...});

7.sample()指定发射间隔进行发射

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 50000; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).sample(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...});

8.timeout()设定的时间间隔内如果没有得到一个值则发射一个错误

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).timeout(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...onerror()...});

9.debounce()在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).debounce(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...});

转换observable
1.map()接收一个指定的func对象然后将它应用到每一个由observable发射的值上

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).map(
        new func1<integer, integer>() {
            @override
            public integer call(integer t) {
                return t * 2;
            }
        });
observable.subscribe(new observer<integer>() {...});

2.flatmap()函数提供一种铺平序列的方式,然后合并这些observables发射的数据

final scheduler scheduler = schedulers.from(executors.newfixedthreadpool(3));
list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).flatmap(
        new func1<integer, observable<? extends integer>>() {
            @override
            public observable<? extends integer> call(integer t) {
                list<integer> items = new arraylist<integer>();
                items.add(t);
                items.add(99999);
                return observable.from(items).subscribeon(scheduler);
            }
        });
observable.subscribe(new observer<integer>() {...});

重要的一点提示是关于合并部分:它允许交叉。这意味着flatmap()不能够保证在最终生成的observable中源observables确切的发射
顺序。

3.concatmap()函数解决了flatmap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。
示例代码同上,将flatmap替换为concatmap,输出的结果来看是有序的

4.switchmap()和flatmap()很像,除了一点:每当源observable发射一个新的数据项(observable)时,它将取消订阅并停止监视之前那个数据项产生的observable,并开始监视当前发射的这一个。
示例代码同上,将flatmap替换为switchmap,输出的结果只剩最后一个值

5.scan()是一个累积函数,对原始observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).scan(
        new func2<integer, integer, integer>() {
 
            @override
            public integer call(integer t1, integer t2) {
                system.out.println(t1 + "+" + t2);
                return t1 + t2;
            }
        });
observable.subscribe(new observer<integer>() {...});

6.groupby()来分组元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<groupedobservable<integer, integer>> observable = observable
                .from(items).groupby(new func1<integer, integer>() {
                    @override
                    public integer call(integer t) {
                        return t % 3;
                    }
                });
observable.subscribe(new observer<groupedobservable<integer, integer>>() {
        @override
        public void onnext(final groupedobservable<integer, integer> t) {
            t.subscribe(new action1<integer>() {
                @override
                public void call(integer value) {
                    system.out.println("key:" + t.getkey()+ ", value:" + value);
                }
            });
                  
});

7.buffer()函数将源observable变换一个新的observable,这个新的observable每次发射一组列表值而不是一个一个发射。

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<list<integer>> observable = observable.from(items).buffer(2);
observable.subscribe(new observer<list<integer>>() {...});

8.window()函数和 buffer()很像,但是它发射的是observable而不是列表

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<observable<integer>> observable = observable.from(items).window(2);
observable.subscribe(new observer<observable<integer>>() {
    @override
    public void onnext(observable<integer> t) {
        t.subscribe(new action1<integer>() {
            @override
            public void call(integer t) {
                system.out.println("this action1 = " + this+ ",result = " + t);
            }
        });
        //oncompleted和onerror
});

9.cast()它将源observable中的每一项数据都转换为新的类型,把它变成了不同的class

list<father> items = new arraylist<father>();
items.add(new son());
items.add(new son());
items.add(new father());
items.add(new father());
observable<son> observable = observable.from(items).cast(son.class);
observable.subscribe(new observer<son>() {...});
 
class father {
}
 
class son extends father {
}

组合observables
1.merge()方法将帮助你把两个甚至更多的observables合并到他们发射的数据项里

list<integer> items1 = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
list<integer> items2 = new arraylist<integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
observable<integer> observable1 = observable.from(items1);
observable<integer> observable2 = observable.from(items2);
observable<integer> observablemerge = observable.merge(observable1,observable2);
observable.subscribe(new observer<integer>() {...});

2.zip()合并两个或者多个observables发射出的数据项,根据指定的函数 func* 变换它们,并发射一个新值

list<integer> items1 = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
list<integer> items2 = new arraylist<integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
observable<integer> observable1 = observable.from(items1);
observable<integer> observable2 = observable.from(items2);
observable<integer> observablezip = observable.zip(observable1,
        observable2, new func2<integer, integer, integer>() {
            @override
            public integer call(integer t1, integer t2) {
                return t1 * t2;
            }
        });
observable.subscribe(new observer<integer>() {...});

3.combinelatest()把两个observable产生的结果进行合并,这两个observable中任意一个observable产生的结果,都和另一个observable最后产生的结果,按照一定的规则进行合并。

observable<long> observable1 = observable.interval(1000,timeunit.milliseconds);
observable<long> observable2 = observable.interval(1000,timeunit.milliseconds);
observable.combinelatest(observable1, observable2,
        new func2<long, long, long>() {
            @override
            public long call(long t1, long t2) {
                system.out.println("t1 = " + t1 + ",t2 = " + t2);
                return t1 + t2;
            }
        }).subscribe(new observer<long>() {...});
thread.sleep(100000);

4.join()类似combinelatest(),但是join操作符可以控制每个observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个observable产生的结果按照一定的规则进行合并

observable<long> observable1 = observable.interval(1000,
                timeunit.milliseconds);
        observable<long> observable2 = observable.interval(1000,
                timeunit.milliseconds);
        observable1.join(observable2, new func1<long, observable<long>>() {
            @override
            public observable<long> call(long t) {
                system.out.println("left=" + t);
                return observable.just(t).delay(1000, timeunit.milliseconds);
            }
        }, new func1<long, observable<long>>() {
            @override
            public observable<long> call(long t) {
                system.out.println("right=" + t);
                return observable.just(t).delay(1000, timeunit.milliseconds);
            }
        }, new func2<long, long, long>() {
            @override
            public long call(long t1, long t2) {
                return t1 + t2;
            }
        }).subscribe(new observer<long>() {
            @override
            public void oncompleted() {
                system.out.println("observable  completed");
            }
 
            @override
            public void onerror(throwable e) {
                system.out.println("oh,no!  something   wrong   happened!");
            }
 
            @override
            public void onnext(long t) {
                system.out.println("[result=]" + t);
            }
        });
 
        thread.sleep(100000);

5.switchonnext()把一组observable转换成一个observable,对于这组observable中的每一个observable所产生的结果,如果在同一个时间内存在两个或多个observable提交的结果,只取最后一个observable提交的结果给订阅者

observable<observable<long>> observable = observable.interval(2, timeunit.seconds)
        .map(new func1<long, observable<long>>() {
            @override
            public observable<long> call(long along) {
                return observable.interval(1, timeunit.milliseconds).take(5);
            }
        }).take(2);
 
observable.switchonnext(observable).subscribe(new observer<long>() {...});
thread.sleep(1000000);

6.startwith()在observable开始发射他们的数据之前,startwith()通过传递一个参数来先发射一个数据序列

observable.just(1000, 2000).startwith(1, 2).subscribe(new observer<integer>() {...});

总结
本文主要对rxjava进行了简单的介绍,从异步编程这个角度对rxjava进行了分析;并且针对observable的过滤,转换,组合的api进行了简单的介绍,当然我们更关心的是rxjava有哪些应用场景。