欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RxJava 2.0版本使用精炼详解

程序员文章站 2022-06-06 08:43:26
...

一,前期基础知识储备

笔者之前写过两篇关于RxJava1.0的文章

RxJava理论讲解和简单实现

RxJava常见操作符讲解》,感兴趣的读者可以看一看。里面有有关RxJava异步操作库的详细介绍,本文不再赘述。

RxJava1.0版本添加依赖:

    implementation 'io.reactivex:rxandroid:1.2.1'
    implementation 'io.reactivex:rxjava:1.2.1'

RxJava2.0版本添加依赖:

    // RxJava2.0
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.1.3'

注意二者的依赖环境已经发生改变,所以类和接口所处的包都已经发生改变。

目前RxJava已经更新到2.0版本,2.0版本和1.0版本还是有比较大的区别,这里会在下面的代码中有所体现。

二,上代码,具体实现

1. RxJava2的基础使用

从打印“Hello world”开始,给出从简单到复杂的不同实现方式。

1)简单方式

    private static void helloworldSimple() {
        // 创建消费者,消费者接收一个String类型的事件
        Consumer<String> consumer = new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: 0," + s);
            }
        };
        Observable.just("Hello WOrld").subscribe(consumer);
    }

打印结果:

accept: 0,Hello WOrld

在简单版本中,我们创建了一个消费者consumer,其实也可以称之为订阅者或者观察者,消费者实现accept()方法,接收一个字符串类型的数据或者事件。

被观察者Observable通过just()方法发出一个“Hello World”的呼唤,然后我们使用subscribe方法指定呼唤的接收者或者消费者,即consumer。那么consumer就能接收到被观察者的呼唤,打印出log。

2)复杂方式

    /**
     * Observer相比Consumer,会对消息的处理更加细化
     */
    private static void helloworldComplex() {
        // 创建一个观察者
        Observer<String> observer = new Observer<String>() {
            // 当Observable 调用subscribe方法时会回调该方法
            // onSubscribe表示已经开始观察被观察者了
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: 1");
            }
            // 调用onSubscribe方法后
            // 表示收到被观察者的消息
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: 2," + s);
            }
            // 出错时调用
            @Override
            public void onError(Throwable e) {

            }
            // onNext之后调用
            // 表示接收消息结束
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 3");
            }
        };

        Observable.just("Hello World").subscribe(observer);
    }

打印结果为:

onSubscribe: 1
onNext: 2,Hello World
onComplete: 3

这里我们创建了一个Observer而不是一个Consumer,Observer在接口方法上要多很多,有onSubscribe(), onNext(), onError(), onComplete()。被观察者还是Observable,也发出一声“Hello World”,然后通过subscribe指定观察者observer。

从结果中,可以看到observer中方法的调用顺序,onSubscribe()表示已经开始观察被观察者了,onNext()表示收到被观察者的消息,onComplete()表示接收消息结束。所以Observer相比Consumer会对消息的处理更加细化。

3)受控制的复杂方式

    /**
     * 在onSubscribe方法中会接收到一个Disposable对象,
     * 该对象相当于一个开关,如果开关关闭,则观察者不会接收到任何事件或数据
     */
    private static void helloworldComplexDisposable() {

        Observer<String> observer = new Observer<String>() {
            // 声明一个Disposable对象
            Disposable disposable;
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d; // 保存disposable对象
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: 2," + s);
                if (s.equals("No")) {
                    disposable.dispose();
                }
            }
            @Override
            public void onError(Throwable e) {

            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 3");
            }
        };

        Observable.just("Hello World","No","No").subscribe(observer);
    }

打印结果如下:

onNext: 2,Hello World
onNext: 2,No

在onSubscribe()方法中会接收一个Disposable对象,该对象相当于一个开关,如果开关关闭,则观察者不会收到任何事件和数据。

使用一个变量保存Disposable对象,在onNext方法中如果传过来的字符串为“No”,则调用dispose()方法关闭事件的接收。被观察者会发出3个字符串,执行结果可以看到最后一个字符串没有打印,甚至onComplete()方法都不会执行。

4)十分复杂的方式

    /**
     * 使用Create方法创建被观察者,并实现subscribe方法,
     * 接收一个ObservableEmitter对象,即被观察者的发射器,发射器能够发出数据和事件
     * 使用链式调用,让代码看起来更加整洁,
     * 上面发出数据和事件,下面接收数据和事件
     */
    private static void helloworldPlus() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "subscribe: send hello world");
                emitter.onNext("hello world");
                Log.d(TAG, "subscribe: send No");
                emitter.onNext("No");
                Log.d(TAG, "subscribe: send No");
                emitter.onNext("No");
                Log.d(TAG, "subscribe: send complete");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

