RxJava2.0变换操作符(二)
程序员文章站
2024-02-29 08:50:34
...
1、转换操作符:
①、Map(作用:数据类型色转换)
//实现的功能:将Integer类型转换成String类型
Observable.create(
//创建发射器
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
//发射事件
e.onNext(0);
e.onNext(1);
e.onNext(2);
}
})
//转换
.map(
//Integer类型转换成String类型
new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "Integer类型转换成String类型:"+integer;
}
})
//订阅
.subscribe(
//创建简单的观察者
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("接收到的事件:"+s);
}
}
);
运行结果:
②FlatMap(作用:将被观察者发送的事件序列进行 拆分 & 单独转换,在合并成一个新的事件序列,最后进行发送)
原理:
1、为事件序列中每个事件都创建一个 Observable(被观察者)
2、将对每一个 原始事件 转换后的 新的事件 都放入到对应的 Observable
3、将新建的每一个 Observable 都合并到一个 新建的、总的 Observable
4、新建的、总的 Observable 将合并的事件序列 发送给观察者 Observer
示意图:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> lists=new ArrayList<String>();
//把一个事件拆分成n个子事件
for (int i = 0; i < 5; i++) {
lists.add("我是事件:"+integer+",被拆分后的子事件:"+i);
}
//发射数组遍历后的数据
return Observable.fromIterable(lists);
}
})
//订阅
.subscribe(
//创建简单的观察者
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("接收到的事件:"+s);
}
}
);
运行结果:
注意:新合并生成的事件序列顺序是无序的,与旧序列发送的顺序无关。(上面的运行结果为有序纯属巧合)。
③ConcatMap(作用:类似FlatMap,区别,它是有序的)
Observable.create(
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
}
}
)
//有序的转换符
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> lists=new ArrayList<String>();
//把事件拆分成N份
for (int i = 0; i < 5; i++) {
lists.add("我是事件:"+integer+",被拆分成:"+i);
}
return Observable.fromIterable(lists);
}
})
//订阅
.subscribe(
//创建观测者
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("接收到的事件:"+s);
}
}
);
运行结果:
总结:ConcatMap,新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。而 FlatMap,新合并的事件序列是无序的。
④Buffer
作用:定期从 被观测者需要发送的事件中 取出一定数量的事件 并且 放到缓存区中,最终发送
Observable.create(
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
}
}
)
//参数:设置缓存区的大小和步长
//缓存区的大小 = 每次从被观察者中获取的事件数量
//步长 = 每次获取新事件的数量
.buffer(3,1)
.subscribe(
new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
System.out.println("缓存区里存放放的事件总数:"+integers.size());
for (Integer integer : integers) {
System.out.println("从缓存区获取到的事件:"+integer);
}
}
}
);
运行结果:
2、开发实际应用:使用FlatMap嵌套网络请求(比如我们实际开发中的,先注册,注册完成以后去做登录操作。这样就是两步的操作,可以直接用FlatMap来实现)
//举例,为了模拟注册登录,我只是模拟两次不同的网络请求,而实际的注册登录也是同一个道理:
final Observable<RegisterEntity> observableForRegister = RetrofitFactory.getRetrofit().create(NetApi.class).doRegister();
final Observable<LoginEntity> observableForLogin = RetrofitFactory.getRetrofit().create(NetApi.class).doLogin();
observableForRegister
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//先发起注册请求
.doOnNext(new Consumer<RegisterEntity>() {
@Override
public void accept(@NonNull RegisterEntity registerEntity) throws Exception {
System.out.println("第一次网络请求:" + "注册成功");
System.out.println("注册返回的结果:" + registerEntity.toString());
}
})
//切换到io线程进行登录请求
.subscribeOn(Schedulers.io())
//切换到主线程处理登录请求返回的结果
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<RegisterEntity, ObservableSource<LoginEntity>>() {
@Override
public ObservableSource<LoginEntity> apply(@NonNull RegisterEntity registerEntity) throws Exception {
//将注册请求转换成登录请求,即发起第二次网络请求(即登录请求)
return observableForLogin;
}
})
.subscribe(
//创建观察者
new Consumer<LoginEntity>() {
@Override
public void accept(@NonNull LoginEntity loginEntity) throws Exception {
System.out.println("第二次网络请求:" + "登录成功");
System.out.println("登录成功还回的结果:" + loginEntity.toString());
}
}
, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("登录失败" + throwable);
}
}
);
上一篇: 注意力机制和Seq2seq模型
下一篇: Python实现单词翻译功能