欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  Java

深入浅出RxJava_06[传输过程的条件&组合操作]的详情

程序员文章站 2022-05-19 09:25:54
...


本教程基于RxJava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…

下面的一系列函数都是用来判断传输的数据是否符合某些条件。

  1. all - 判定是否Observable发射的所有数据都满足某个条件,如果原始Observable的任何一项数据不满足条件就返回False

  2. Amb - 给定2个Observable,只发送那个首先发送数据的Observable,后者会被忽略掉。

  3. Contains - 判定一个Observable是否发射一个特定的值

  4. DefaultIfEmpty - 发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值

  5. SequenceEqual - 判定两个Observables是否发射相同的数据序列

  6. SkipUntil - SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable.

  7. SkipWhile - SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。

  8. TakeUntil - 使用一个谓词函数而不是第二个Observable来判定是否需要终止发射数据,它的行为类似于takeWhile。

  9. TakeWhile - TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

1.all

判定是否Observable发射的所有数据都满足某个条件,如果原始Observable的任何一项数据不满足条件就返回False

如有群学生信息,数据如下:

private ArrayList<Student> initStudents() {
    ArrayList<Student> persons = new ArrayList<>();
    persons.add(new Student("张三", 16));
    persons.add(new Student("李四", 21));
    persons.add(new Student("王二麻子", 18));
    return persons;
}

入学的时候要求岁数不大于20岁才能读书,这样,就可以使用all函数来判断。

Observable<Student> observable = Observable.from(initStudents());
Observable<Boolean> booleanObservable = 
                observable.all(new Func1<Student, Boolean>() {
                    //设置判断规则
                    @Override
                    public Boolean call(Student student) {
                        return student.age<20;
                    }
                });
booleanObservable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        //这里打印了false  因为学生里面李四21岁
        Log.i(TAG, "call: "+aBoolean);
    }
});

2.Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据,Amb将忽略和丢弃其它所有Observables的发射物。

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<String> o2 = Observable.just("d", "f", "e");

//判断上面的2个被观察者对象哪个先发送数据 发送数据慢的被观察者将被忽略
Observable<String> observable = Observable.amb(o2,o1);

observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
});

3.Contains

给Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false。

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<Boolean> observable = o1.contains("a");

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        //true
        Log.i(TAG, "call: "+aBoolean);
    }
}

4.DefaultIfEmpty

DefaultIfEmpty简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个你提供的默认值。

Observable<String> o1 = Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //这里没有发送任何onNext事件,而是调用onCompleted()事件
                subscriber.onCompleted();
            }
        });
    //如果上面的o1不发送任何onNext而结束 则发送一个默认的值d
    Observable<String> o2 = o1.defaultIfEmpty("d");
    o2.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            //打印d
            Log.i(TAG, "call: "+s);
        }
    });

比如在被观察者访问网络请求的时候,后台服务器没有返回返回数据,可以使用一个默认的数据代替。

5.SequenceEqual

判定两个Observables是否发射相同的数据序列,它会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = Observable.just(1,2,3);
Observable<Boolean> observable = Observable.sequenceEqual(o1, o2);

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        Log.i(TAG, "call: "+aBoolean);
    }
});

6.1 SkipUntil

SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = o1.skipUntil(Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        //如果不发送如下信息  则上面的1,2,3都不会发送
        //如果发送了4 则直接发送1,2,3 但是4 5并没有发送
        subscriber.onNext(4);
        subscriber.onNext(5);
    }
}));

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

输出:

call: 1
call: 2
call: 3

6.2 SkipUntil

SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.skipWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        //直到发送的数据"等于"4 才开始从4发送数据 这里打印4,5,6
        return value!=4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

输出:

call: 4
call: 5
call: 6

7.1 TakeUntil

使用一个takeUntil内部的函数来决定是否终止后面的数据继续发送。代码如下:

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeUntil(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        //发送数据直到等于4才停止发送
        return value==4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

上面的代码类似于:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)<=4){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}

7.2 TakeWhile

TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        //发送数据 直到发送的数据等于才停止发送
        return integer!=5;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

上面的代码打印出了1~4,类似于如下的while语句:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)!=5){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}

下面展示的函数可用于组合多个Observables。

  1. Merge - 交错合并多个Observables的发射物

  2. StartWith - 在数据序列的开头插入指定的项

  3. Zip - 根据函数提供的规则合并两个observable的发射数据,发射数据的总数以发送最小的observable发射个数为准。

8.Merge

合并多个Observables的发射物。使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

示例代码

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

输出

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

9.StartWith

在数据序列的开头插入指定的项

如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)

Observable<String> o1 = Observable.just("模拟网络请求");
Observable<String> o2 = o1.startWith("网络请求之前的准备工作");
o2.subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        Log.i(TAG, value);
    }
});

输出:

网络请求之前的准备工作
模拟网络请求

类似的函数有:

  • Javadoc: startWith(Iterable))

  • Javadoc: startWith(T)) (最多接受九个参数)

  • Javadoc: startWith(Observable))

10.Zip

假设被观察者o1发送的数据为1 2 ,被观察者o2发送的数据为3,4,5。那么zip压缩发送的数据个数以最低个也就是2个为准。并且 新发送的每个数据以Func2的返回数据为准。

  • Javadoc: zip(Observable1,Observable2,Func2));

示例代码如下:

Observable<Integer> o1 = Observable.just(1,2);
Observable<Integer> o2 = Observable.just(3,4,5);

Observable<Integer> zipObservable = Observable
        .zip(o1, o2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer o, Integer o2) {
                return o + o2;
            }
        });

zipObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer s) {
        Log.i(TAG, "call: "+s);
    }
});

输出如下:

call: 4 // 1+3
call: 6   // 2+4


以上就是深入浅出RxJava_06[传输过程的条件&组合操作]的详情的内容,更多相关内容请关注PHP中文网(www.php.cn)!