SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫做executor,它是使用AtomicReference包装的ScheduledExecutorService。java
补充:AtomicReference类的做用:AtomicReference则对应普通的对象引用,即保证你在修改对象引用时的线程安全性;对”对象”进行原子操做安全
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
复制代码
在SingleScheduler构造函数中,Executor会调用lazySet().bash
/** * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. */
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}
复制代码
************ 分割线 ************
app
其中**lazySet()**是AtomicReference
中的方法,用于修改引用对象:less
// AtomicRefence类
/** * Sets to the given value. * * @param newValue the new value */
public final void set(V newValue) {
value = newValue;
}
/** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */
public final void lazySet(V newValue) {
U.putOrderedObject(this, VALUE, newValue);
}
复制代码
AtomicReferences中set()和lazySet()区别:
set()
会马上修改旧值,别的线程能够马上看到更新后的值;而lazySet()
不会马上(可是最终会)修改旧值,别的线程看到新值的时间会延迟一些。ide
************ 分割线 ************
函数
它的createExecutor()用于建立工做线程,能够看到经过SchedulerPoolFactory来建立ScheduledExecutorService。oop
// SingleScheduler类
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
复制代码
经过SchedulerPoolFactory类的create(ThreadFactory factory)来建立单线程的线程ui
// SchedulerPoolFactory类
/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
// 建立单线程
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
复制代码
在SingleScheduler中,每次使用ScheduledExecutorService时,实际上是使用executor.get()。因此说,single拥有一个线程单例。this
SingleScheduler会建立一个ScheduledWorker,ScheduledWorker使用JDK的ScheduledExecutorService做为executor。 下面是ScheduledWorker的schedule()方法,使用ScheduledExecutorService的submit()或schedule()来执行runnable。
@NonNull
@Override
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
tasks.add(sr);
try {
Future<?> f;
if (delay <= 0L) {
/**
* 当即执行则执行submit()方法
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*/
f = executor.submit((Callable<Object>)sr);
} else {
/**
* 需延迟执行,则执行schedule()方法
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay.
*/
f = executor.schedule((Callable<Object>)sr, delay, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
dispose();
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return sr;
}
复制代码
ComputationScheduler使用FixedSchedulerPool做为线程池,而且FixedSchedulerPool被AtomicReference包装了一下。
从ComputationScheduler的源码中能够看出,MAX_THREADS是CPU的数目。FixedSchedulerPool
能够理解为拥有固定数量的线程池(有点相似线程池中的FixedThreadPool),数量为MAX_THREADS。
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
...
}
static int cap(int cpuCount, int paramThreads) {
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}
复制代码
ComputationScheduler类会建立一个EventLoopWorker。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}
复制代码
其中getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。
注:
FixedSchedulerPool
和EventLoopWorker
都为ComputationScheduler
的内部类
// EventLoopWorker类中的方法
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
复制代码
PoolWorker继承自NewThreadWorker,也是线程数为1的ScheduledExecutorService。
IoScheduler使用CachedWorkerPool做为线程池,而且CacheWorkerPool也被AtomicReference包装了一下。 CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来建立的。
static {
...
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
...
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
...
}
复制代码
在RxThreadFactory中,由prefix和incrementAndGet()来建立新线程的名称
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
复制代码
IoScheduler
建立的线程数是不固定的,能够经过IOScheduler的size()
来获取当前的线程数。通常状况下,ComputationScheduler的线程数等于CPU的数目。
public int size() {
return pool.get().allWorkers.size();
}
复制代码
注意:
ComputationScheduler
和IoScheduler
都是依赖线程池来维护线程的,区别在于:IoScheduler
线程池中的个数是无限的,由prefix和incrementAndGet()
产生的递增值来决定线程的名字。而ComputationScheduler中则是一个固定线程数量的线程池,数量为CPU的数目,而且不要把I/O操做放在computation()中,不然I/O操做的等待时间会浪费CPU。
一样,IoScheduler也会建立EventLoopWorker类
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
复制代码
但这个EventLoopWorker是IoScheduler的内部类,与ComputationScheduler建立的EventLoopWorker不一样,知识两者同名罢了,且都是继承Scheduler.WOrker类而已。
NewThrScheduler会建立NewThreadWorker,NewThreadWorker的构成函数使用的也是SchedulerPoolFactory。
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
复制代码
与SingleScheduler不一样的是,SingleScheduler的executor是使用AtomicReference包装的SchedulerExecutorService。每次使用时,都会调用executor.get()。
然而,NewThreadScheduler
每次都会建立一个新的线程。
我的这块不太懂,按照自我理解分析下:在SingleScheduler由于该调度器中只有一个线程,于是在后续调用,须要保证该对象惟一且保证先后一致,于是使用AtomicReference保证其余线程能知道;而NewThreadScheduler中每次都会建立新的线程,于是无需保证线程同步,不用管是否让其余线程知道。
TrampolineScheduler会建立TrampolineWorker,在TrampolineWorker内部维护着一个PriorityBlockingQueue。任务进入该队列以前,会先用TimedRunnable封装下。
static final class TimedRunnable implements Comparable<TimedRunnable> {
final Runnable run;
final long execTime;
final int count; // In case if time between enqueueing took less than 1ms
volatile boolean disposed;
TimedRunnable(Runnable run, Long execTime, int count) {
this.run = run;
this.execTime = execTime;
this.count = count;
}
@Override
public int compareTo(TimedRunnable that) {
int result = ObjectHelper.compare(execTime, that.execTime);
if (result == 0) {
return ObjectHelper.compare(count, that.count);
}
return result;
}
}
复制代码
能够看到TimeRunnable实现了Comparable接口,会比较任务的execTime和count。 任务在进入queue以前,count每次都会+1.
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);
复制代码
因此,在使用TrampolineScheduler
时,新的任务总会优先执行。