RxJava的基础使用和基础流程能够看看上一篇的文章《Android的三方库 - RxJava:RxJava的使用和基本订阅流程》缓存
实际项目中常常会有一些数据获取操做,这就须要使用到RxJava的线程了。 因此让咱们来看看RxJava的线程切换。bash
首先看一个小的案例:ide
Log.i(TAG, "当前线程 : " + Thread.currentThread().getName());
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: currentThread : " + Thread.currentThread().getName());
emitter.onNext("ONE");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: currentThread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: currentThread : " + Thread.currentThread().getName());
Log.i(TAG, "onNext: s : " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: currentThread : " + Thread.currentThread().getName());
}
});
复制代码
输出结果:oop
当前线程 : main
onSubscribe: currentThread : main
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
复制代码
而后咱们在线程中来执行这段代码:源码分析
new Thread(new Runnable() {
@Override
public void run() {
RxObservable3(); // 这个就是上面那段代码
}
}).start();
复制代码
输出结果:post
当前线程 : Thread-5
onSubscribe: currentThread : Thread-5
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
复制代码
咱们能够很明显得出几个结论:ui
1. onSubscribe运行的线程和代码运行所在的线程是一致的。
2. Observable 运行的线程是subscribeOn指定的线程。
3. Observer 运行的线程是 observeOn 指定的线程。
复制代码
如今咱们对源码来进行分析。this
在咱们的案例中的调用:spa
subscribeOn(Schedulers.io())
复制代码
进入到它的源码中:线程
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
在这里传入的参数 this 其实就是 当前建立的Observable
( 就是 ObservableCreate
)将它封装成 ObservableSubscribeOn
,并返回它的对象。
scheduler
就是你使用的 Schedulers.io()
。
咱们进入到 io()
中
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
复制代码
在这里的 IO
是一个对象,它的具体实现要看 IOTask
:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
--------
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
复制代码
能够看到咱们就这样获得了IoScheduler
的对象。先静置这里,等会使用。
咱们根据上一篇的文章的分析,知道subscribeActual()
是一个抽象方法,以前的实现是ObservableCreate
中,
如今ObservableCreate
对象被包装成为一个新的ObservableSubscribeOn
对象
所以咱们来看看 ObservableSubscribeOn
中的 subscribeActual
方法:
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
目前看到都是没有使用过任何其余的线程,因此 观察者( observer
) 的 onSubscribe()
运行的线程就是当前的线程。
因此 onSubscribe()
执行的线程就是 当前的线程。
即:
onSubscribe: currentThread : Thread-5
复制代码
继续来看下一行代码,执行了 SubscribeTask
这个类。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
复制代码
这是一个Runnable
类,在run
方法中 执行 subscribe
方法,
这里的source
其实就是上一个Observable
,也就是ObservableCreate
的对象。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
-----------------------------------
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 这个createWorker 是一个抽象方法,具体的实现是在 Schedule的子类中,在这里也就是IoScheduler
final Worker w = createWorker();
// decoratedRun实际上仍是SubscribeTask这个对象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将 Workder 和 Runnable 包装成为一个DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
// 执行
w.schedule(task, delay, unit);
return task;
}
复制代码
再继续看一下IoSchedule
r中的createWorker()
和 worker
的schedule()
方法。
final AtomicReference<CachedWorkerPool> pool; // AtomicReference 就是实现对象引用的原子更新
@NonNull
@Override
public Worker createWorker() {
// 获得一个EventLoopWorker对象,并传入一个缓存池
return new EventLoopWorker(pool.get());
}
--------------------------
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
.... //省略无关代码
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } // Runnable 最终是交给了threadWorker 去执行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 复制代码
咱们能够看下这个 ThreadWorker
,它是没有实现 scheduleActual()
这个方法的,
咱们来到它的父类 NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 这个run 是SubscribeTask
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 继续封装成为ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
.... // 省略无关代码
Future<?> f;
try {
// 在这里用线程池执行
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
复制代码
最后就是 executor
这个线程池对象来执行任务,SubscribeTask
会被线程池执行,
也就是说 Observable
的subscribe()
方法会在IO线程中被调用。
以前的输出结果:
subscribe: currentThread : RxCachedThreadScheduler-1
复制代码
因此 subscribe()
执行的线程就是 subscribeOn
指定的线程(在这里就是IoScheduler
)。
案例:
//省略先后代码,看重点部分
.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(AndroidSchedulers.mainThread())//第三次
复制代码
输出结果:
subscribe: currentThread : RxCachedThreadScheduler-1
复制代码
就是第一次的 subscribeOn()
的设置起做用了,这是为何呢?
由于 每一次调用 subscribeOn()
都会将以前的Observable
从新包装。
咱们来看这张图: 引用 玉刚说的 详解 RxJava 的消息订阅和线程切换原理中的图
从第三次的ObservableSubscribeOn
每次都会通知它的上一个Obsevable
最后都会上传到第一次的ObservableSubscribeOn
中,
因此不过设置多少,都只有第一次的subscribeOn()
才是生效的。
在案例中的设置:
.observeOn(AndroidSchedulers.mainThread())
复制代码
进入源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
-----------------
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
这个也是从新包装了一个ObservableObserveOn
对象,
这里的this
就是以前被封装过的 ObservableSubscribeOn
对象。
而后咱们继续进入到ObservableObserveOn
中去。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若是是当前线程的话,直接调用ObservableSubscribeOn的subscribe()方法
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 这里的 scheduler 其实就是 AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
// 将worker封装成ObserveOnObserver
// 这里的source.subscribe是在IO线程中执行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
接下来咱们来看看 ObserveOnObserver
中的源码:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); // 加入队列中
}
schedule();
}
复制代码
主要是执行 schedule()
。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
复制代码
其实 ObserveOnObserver
本身也实现了 Runnable
,因此就是调用了本身。
这里的worker
就是 你传入的 主线程(mainThread
)
而后咱们来看看 run
方法:
@Override
public void run() {
if (outputFused) { // outputFused 默认是false
drainFused();
} else {
drainNormal();
}
}
复制代码
进入到 drainNormal()
中:
void drainNormal() {
int missed = 1;
// 这里就是存储消息的队列
final SimpleQueue<T> q = queue;
// 这里是 downstream 就是 自定义的Observer
final Observer<? super T> a = downstream;
.... //省略部分代码
// 队列中取出消息
v = q.poll();
.... //省略部分代码
// 这里就是调用自定义的Observer#onNext()
a.onNext(v);
....
}
}
复制代码
这样咱们最终会调用到 咱们自定义的Observer#onNext()
因此Observer#onNext()
是在 observeOn()
指定的线程中调用的。
在这里的包装顺序是
CreateEmitter
包装了SubscribeOnObserver
;
SubscribeOnObserver
包装了ObserveOnObserver
;
ObserveOnObserver
包装了自定义的Observer
。
对于调用onNext()
方法:
咱们在subscribe
中使用 ObservableEmitter
中调用了onNext()
;
会在SubscribeOnObserver
中调用onNext()
;
而后会继续调用到ObserveOnObserver#onNext()
;
因此在run
中调用的Observer
的对象实际上是自定义的Observer
。
以上就是我对于RxJava线程切换的源码分析,若有不对,请指正。