欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RxJava2|Observable

程序员文章站 2022-08-12 10:52:36
RxJava2 Observable 前述 java maven rxjava 我也不知道称呼为基类好不好... 官方介绍. " " : 0..N flows, no backpressure. 0...N flows, 但不支持背压. 示例( 的简单使用) RxJava是基于观察者模式的. 是一个 ......

rxjava2 observable

前述

java-1.8

maven-3

rxjava-2.2.3

我也不知道称呼为基类好不好...

官方介绍.

0...n flows, 但不支持背压.

示例(observable的简单使用)

rxjava是基于观察者模式的. observable是一个被观察者(它的观察者是observer).

observable的操作实现类中会生成6份信号(由while实现)并发射, 可它的观察者observer则只接收前4份信号, 并逐一打印(也就是处理).

observable的操作实现类 - helloobservable.java

package yag;

import io.reactivex.observable;
import io.reactivex.observableonsubscribe;
import io.reactivex.observer;
import io.reactivex.disposables.disposable;

/**
 * @author senyag
 */
public class helloobservable {

    public void helloobservable(){

        // 代码这样排个人觉得会直观一些.

        // 初始化observable
        observable
                // create()操作符: 通过以编程方式调用observer方法从头开始创建一个observable
                .create((observableonsubscribe<integer>) observableemitter -> {
                    //observableemitter: 发射器
                    integer i = 0;
                    while ( i < 7){
                        i++;
                        observableemitter.onnext(i);
                    }
                })
            
           
                // subscribe()操作符: 根据observable的发射和通知进行操作
                .subscribe(new observer<integer>() {  // observer 就是观察者

                    private disposable mdisposable;

                    @override
                    public void onsubscribe(disposable disposable) {
                        mdisposable = disposable;
                    }

                    @override
                    public void onnext(integer i) {
                        if (i == 5){
                            // mdisposable可以切断操作, 让observer不再接收信息.
                            mdisposable.dispose();
                        }else {
                            system.out.println("现在接收到的信号是: 第" + i + "信号");
                        }
                    }

                    @override
                    public void onerror(throwable throwable) {

                    }

                    @override
                    public void oncomplete() {

                    }
                });
    }
}

执行者 - runner.java

package yag;

public class runner {

    public static void main(string[] args){
        helloobservable helloobservable = new helloobservable();
        helloobservable.helloobservable();
    }
}

执行结果

现在接收到的信号是: 第1信号
现在接收到的信号是: 第2信号
现在接收到的信号是: 第3信号
现在接收到的信号是: 第4信号

process finished with exit code 0

小结

用到了两个操作符: create()(创建发射器)和subscribe()(处理所发射的请求). 官方中针对这些操作符给出了特定的一页来介绍它们: