欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

响应式编程 RxJava

程序员文章站 2022-05-04 08:53:53
...

 

编写不易,转载请注明(http://shihlei.iteye.com/blog/2426891)!

 

一 概述

 

最近使用Hystrix,看文档Hystrix底层基于Rxjava实现,很好奇,就一起研究了下,做个总结

 

二 响应式编程(Reactive Programming)

定义:一种基于异步数据流概念的编程模式。

核心:事件,可以被触发,传播,订阅

特点:易于并发,易于编写按条件执行代码,避免大量回调

最常见的使用场景:发送网络请求,获得结果,提交事件,更新

 

注:主要是基于观察者模式,关于观察者模式可以参考我之前的文章:《观察者模式及Guava EventBus》

 

三 RxJava 

1)概述

 

Rx: ReactiveX, 官方定义是基于“观察者模式”,实现基于事件流异步处理的lib,是用于实现响应式编程的框架。

 

特点:

(a)链式调用,异步处理复杂问题,

(b)使用推的方式,有数据主动推给消费者

 

主要类:

1)被观察者:Observable

(1)注册观察者 subscribe()

(2)事件回调

(3)增加如下方法

onComplated():用于生产者没有更多数据可用时发出通知

onError():生产者发生错误时能够发出通知

(4)Observables 能够组合而不是嵌套,避免发生大量嵌套回调

 

2)观察者:Observer 

(1)事件消费 consumer()

(2)提供如下回调可以可以模块处理各种情况

 

2)简单demo

(1)依赖

 

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>

 

 

 (2)代码

就是个简单的“ 观察者模式 ” ,看看怎么做:

 

 

package x.rx.rxjava.demo;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

/**
 * RxJava Demo
 * <p>
 *
 * @author shilei
 */
public class RxJavaDemo {

    public static void main(String[] args) {
        RxJavaDemo rxJavaDemo = new RxJavaDemo();
        rxJavaDemo.runObservable();
    }


    /**
     * 1)创建一个可观察对象,不断提供事件数据
     * 2)创建一个观察者,提供事件处理方法
     */
    public void runObservable() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //推送事件流
                emitter.onNext("Event : 1");
                emitter.onNext("Event : 2");

                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() { //接受事件处理
            @Override
            public void accept(String s) throws Exception {
                System.out.println("handle Event : " + s);
            }

        }, new Consumer<Throwable>() { // 事件产生过程产生error时
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("Observable :  onError " + throwable.getMessage());
            }
        }, new Action() { // 收到complete通知时

            @Override
            public void run() throws Exception {
                System.out.println("Observable :  onComplete");
            }
        });
    }
}

 

 

 

3)源码分析:看看“观察者模式”怎么实现的

 

(1)创建一个被观察者Observable.create() 

 

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    } 

其中主要的被观察者:ObservableCreate<T>(source) 对象,包装了产生事件的回调方法

 

 

(2)注册观察者subscribe()方法,这里重要的是创建一个观察者:LambdaObserver,主要为支持流式开发,会将方法拆成函数接口填充。

 

 

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {

        //省略:。。。。。。

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

  

继续调用重载方法

 

 

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //省略:。。。。。。
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
             //省略:。。。。。。
        }
    }

 

找到最终的核心:回到subscribeActual(observer); 方法subscribeActual(observer) 是一个抽象方法,具体取决于我们创建Observable,Demo中,就是(1)中create的 new ObservableCreate<T>(source);看看他的subscribeActual 实现:

 

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

 

 

主要是创建一个CreateEmitter<T> 用于调用推送数据的回调。并关联了刚才的“观察者”,上面的 LambdaObserver 

这里最重要的一步,启动推动数据:source.subscribe(parent); 这步就是我们demo中提供的回调方法,还记得吗,会放下

 

 

 Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //推送事件流
                emitter.onNext("Event : 1");
                emitter.onNext("Event : 2");

                emitter.onComplete();
            }
        }

 

 

传进来的就是刚才源码中的内部类CreateEmitter<T>,所以调用 onNext,onComplete,onError 等都是调用 CreateEmitter<T> 的,其底层完成 LambdaObserver 的调用,及执行我们通过subscribe()提供的各种绑定方法

 

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

