本文章主要是对RxJava2的线程切换流程进行源码分析,在阅读以前,能够先阅读如下文章:java
RxJava2源码分析——订阅react
本文章用的RxJava和RxAndroid版本以下:android
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
复制代码
咱们先写段示例代码,代码以下:git
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
// 切换上游Observable到io线程
.subscribeOn(Schedulers.io())
// 切换下游Observer到主线程,使用AndroidSchedulers.mainThread须要使用RxAndroid这个库
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
}
});
复制代码
首先咱们看下Schedulers这个类。github
阅读源码后,咱们能够得知,总共有5种类型。框架
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
复制代码
该方法返回一个默认、共享的调度器实例用于计算工做,这能够用于事件循环、处理回调和其余计算工做。异步
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
复制代码
该方法返回一个默认、共享的调度器实例用于IO绑定的工做,这能够用于异步执行阻塞IO,默认是由单线程实例池实现的,能够重用已经启动的线程,要注意的是,这个调度器的线程数量可能会无限制增加,从而致使内存溢出(OOM)。ide
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}
复制代码
该方法返回一个默认、共享的调度器实例,用于队列工做,并以FIFO方式在一个参与线程中执行它们,也就是说会等到当前线程执行完毕才会执行下个线程。oop
÷@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
复制代码
该方法返回一个默认、共享的调度器实例,该实例为每一个工做单元建立一个新线程,默认实现是建立一个新的单线程,要注意的是,每次调用Scheduler.scheduleDirect方法(及其重载方法)和Scheduler.createWorker方法均可以建立数目无限制的线程,从而形成内存溢出(OOM)。源码分析
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
复制代码
该方法返回一个默认、共享的调度器实例,该实例会建立一个单独的线程。
负责线程切换有两个方法:subscribeOn和observeOn。
这个方法负责切换上游Observable的线程,代码以下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
根据上篇文章阅读subscribe方法源码的经验,咱们只看ObservableSubscribeOn类就能够了,要注意的点我都写上注释了,代码以下:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// source是上游Observable
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 建立SubscribeOnObserver对象,传入下游Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 建立SubscribeTask任务,使用指定的调度器进行调度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
// 省略部分代码
}
// SubscribeTask继承Runnable,因此咱们能够看下它的run方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 这里已经切换到想要的线程了,source是上游Observable,调用它的subscribe方法,而且传入下游observer,根据上篇文章的经验,上游Observable的subscribeActual方法会被执行
source.subscribe(parent);
}
}
}
复制代码
咱们的示例代码中调用subscribeOn方法传入的是Schedulers.io(),看下这个方法对应的源码,代码以下:
// Schedulers.java
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
复制代码
IO是一个final的静态变量,它是经过Schedulers这个类的静态代码块赋值的,代码以下:
static {
// 省略部分代码
IO = RxJavaPlugins.initIoScheduler(new IOTask());
// 省略部分代码
}
复制代码
它会建立一个IOTask对象,代码以下:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
复制代码
这个类实现了Callable接口,而且重写了call方法,返回IoHolder.DEFAULT,代码以下:
// DEFAULT是final的静态类IoHolder里的final的静态变量
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
复制代码
咱们看到这里建立了一个IoScheduler对象,代码以下:
// IoScheduler.java
static final RxThreadFactory WORKER_THREAD_FACTORY;
static {
// 省略部分代码
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));
// RxThreadFactory是一个线程工厂,能够删除对new Thread调用的硬链接
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
// 省略部分代码
// 建立CachedWorkerPool对象,第二个参数是传入TimeUnit,若是是null的话,是不会建立线程池的,下面会讲到
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
// IoScheduler的构造方法
public IoScheduler() {
// 这里会调用下面那个方法
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
// 赋值给成员变量threadFactory
this.threadFactory = threadFactory;
// 用CachedWorkerPool建立一个原子引用
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
// 调用start方法
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
// compareAndSet方法第一个参数是预期值,第二个参数是新值,若是NONE==update的话,就会将值原子性地设置会更新值,而且返回true,不然不会更新,而且返回false,而后调用shutdown方法
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
static final class CachedWorkerPool implements Runnable {
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
// 当unit不是null的话,就会建立一个newScheduledThreadPool线程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
}
复制代码
咱们再回到上面说的ObservableSubscribeOn类,看到以下这段代码:
// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 调用了scheduler的scheduleDirect方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
复制代码
咱们再看下scheduleDirect方法,代码以下:
// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 调用createWorker方法,createWorker是个抽象方法,刚才咱们所说的IoScheduler是Scheduler的实现类,它重写了createWorker方法
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// DisposeTask实现了Runnable接口
DisposeTask task = new DisposeTask(decoratedRun, w);
// 调用worker的scheduler方法
w.schedule(task, delay, unit);
return task;
}
复制代码
咱们再看下createWorker方法,代码以下:
// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
// 建立EventLooperWork,而且传入从原子引用获得的当前的值
return new EventLoopWorker(pool.get());
}
复制代码
EventLoopWorker是IoScheduler的一个final的静态的内部类,继承Scheduler.Worker,代码以下:
// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// 若是已经取消订阅了,就再也不调度
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 调用了ThreadWorker的scheduleActual方法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
// ThreadWorker继承NewThreadWorker
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
复制代码
咱们看下NewThreadWorker的scheduleActual方法,代码以下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
// ScheduledExecutorService是个接口,继承ExecutorService接口
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 调用SchedulerPoolFactory的create方法,建立线程池
executor = SchedulerPoolFactory.create(threadFactory);
}
// 省略部分代码
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
// executor.submit和excutor.schedule其实最后会调用同一个方法,执行这个方法后任务就提交上去了
if (delayTime <= 0L) {
// 若是不须要延迟就调用submit方法,提交一个有返回结果的任务
f = executor.submit(task);
} else {
// 若是须要延迟就调用schedule方法,提交一个有返回结果的任务
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
// 省略部分代码
}
复制代码
到这里,上游Observable的代码就会被切换到对应的线程了,咱们这里是拿**Schedulers.io()**做为例子来说解,其余类型你们能够本身看下源码。
结论:订阅事件是从下往上传递的,最终传递到上游Observable的subscribe方法。
这个方法负责切换下游Observer的线程,代码以下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
// 调用下面那个方法
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
也像以前那样,咱们只须要看ObservableObserveOn这个方法,代码以下:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source是上游Observable
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 判断要指定的调度器是否是TrampolineScheduler,也就是是否是传入Schedulers.trampoline()
if (scheduler instanceof TrampolineScheduler) {
// 若是是,就直接调用subscribe方法,由于TrampolineScheduler是在当前线程调度的,上面也说起过
source.subscribe(observer);
} else {
// 若是不是,就经过调度器建立worker,而后调用subscribe方法传入建立的ObserveOnObserver对象
Scheduler.Worker w = scheduler.createWorker();
// 与subscribeOn不一样,subscribe方法不是在已经切换好的线程中执行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ObserveOnObserver是一个final的静态内部类,实现了Runnable接口,因此咱们看下它的run方法
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
// 省略部分代码
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 若是checkTerminated方法返回false就会return
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 最后调用下游Observer的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
// 省略部分代码
@Override
public void run() {
// 到这里已经切换到想要的线程了,outputFused变量是经过requestFusion设置的
if (outputFused) {
drainFused();
} else {
// 咱们主要看这个方法
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
// delayError在咱们调用的observeOn方法中是传入false的
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
// 若是Throwable不是null的话,就会调用下游Observer的onError方法
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
// 若是任务队列是空的话,证实任务执行完毕,就会调用下游Observer的onComplete方法
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
// 这个方法和背压(Backpressure)有关系,不是本文章的主要内容,暂时不讨论
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
// 省略部分代码
}
}
复制代码
结论:观察事件是从上往下传递的,最终传递到下游Observer的回调方法,例如:onNext方法、onComplete方法、onError方法,注意onSubscribe方法所在的线程是当前的线程,不会随着订阅线程或者观察线程的切换而改变。
咱们试下屡次调用subscribeOn方法,把示例代码改为以下:
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
}
});
复制代码
Log以下:
根据以前的源码分析,其实它像以下代码:
new Thread("AndroidSchedulers.mainThread()") {
@Override
public void run() {
new Thread("Schedulers.io()") {
@Override
public void run() {
System.out.println("上游Observable的subscribe方法所在的线程:" + getName());
}
}.start();
}
}.start();
复制代码
Log以下:
结论:若是咱们屡次调用subscribeOn方法,切换订阅线程的话,上游Observable的subscribe方法所在的线程只会是在第一次切换的线程,上面也提到过了,由于订阅事件是从下往上传递的,最终传递到上游Observable的subscribe方法。
咱们试下屡次调用obsesrveOn方法,把示例代码改为以下:
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
}
});
复制代码
Log以下:
根据以前的源码分析,其实它像以下代码:
new Thread("AndroidSchedulers.mainThread()") {
@Override
public void run() {
new Thread("Schedulers.io()") {
@Override
public void run() {
System.out.println("下游Observer的回调方法所在的线程:" + getName());
}
}.start();
}
}.start();
复制代码
Log以下:
结论:若是咱们屡次调用observeOn方法,切换观察线程的话,下游Observer的回调方法,例如:onNext方法、onComplete方法、onError方法,它们所在的线程会随着每次切换而切换,由于观察事件是从上往下传递的,最终传递到下游Observer的回调方法。
Demo:RxJavaDemo
个人GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架(Kotlin-MVVM)
个人掘金:谭嘉俊
个人简书:谭嘉俊