观察者还是原来的观察者,被观察者则使用create()的方法创建出来,并实现了subscribe()方法,接收一个ObservableEmitter对象,即被观察者的发射器发射器能够发出数据和事件

打印结果为:

onSubscribe: 
subscribe: send hello world
onNext: hello world
subscribe: send No
onNext: No
subscribe: send No
onNext: No
subscribe: send complete
onComplete: 

从打印结果可知,发射器每发出一个数据或者事件,观察者就会收到。上述代码中使用了链式调用,让代码看起来更加整洁。上面发出数据和事件,下面接收数据和事件。

2.RxJava操作符

RxJava提供了大量的操作符来完成对数据的处理,这些操作符也可以理解为函数。如果把RxJava比作一条数据流水线,那么操作符就是一道工序,数据通过这些工序的加工变换,组装,最后生产出我们需要的数据。

记住:操作符都是对被观察发出数据的操作。

1)过滤操作符filter

2019男篮世界杯,中国最终负于委内瑞拉,其中关键一局中国对战波兰,周琦发球失误,罚球失误,将比赛拖入加时赛,最终输掉比赛,由此由来成语“摇头叹琦”。这里演示过滤“摇头叹琦”。

    /**
     * filter 过滤操作符
     */
    private static void filter() {
        Observable.just("姚明","阿联","摇头叹琦","大侄子")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        Log.d(TAG, "test: " + s);
                        return s.equals("摇头叹琦"); // 只检查出摇头叹琦
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

打印结果为:

test: 姚明
test: 阿联
test: 摇头叹琦
accept: 摇头叹琦
test: 大侄子

2)map变换操作符


    private static void map () {
        Student student = new Student("Jack");
        // map操作符,从student类型转换成Developer
        Observable.just(student).map(new Function<Student, Developer>() {

            @Override
            public Developer apply(Student student) throws Exception {
                Log.d(TAG, "apply: " + student.toString());
                Developer developer = new Developer();
                developer.setName(student.getName());
                developer.setSkill("Android");
                return developer;
            }
        }).subscribe(new Observer<Developer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Developer developer) {
                Log.d(TAG, "onNext: " + developer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

map操作符能够完成数据类型的转换。被观察者发送出一个student,而观察者想要接收一个developer(两个都是简单的实体类),那么在student发送给观察者之前,需要对student进行一些培训,让它转换成一个developer。

打印结果为:

onSubscribe: 
apply: Student{name='Jack'}
onNext: Developer{name='Jack', skill='Android'}
onComplete: 

结果:被观察者发送出一个Student,在观察者的onNext()方法中收到了一个Developer。

篇幅有限,文中只介绍了两种操作符的用法,更多内容请参考:

RxJava2-Android-Samples

 

3.异步

作为一个异步操作库,RxJava提供了非常方便的API来完成线程的调度,内置的线程调度器有:

  • Schedule.single(),单线程调度器,线程可复用;
  • Schedule.newThread(),为每个任务创建新的线程;
  • Schedule.io(),处理I/O任务,内部线程池实现,可根据需求增长;
  • Schedule.computation(),处理计算任务,如事件循环和回调任务
  • AndroidSchedulers.mainThread(),Android主线程调度器,属于RxAndroid。

线程调度器实际上指定事件或者数据在什么样的线程中处理。这里主要涉及两个API:subscribeOn()observeOn()

1)subscribeOn

默认情况下,被观察者和观察者在同一线程中执行。而subscribeOn方法实际上是指定被观察者的代码在那种线程中执行。

需要注意的是,subscribeOn方法调用的位置没有特殊指定,它可以放置在其他操作符的前面,中间或者后面,subscribeOn也可以被多次调用,但以第一次调用为准。

2)observerOn

observeOn指定的是后续的操作符以及观察者的代码在什么样的线程中执行。而且observeOn可以被多次调用,每次都生效。

注意observerOn既可以指定被观察者操作符的线程;又可以指定观察者所在的线程

______________________________________________________________________________

4. RxJava2.0与Retrofit2.0集成

笔者之前写过三篇Retrofit的文章

Retrofit基础理论和简单实现

Retrofit各种注解的使用

RxJava+Retrofit完成网络请求

感兴趣的读者可以去看一看,里面详细阐述了Retrofit2.0的用法,并且结合RxJava1.0实现了简单的网络请求。

1)添加依赖并开启网络权限

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'

    compile 'com.squareup.retrofit2:retrofit:2.3.0'
    compile 'com.squareup.retrofit2:converter-gson:2.3.0'
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
 <uses-permission android:name="android.permission.INTERNET"/>

