RxJava的被观察者在使用操做符时能够利用线程调度器--Scheduler来切换线程,例如java
Observable.just("aaa","bbb")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});复制代码
被观察者(Observable、Flowable...)发射数据流以后,其操做符能够在不一样的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。ios
下图不一样的箭头颜色表示不一样的线程。
bash
Schedulers 是一个静态工厂类,经过分析Schedulers的源码能够看到它有多种不一样类型的Scheduler。下面是Schedulers的各个工厂方法。app
computation()用于CPU密集型的计算任务,但并不适合于IO操做。less
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}复制代码
io()用于IO密集型任务,支持异步阻塞IO操做,这个调度器的线程池会根据须要增加。对于普通的计算任务,请使用Schedulers.computation()。异步
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}复制代码
trampoline()在RxJava2中跟RxJava1的做用是不一样的。在RxJava2中表示当即执行,若是当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完以后,再将原先未完成的任务接着执行。在RxJava1中表示在当前线程中等待其余任务完成以后,再执行新的任务。ide
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}复制代码
newThread()为每一个任务建立一个新线程。函数
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}复制代码
single()拥有一个线程单例,全部的任务都在这一个线程中执行,当此线程中有任务执行时,它的任务们将会按照先进先出的顺序依次执行。oop
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}复制代码
除此以外,还支持自定义的Executor来做为调度器。ui
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor);
}复制代码
Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码能够看到,Scheduler在scheduleDirect()、schedulePeriodicallyDirect()方法中建立了Worker,而后会分别调用worker的schedule()、schedulePeriodically()来执行任务。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}复制代码
Worker也是一个抽象类,从上图能够看到每一种Scheduler会对应一种具体的Worker。
public abstract static class Worker implements Disposable {
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
...
}复制代码
SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫executor,它是使用AtomicReference包装的ScheduledExecutorService。
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();复制代码
在SingleScheduler构造函数中,executor会调用lazySet()。
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}复制代码
它的createExecutor()用于建立工做线程,能够看到经过SchedulerPoolFactory来建立ScheduledExecutorService。
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}复制代码
在SchedulerPoolFactory类的create(ThreadFactory factory) 中,使用newScheduledThreadPool线程池定义定时器,最大容许线程数为1。
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}复制代码
在SingleScheduler中每次使用ScheduledExecutorService,实际上是使用executor.get()。因此说,single拥有一个线程单例。
SingleScheduler会建立一个ScheduledWorker,ScheduledWorker使用jdk的ScheduledExecutorService做为executor。
下面是ScheduledWorker的schedule()方法。使用ScheduledExecutorService的submit()或schedule()来执行runnable。
@NonNull
@Override
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
tasks.add(sr);
try {
Future<?> f;
if (delay <= 0L) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delay, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
dispose();
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return sr;
}复制代码
ComputationScheduler使用FixedSchedulerPool做为线程池,而且FixedSchedulerPool被AtomicReference包装了一下。
从ComputationScheduler的源码中能够看出,MAX_THREADS是CPU的数目。FixedSchedulerPool能够理解为拥有固定数量的线程池,数量为MAX_THREADS。
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
......
}
static int cap(int cpuCount, int paramThreads) {
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}复制代码
ComputationScheduler会建立一个EventLoopWorker。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}复制代码
其中,getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}复制代码
PoolWorker继承自NewThreadWorker,它也是线程数为1的ScheduledExecutorService。
IoScheduler使用CachedWorkerPool做为线程池,而且CachedWorkerPool也是被AtomicReference包装了一下。
CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来建立的。
static {
......
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
......
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
......
}复制代码
在RxThreadFactory中,由 prefix 和 incrementAndGet() 来建立新线程的名称。
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}复制代码
IoScheduler建立的线程数是不固定的,能够经过IoScheduler 的 size() 来得到当前的线程数。而ComputationScheduler的线程数通常状况等于CPU的数目。
public int size() {
return pool.get().allWorkers.size();
}复制代码
特别须要的是 ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet() 产生的递增值来决定线程的名字;而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为CPU的数目,而且不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
一样,IoScheduler也会建立EventLoopWorker。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}复制代码
可是这个EventLoopWorker是IoScheduler的内部类,跟ComputationScheduler建立的EventLoopWorker是不同的,只是两者的名称相同罢了。
NewThreadScheduler会建立NewThreadWorker。咱们看到NewThreadWorker的构造函数也是使用SchedulerPoolFactory。
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}复制代码
跟SingleScheduler不一样的是,SingleScheduler的executor是使用AtomicReference包装的ScheduledExecutorService。每次使用时,会调用executor.get()。
然而,NewThreadScheduler每次都会建立一个新的线程。
TrampolineScheduler会建立TrampolineWorker,在TrampolineWorker内部维护着一个PriorityBlockingQueue。任务进入该队列以前,会先用TimedRunnable封装一下。
static final class TimedRunnable implements Comparable<TimedRunnable> {
final Runnable run;
final long execTime;
final int count; // In case if time between enqueueing took less than 1ms
volatile boolean disposed;
TimedRunnable(Runnable run, Long execTime, int count) {
this.run = run;
this.execTime = execTime;
this.count = count;
}
@Override
public int compareTo(TimedRunnable that) {
int result = ObjectHelper.compare(execTime, that.execTime);
if (result == 0) {
return ObjectHelper.compare(count, that.count);
}
return result;
}
}复制代码
咱们能够看到TimedRunnable实现了Comparable接口,会比较任务的execTime和count。
任务在进入queue以前,count每次都会+1。
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);复制代码
因此,使用TrampolineScheduler时,每次新的任务都会优先执行。
在默认状况下不作任何线程处理,Observable和Observer是处于同一线程中的。若是想要切换线程的话,可使用subscribeOn()和observeOn()。
subscribeOn经过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。
若屡次执行subscribeOn,则只有一次起做用。
点击subscribeOn()的源码能够看到,每次调用subscribeOn()都会建立一个ObservableSubscribeOn对象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}复制代码
ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}复制代码
其中,SubscribeOnObserver是下游的Observer经过装饰器模式生成的。它实现了Observer、Disposable接口。
接下来,在上游的线程中执行下游Observer的onSubscribe(Disposable disposabel)方法。
s.onSubscribe(parent);复制代码
而后,将子线程的操做加入Disposable管理中,加入Disposable后能够方便上下游的统一管理。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));复制代码
在这里,已经调用对应scheduler的scheduleDirect()方法。scheduleDirect() 传入的是一个Runnable,也就是下面的SubscribeTask。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}复制代码
SubscribeTask会执行run()对上游的Observable进行订阅。
此时,已经在对应的Scheduler线程中运行了。
source.subscribe(parent);复制代码
在RxJava的链式操做中,数据的处理是自下而上,这点跟数据发射正好相反。若是屡次调用subscribeOn,最上面的线程切换最晚执行,因此变成了只有第一次切换线程才有效。
observeOn一样接收一个Scheduler参数,用来指定下游操做运行在特定的线程调度器Scheduler上。
若屡次执行observeOn,则每次均起做用,线程会一直切换。
点击observeOn()的源码能够看到,每次调用observeOn()都会建立一个ObservableObserveOn对象。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}复制代码
ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}复制代码
若是scheduler是TrampolineScheduler,上游事件和下游事件会当即产生订阅。
若是不是的话,scheduler会建立本身的Worker,而后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象包装了下游真正的Observer。
ObserveOnObserver是ObservableObserveOn的内部类,实现了Observer、Runnable接口。跟SubscribeOnObserver不一样的是,SubscribeOnObserver实现了Observer、Disposable接口。
在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}复制代码
其中,worker是当前scheduler建立的Worker,this指的是当前的ObserveOnObserver对象,this实现了Runnable接口。
而后,咱们看看Runnable接口的实现方法run(),这个方法是在worker对应的线程里执行的。drainNormal()会取出 ObserveOnObserver 的 queue 里的数据进行发送。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}复制代码
下游屡次调用observeOn()的话,线程会一直切换。每一次切换线程,都会把对应的Observer对象的各个方法的处理执行在指定的线程中。
举一个屡次调用subscribeOn、observeOn的例子。
Observable.just("HELLO WORLD")
.subscribeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
s = s.toLowerCase();
L.i("map1",s);
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
s = s+" tony.";
L.i("map2",s);
return s;
}
})
.subscribeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
s = s+"it is a test.";
L.i("map3",s);
return s;
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
L.i("subscribe",s);
System.out.println(s);
}
});复制代码
了解RxJava的线程模型、线程调度器、线程调度是很是有意义的。可以帮助咱们更合理地使用RxJava。另外,RxJava的线程切换结合链式调用很是方便,比起Java使用线程操做实在是简单太多了。