RxJava subscribeOn 与 onserveOn 线程切换记录
通过多个实例来对比分析
注:test4()运行在主线程,所以下文中的 主线程均为指 test4()所在的线程。当然,可以将test4()运行在非主线程
- 1、不使用subscribeOn 和 observeOn
public class RxJavaDemoActivity extends BasicActivity{
@Override
protected void onCreate( Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjavamain);
test4();
}
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
}
输出结果如下:
08-17 09:06:14.028 8581-8581/net.yuanjin I/test0: main ,id = 1
08-17 09:06:14.028 8581-8581/net.yuanjin I/test1: main ,id = 1
08-17 09:06:14.028 8581-8581/net.yuanjin I/test2: main ,id = 1
08-17 09:06:14.036 8581-8581/net.yuanjin I/test3: main ,id = 1
可以看到,不使用 subscribeOn 和 observeOn 的时候,全部操作都是在主线程中执行
- 2、仅使用 subscribeOn
/**
* 仅使用 subscribeOn(Schedulers.newThread())
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return getResources().getDrawable(integer);
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
08-17 09:13:02.052 15056-15056/net.yuanjin I/test0: main ,id = 1
08-17 09:13:02.052 15056-15402/net.yuanjin I/test1: RxNewThreadScheduler-2 ,id = 272
08-17 09:13:02.052 15056-15402/net.yuanjin I/test2: RxNewThreadScheduler-2 ,id = 272
08-17 09:13:02.056 15056-15402/net.yuanjin I/test3: RxNewThreadScheduler-2 ,id = 272
可以看到,当使用了 subscribeOn(Schedulers.newThread())
后,从test1 - test3全都在新的同一线程中执行。所以 subscribeOn 影响了 从触发事件(test1)开始及其之后的所有事件的线程(如果后面事件的线程未重定义的话)
- 3、仅使用 observeOn
/**
* 仅使用 observeOn(Schedulers.newThread())
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return getResources().getDrawable(integer);
}
})
.observeOn(Schedulers.newThread())//注意observeOn的位置,在 test3 之前
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
08-17 09:21:24.032 23435-23435/net.yuanjin I/test0: main ,id = 1
08-17 09:21:24.036 23435-23435/net.yuanjin I/test1: main ,id = 1
08-17 09:21:24.036 23435-23435/net.yuanjin I/test2: main ,id = 1
08-17 09:21:24.044 23435-23772/net.yuanjin I/test3: RxNewThreadScheduler-1 ,id = 332
可以看到, 代码中 observeOn 在 test3 之前,它使在其之后的test3 执行在新的线程,而在其之前的都执行在主线程
- 4、仅使用 observeOn(Schedulers.newThread()),注:与3相比发生位置变化
/**
* 仅使用 observeOn(Schedulers.newThread())
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.observeOn(Schedulers.newThread())//注意 observeOn 位置在 test2 之前
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return getResources().getDrawable(integer);
}
})
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
结果输出:
08-17 09:27:14.612 29242-29242/net.yuanjin I/test0: main ,id = 1
08-17 09:27:14.616 29242-29242/net.yuanjin I/test1: main ,id = 1
08-17 09:27:14.616 29242-29442/net.yuanjin I/test2: RxNewThreadScheduler-1 ,id = 362
08-17 09:27:14.620 29242-29442/net.yuanjin I/test3: RxNewThreadScheduler-1 ,id = 362
可以看到, 代码中 observeOn 在 test2 之前,它使在其之后的test2、test3 执行在新的线程,而在其之前的都执行在主线程
综上,可以看到 observeOn 会改变在其位置之后的所有操作的线程
- 5、同时使用 observeOn() 和 subscribeOn()
/**
* 同时使用 observeOn() 和 subscribeOn()
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return getResources().getDrawable(integer);
}
})
.subscribeOn(Schedulers.newThread())//设置新线程
.observeOn(AndroidSchedulers.mainThread())//设置为主线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
08-17 09:39:04.976 8056-8056/net.yuanjin I/test0: main ,id = 1
08-17 09:39:04.984 8056-9290/net.yuanjin I/test1: RxNewThreadScheduler-1 ,id = 406
08-17 09:39:04.984 8056-9290/net.yuanjin I/test2: RxNewThreadScheduler-1 ,id = 406
08-17 09:39:05.012 8056-8056/net.yuanjin I/test3: main ,id = 1
可以看到,subscribeOn 改变了 test1 及其之后的操作的运行线程为新线程。又由于 observeOn 在 test3 之前,所以又将test3 的运行线程改为 主线程
- 6、同时使用 observeOn() 和 subscribeOn()
/**
* 同时使用 observeOn() 和 subscribeOn()
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.observeOn(AndroidSchedulers.mainThread())//设置为主线程,在test2 之前
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return getResources().getDrawable(integer);
}
})
.observeOn(Schedulers.newThread())//设置为新线程,在test3 之前
.subscribeOn(Schedulers.newThread())
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
08-17 09:46:35.132 16555-16555/net.yuanjin I/test0: main ,id = 1
08-17 09:46:35.136 16555-16999/net.yuanjin I/test1: RxNewThreadScheduler-1 ,id = 491
08-17 09:46:35.180 16555-16555/net.yuanjin I/test2: main ,id = 1
08-17 09:46:35.184 16555-17000/net.yuanjin I/test3: RxNewThreadScheduler-2 ,id = 492
可以看到, subscribeOn 改变了 test1 及 其之后的线程为 新线程。 而由于在 test2 之前使用 observeOn 改变线程为 主线程,所以test2 运行在主线程。后面又由于在test3 之前使用 subscribeOn 改变线程为新线程, 所以test3 运行在了新线程
至此,可以看到
1、subscribeOn 改变了 从触发事件(test1)及其之后的所有操作的线程,且与位置放置无关
2、observeOn改变了在其之后的所有操作的线程,可多次使用,与位置有关
- 7、使用多个 subscribeOn
/**
* 使用多个 subscribeOn
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribeOn(Schedulers.io())//io线程
.subscribeOn(Schedulers.newThread())//新线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onStart() {
Log.i("test onStart",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
08-18 16:00:52.886 23531-23531/net.yuanjin I/test0: main ,id = 1
08-18 16:00:52.886 23531-23531/net.yuanjin I/test onStart: main ,id = 1
08-18 16:00:52.890 23531-24047/net.yuanjin I/test1: RxIoScheduler-2 ,id = 497
08-18 16:00:52.890 23531-24047/net.yuanjin I/test2: RxIoScheduler-2 ,id = 497
08-18 16:00:52.898 23531-24047/net.yuanjin I/test3: RxIoScheduler-2 ,id = 497
可以看到,当有多个 subscribeOn 同时使用时,以最先使用的 subscribeOn 为准(另外,经验证,subscribeOn 与位置放置无关)
这个例子也可以看到, Subscriber 的 onStart()方法执行在最前,且运行线程为 主线程
- 8、doOnSubscribe
/**
* doOnSubscribe
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.doOnSubscribe(new Action0() {//注意 doOnSubscribe 在 subscribeOn 之前
@Override
public void call() {
Log.i("test doOnSubscribe",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onStart() {
Log.i("test onStart",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
net.yuanjin I/test0: main ,id = 1
net.yuanjin I/test onStart: main ,id = 1
net.yuanjin I/test doOnSubscribe: RxNewThreadScheduler-1 ,id = 625
net.yuanjin I/test1: RxNewThreadScheduler-1 ,id = 625
net.yuanjin I/test2: RxNewThreadScheduler-1 ,id = 625
net.yuanjin I/test3: RxNewThreadScheduler-1 ,id = 625
可以看到, doOnSubscribe() 执行在 onStart() 之后,在 test1(触发事件)之前,
且 doOnSubscribe 运行线程与 触发事件(test1)的线程一致。
另外,经验证, doOnSubscribe 的执行时序与代码放置位置无关
- 9、doOnSubscribe() 线程指定
Subscriber 的 onStart() 和 Observable 的 doOnSubscribe() 方法都在 subscribe() 调用后,在事件发送前(test1)执行,但区别在于 doOnSubscribe() 可以指定线程。
以前面例8为例,doOnScribe() 运行在新线程,如果我想让其运行在其他线程(如io线程)呢?看下面代码
/**
* doOnSubscribe() 线程指定
*/
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribeOn(Schedulers.newThread())//指定线程为 新线程
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.i("test doOnSubscribe",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
})
.subscribeOn(Schedulers.io())//指定线程为 io线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onStart() {
Log.i("test onStart",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
@Override
public void onNext(Drawable drawable) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
net.yuanjin I/test0: main ,id = 1
net.yuanjin I/test onStart: main ,id = 1
net.yuanjin I/test doOnSubscribe: RxIoScheduler-2 ,id = 850
net.yuanjin I/test1: RxNewThreadScheduler-1 ,id = 851
net.yuanjin I/test2: RxNewThreadScheduler-1 ,id = 851
net.yuanjin I/test3: RxNewThreadScheduler-1 ,id = 851
可以看到, doOnSubscribe 运行在 io线程了,而 触发事件(test1)之后的事件处理都仍然在 新线程中。
线程切换逻辑再梳理
差不多就到这里了。前面例子只是大概的显示了线程切换。
具体的源码还没看,下面先记录下大概的线程切换逻辑
参考前面的文章 RxJava 链式调用流程源码记录分析(以map()为例)
0
如上图所示,RxJava 的链式调用
1、 在代码的构建阶段(调用 subscribe() 方法前),是按照代码书写顺序从上往下逐级构建的
2、在调用 subscribe() 方法后,先是从下往上,逐级回溯到触发事件(上例中的test1发生处) ,在这个过程中,遇到 subscribeOn 则切换线程
3、再从触发事件开始,逐级往下执行各级的 call() 直至 Subscriber 的 onNext()方法结束,在这个过程中,遇到 observeOn() 则切换线程
最后,来个长例子结束:
private void test4() {
Log.i("test0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i("test1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
subscriber.onNext(R.drawable.picasso_drawable);
}
})
.observeOn(Schedulers.io()) // observeOn 设置线程为 io线程
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
Log.i("test2",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
return integer;
}
})
.observeOn(Schedulers.newThread()) // observeOn 设置线程为 新线程
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Log.i("test3",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.i("test doOnSubscribe 1",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
})
.subscribeOn(Schedulers.newThread()) // subscribeOn 设置线程为 新线程
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.i("test doOnSubscribe 0",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
})
.subscribeOn(Schedulers.io()) // subscribeOn 设置线程为 io线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onStart() {
Log.i("test onStart",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
@Override
public void onNext(Drawable drawable) {
Log.i("test4",Thread.currentThread().getName().toString() + " ,id = "+Thread.currentThread().getId());
}
});
}
输出结果:
net.yuanjin I/test0: main ,id = 1
net.yuanjin I/test onStart: main ,id = 1
net.yuanjin I/test doOnSubscribe 0: RxIoScheduler-2 ,id = 959
net.yuanjin I/test doOnSubscribe 1: RxNewThreadScheduler-1 ,id = 960
net.yuanjin I/test1: RxNewThreadScheduler-1 ,id = 960
net.yuanjin I/test2: RxIoScheduler-3 ,id = 961
net.yuanjin I/test3: RxNewThreadScheduler-2 ,id = 962
net.yuanjin I/test4: RxNewThreadScheduler-2 ,id = 962
分析:
1、首先:代码从上往下构建
test0 : 此处test4 运行在主线程,所以 输出为 main 线程
2、执行 subscribe() 方法 ,开始从下往上回溯
3、先执行onStart 犯法
test onStart : 先执行 start 方法, 此处仍在 main 线程
4、执行 .subscribeOn(Schedulers.io())
改变了线程为io线程
5、执行 .doOnSubscribe(new Action0() { //先是 doOnSubscribe 0
doOnSubscribe 0: RxIoScheduler-2 ,id = 959 //由于4步改变了线程,所以此处为 io线程
6、执行 .subscribeOn(Schedulers.newThread())
改变线程为 新线程
7、执行 .doOnSubscribe(new Action0() { //这次是 doOnSubscribe 1
doOnSubscribe 1: RxNewThreadScheduler-1 ,id = 960 //所以此处线程变为了新线程
8、代码回溯经过了 observeOn 和 map 等,但 在回溯过程中是不发生作用的
9、终于回溯到 Observable.create 中的 call 方法了
test1: RxNewThreadScheduler-1 ,id = 960 //由于中间没有线程更换,所以 test1 处的线程与 7 处的一致,仍为 新线程
10、代码要从上往下运行各级的 call 方法处理逻辑了
11、执行 .observeOn(Schedulers.io())
显示碰到了 observeOn , observeOn 在此阶段就开始发挥作用了,此处将线程更换为 io线程
12、执行到 .map(new Func1
参考资料:
上一篇: redis灵魂拷问:聊一聊主从复制缓冲区
下一篇: Redis对象——集合(Set)