2)定义网络接口

通过豆瓣API去请求豆瓣电影Top250的网址:

http://douban.uieee.com/v2/movie/top250?start=0&count=10

 

所以定义的接口如下:

public interface Api {
    @GET("top250")
    Observable<MovieBean> listTop250(@Query("start") int start,
                     @Query("count") int count);
}

注意:豆瓣API接口网址可能会有变化,以豆瓣API的官方文档为准。

参数start表示查询数据的起始位置,count表示查询数据的个数。返回Observeable类型而不是Call类型。MovieBean为网络结果解析成的JavaBean。

3) 使用单例模式获取Retrofit实例

public class MovieRetrofit {

    private static MovieRetrofit sMovieRetrofit;
    private final Api mApi;
    // 使用单例模式
    public static MovieRetrofit getInstance() {
        if (sMovieRetrofit == null) {
            synchronized (MovieRetrofit.class) {
                if (sMovieRetrofit == null) {
                    sMovieRetrofit = new MovieRetrofit();
                }
            }
        }
        return sMovieRetrofit;
    }

    private MovieRetrofit() {
        Retrofit retrofit = new Retrofit.Builder().baseUrl("http://douban.uieee.com/v2/movie/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        // 创建网络接口代理
        mApi = retrofit.create(Api.class);
    }
    // 返回API接口对象的实现
    public Api getApi() {
        return mApi;
    }
}

使用单例模式获取Retrofit实例对象, 实现Api的接口,获取接口实例对象.

4)发送网络请求刷新列表

接下来就可以使用接口获取一个Observable对象,通过subscribeOn指定网络请求以及网络响应解析的线程,通过ObserverOn指定刷新UI的线程,实现如下:

/**
 * 简化代码,Activity继承自ListActivity
 */
public class MovieListActivity extends ListActivity {
    private static final String TAG = "MovieListActivity";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        // 获取Observable对象,查询排名前25的电影
        Observable<MovieBean> movieBeanObservable = MovieRetrofit.getInstance().getApi().listTop250(0, 25);
        movieBeanObservable.subscribeOn(Schedulers.io())
                // 将网络结果转为电影名的集合
                .map(new Function<MovieBean, List<String>>() {
                    @Override
                    public List<String> apply(MovieBean movieBean) throws Exception {
                        List<String> array = new ArrayList<String>();
                        for (int i = 0; i < movieBean.getSubjects().size(); i++) {
                            String title = movieBean.getSubjects().get(i).getTitle();
                            Log.d(TAG, "apply: " + title);
                            array.add(title);
                        }
                        return array;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<List<String>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<String> value) {
                        ArrayAdapter<String> arrayAdapter = new ArrayAdapter<String>(MovieListActivity.this, 
                                android.R.layout.simple_list_item_1, value);
                        setListAdapter(arrayAdapter); // 设置Adapter刷新列表
                    }

                    @Override
                   public void onError(Throwable e) {
                        Toast.makeText(MovieListActivity.this, "onError", 
                                Toast.LENGTH_SHORT).show();
                        Log.d(TAG, "onError: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Toast.makeText(MovieListActivity.this, "onComplete", 
                                Toast.LENGTH_SHORT).show();
                    }
                });
    }
}

Retrofit和RxJava结合后,就可以使用RxJava对结果进行一系列的操作,当需要对网络结果做一些复杂的处理时,RxJava的优势非常大.

更多RxJava在Android平台的使用,可以参考开源项目:

RxJava-Android-Samples

最后给出用到的实体类MovieBean(实体类比较复杂,建议使用Json网站去自动根据Json数据生成Java Bean.)

 

public class MovieBean {

	private int count;
	private int start;
	private int total;
	private String title;

	private List<SubjectsBean> subjects;

	public int getCount() {
		return count;
	}

	public void setCount(int count) {
		this.count = count;
	}

	public int getStart() {
		return start;
	}

	public void setStart(int start) {
		this.start = start;
	}

	public int getTotal() {
		return total;
	}

	public void setTotal(int total) {
		this.total = total;
	}

	public String getTitle() {
		return title;
	}

	public void setTitle(String title) {
		this.title = title;
	}

	public List<SubjectsBean> getSubjects() {
		return subjects;
	}

	public void setSubjects(List<SubjectsBean> subjects) {
		this.subjects = subjects;
	}

	public static class SubjectsBean {

		private RatingBean rating;
		private String title;
		private int collect_count;
		private String original_title;
		private String subtype;
		private String year;