//省略 。。。。。。
}

 

 

至此流程调用链得以执行,绕了这么大圈好坏,我还要想想。

 

4)Observable(被观察者)常用操作

 

(1)创建:

Observable.from() :遍历集合或数组,逐个推送其中的数据。

Observable.just(): 推送一个普通的Java函数的返回值:非常有用,我们可以将远程调用的结果,推送到处理类中。

Observable .just():可以接受多个参数进行推送

Observable.timer()、Observable.interval(): 定时发送,可以用于代替定时任务

 

(2)预处理:

map() 在“观察者”消费前,将处理逻辑作用于每个推送的数据

 

(3)过滤:

filter() 从推送的数据中过滤出想要的进行处理

distinct() 排重,用于推送是出错引起的重复推送的情况(推送的数据一般实现Comparable接口)

distinctUnitChange() 观察者可能一直有数据产生,这里要求直到有数据变化,观察者才能接收到

 

(4)组合:同时处理多个来源的事件

marge() 将多个Observable 的数据合并到一起处理

concat() 顺序执行,特别适合使用队列的情况

 

四 RxJava 应用场景举例

1)场景一:异步网络请求

(1)依赖

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>

 

(2)代码

package x.rx.rxjava.demo;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

/**
 * RxJava 异步网络请求
 *
 * @author shilei
 */
public class WeatherRxJavaDemo {


    private static final String WATHER_OPEN_API_URL = "http://www.weather.com.cn/data/cityinfo/101010100.html";

    public static void main(String[] args) throws Exception {
        new WeatherRxJavaDemo().run();

        //等待异步处理结束
        TimeUnit.SECONDS.sleep(1L);
    }

    private void run() {
        Observable
                .create(emitter -> {
                    try {
                        emitter.onNext(getWatherResult());
                        emitter.onComplete();
                    }catch (Throwable e){
                        emitter.onError(e);
                    }

                })
                .subscribeOn(Schedulers.newThread())//异步
                .subscribe(weatherResult -> {
                    //正常获得结果
                    System.out.println("weather : " + weatherResult);
                }, throwable -> {
                    //异常记录日志
                    System.out.println("weather error : " + throwable.getMessage());
                });
    }

    private String getWatherResult() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
        return restTemplate.getForObject(WATHER_OPEN_API_URL, String.class);
    }
}

 

2)场景二:Observable 处理链

使用concat 连接Observable ,任何一个满足要求推送数据,后面的就不会执行。

这里实现cache的逐级读取模拟。

 

 

package x.rx.rxjava.demo;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

/**
 * rxjava demo: 实现数据逐级缓存异步读取
 * <p>
 *
 * @author shilei
 */
public class ObservableConcatDemo {


    public static void main(String[] args) throws Exception {
        new ObservableConcatDemo().run();
    }


    private void run() throws Exception {
        //任务1
        Observable<String> memeryCacheJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = "memery data"; // 模拟从内存查询数据,这里模拟查不到

                if (Objects.nonNull(data)) {
                    // 推送从memery查到了,推动结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    //查差不到,结束
                    System.out.println("get date from memery : null");
                    emitter.onComplete();
                }
            }
        });

        Observable<String> redisCacheJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = null; //模拟从redis 查询数据,这里模拟查不到

                if (Objects.nonNull(data)) {
                    //从redis 查到了,推送结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    System.out.println("get date from redis : null");
                    emitter.onComplete();
                }
            }
        });

        Observable<String> mysqlJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = "db data"; //模拟从db 查询数据,查询成功

                if (Objects.nonNull(data)) {
                    //db 查到了,推送结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    System.out.println("get date from db : null");
                    emitter.onComplete();
                }
            }
        });

        // 使用concat 任何一个满足要求推送数据,后面的就不会执行。
        Observable.concat(memeryCacheJob, redisCacheJob, mysqlJob)
                .subscribeOn(Schedulers.newThread()) //异步
                .subscribe(data -> {
                    System.out.println("data result : " + data);
                });

        //避免主线程退出
        TimeUnit.SECONDS.sleep(1L);
    }
}