		private ImagesBean images;
		private String alt;
		private String id;
		private List<String> genres;

		private List<CastsBean> casts;

		private List<DirectorsBean> directors;

		public RatingBean getRating() {
			return rating;
		}

		public void setRating(RatingBean rating) {
			this.rating = rating;
		}

		public String getTitle() {
			return title;
		}

		public void setTitle(String title) {
			this.title = title;
		}

		public int getCollect_count() {
			return collect_count;
		}

		public void setCollect_count(int collect_count) {
			this.collect_count = collect_count;
		}

		public String getOriginal_title() {
			return original_title;
		}

		public void setOriginal_title(String original_title) {
			this.original_title = original_title;
		}

		public String getSubtype() {
			return subtype;
		}

		public void setSubtype(String subtype) {
			this.subtype = subtype;
		}

		public String getYear() {
			return year;
		}

		public void setYear(String year) {
			this.year = year;
		}

		public ImagesBean getImages() {
			return images;
		}

		public void setImages(ImagesBean images) {
			this.images = images;
		}

		public String getAlt() {
			return alt;
		}

		public void setAlt(String alt) {
			this.alt = alt;
		}

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}

		public List<String> getGenres() {
			return genres;
		}

		public void setGenres(List<String> genres) {
			this.genres = genres;
		}

		public List<CastsBean> getCasts() {
			return casts;
		}

		public void setCasts(List<CastsBean> casts) {
			this.casts = casts;
		}

		public List<DirectorsBean> getDirectors() {
			return directors;
		}

		public void setDirectors(List<DirectorsBean> directors) {
			this.directors = directors;
		}

		public static class RatingBean {
			private int max;
			private double average;
			private String stars;
			private int min;

			public int getMax() {
				return max;
			}

			public void setMax(int max) {
				this.max = max;
			}

			public double getAverage() {
				return average;
			}

			public void setAverage(double average) {
				this.average = average;
			}

			public String getStars() {
				return stars;
			}

			public void setStars(String stars) {
				this.stars = stars;
			}

			public int getMin() {
				return min;
			}

			public void setMin(int min) {
				this.min = min;
			}
		}

		public static class ImagesBean {
			private String small;
			private String large;
			private String medium;

			public String getSmall() {
				return small;
			}

			public void setSmall(String small) {
				this.small = small;
			}

			public String getLarge() {
				return large;
			}

			public void setLarge(String large) {
				this.large = large;
			}

			public String getMedium() {
				return medium;
			}

			public void setMedium(String medium) {
				this.medium = medium;
			}
		}

		public static class CastsBean {
			private String alt;

			private AvatarsBean avatars;
			private String name;
			private String id;

			public String getAlt() {
				return alt;
			}

			public void setAlt(String alt) {
				this.alt = alt;
			}

			public AvatarsBean getAvatars() {
				return avatars;
			}

			public void setAvatars(AvatarsBean avatars) {
				this.avatars = avatars;
			}

			public String getName() {
				return name;
			}

			public void setName(String name) {
				this.name = name;
			}

			public String getId() {
				return id;
			}

			public void setId(String id) {
				this.id = id;
			}

			public static class AvatarsBean {
				private String small;
				private String large;
				private String medium;

				public String getSmall() {
					return small;
				}

				public void setSmall(String small) {
					this.small = small;
				}

				public String getLarge() {
					return large;
				}

				public void setLarge(String large) {
					this.large = large;
				}

				public String getMedium() {
					return medium;
				}

				public void setMedium(String medium) {
					this.medium = medium;
				}
			}
		}

		public static class DirectorsBean {
			private String alt;

			private AvatarsBean avatars;
			private String name;
			private String id;

			public String getAlt() {
				return alt;
			}

			public void setAlt(String alt) {
				this.alt = alt;
			}

			public AvatarsBean getAvatars() {
				return avatars;
			}

			public void setAvatars(AvatarsBean avatars) {
				this.avatars = avatars;
			}

			public String getName() {
				return name;
			}

			public void setName(String name) {
				this.name = name;
			}

			public String getId() {
				return id;
			}

			public void setId(String id) {
				this.id = id;
			}

			public static class AvatarsBean {
				private String small;
				private String large;
				private String medium;

				public String getSmall() {
					return small;
				}

				public void setSmall(String small) {
					this.small = small;
				}

				public String getLarge() {
					return large;
				}

				public void setLarge(String large) {
					this.large = large;
				}

				public String getMedium() {
					return medium;
				}

				public void setMedium(String medium) {
					this.medium = medium;
				}
			}
		}
	}
}