RxJava
是ReactiveX推出的一个开源库,它是Reactive Extensions
的Java VM实现,能够很方便的在Java中实现响应式编程。解决了Java中繁琐的异步切换、Callback hell等问题,使逻辑变得更加简洁。java
RxJava
提供了丰富&功能强大的操做符,能够说这些操做符就是RxJava
的基础及核心,因此学习RxJava
都是从这些操做符开始。但因为RxJava
的操做符种类繁多且网络上已经出现了不少优秀的讲解RxJava
操做符的文章,因此本文仅列举一些操做符讲解。android
Observable.interval(3000, TimeUnit.MILLISECONDS)//每隔3s发一个事件
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("tag", "start");
}
@Override
public void onNext(Long aLong) {
Log.i("tag", "onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.i("tag", "error:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i("tag", "onComplete");
}
});
复制代码
interval
操做符主要就是实现轮询操做,经过该操做符来实现轮询效果会比Handler
、Timer
及newScheduledThreadPool
更简洁,更优雅。但从原理上来看,interval
其实就是对newScheduledThreadPool
的封装。固然,咱们也能够本身对Handler
、Timer
来进行封装。git
Observable.create(new ObservableOnSubscribe<File>() {
@Override
public void subscribe(ObservableEmitter<File> emitter) throws Exception {
File file = new File(path + File.separator + "blacklist");
emitter.onNext(file);
}
}).concatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(File file) throws Exception {
if (!file.isDirectory()) {
return Observable.empty();
}
return Observable.fromArray(file.listFiles());
}
}).subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
LogUtils.i("getPackageNames", "删除文件夹中已存在的文件");
file.delete();
}
});
复制代码
concatMap
操做符主要是进行事件的拆分及合并。在上面示例中就实现了对文件夹的遍历及得到文件夹下的每一个File
对象。github
Observable.fromIterable(data)
.map(new Function<PackageNameData, File>() {//类型转换
@Override
public File apply(PackageNameData pkg) throws Exception {
LogUtils.i("getPackageNames", "pkg:" + pkg.toString());
String path = FileUtil.getWeikePath() + File.separator + "blacklist";
File file = new File(path);
if (file.exists() && file.isFile()) {
file.delete();
}
boolean b = file.mkdirs();
if (b) {
LogUtils.i("getPackageNames", "建立文件夹" + file + "成功");
} else {
LogUtils.i("getPackageNames", "建立文件夹" + file + "失败");
}
path = path + File.separator + pkg.appPackageName.trim();
return new File(path);
}
})
.filter(new Predicate<File>() {//筛选
@Override
public boolean test(File file) throws Exception {
return !file.exists();
}
})
.subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
LogUtils.i("getPackageNames", "建立新的文件");
try {
boolean b = file.createNewFile();
if (!b) {
FileUtil.writeTxt(file.getAbsolutePath(), "");
}
} catch (IOException e) {
LogUtils.i("getPackageNames", "建立文件失败:" + e.getMessage());
FileUtil.writeTxt(file.getAbsolutePath(), "");
}
}
});
复制代码
filter
操做符主要是作筛选操做,若是返回false,则不会继续向下发送事件。因此若是想要在返回false的状况下也要继续发送事件的话,则不能使用该操做符。算法
map
操做符主要是对类型的转换,如上面示例中就是将PackageNameData
类型转换成一个File
类型并向下传递。编程
关于RxJava
操做符的更多内容能够去阅读Carson_Ho的RxJava
系列文章、扔物线的给 Android 开发者的 RxJava 详解等文章。数组
在Java中,通常讨论线程都会想到Thread
类,但在RxJava中,咱们会发现,RxJava
中的线程是能够作定时、轮询等操做。这究竟是怎么实现的尼?或许会想到定时器类——Timer
,但其实不是Timer
,是经过一个可定时、轮询执行操做的线程池——newScheduledThreadPool
来实现的。在RxJava
中,因为该线程池有且仅有一个线程,所以能够将该线程池理解为一种特殊线程,一种仅在RxJava
中使用的特殊线程。在后面内容中会将这种特殊的线程简称为线程。 缓存
从图中能够看出,RxJava中线程都是在SchedulerPoolFactory
类的create
方法中建立的。网络
public static ScheduledExecutorService create(ThreadFactory factory) {
//建立线程为1的一个线程池,它至关于RxJava中的特殊线程
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
复制代码
在RxJava
中可使用Schedulers.single()
来建立一个线程,该方法有且只会建立一个新的线程,相似于线程池中的newSingleThreadExecutor
。因此该线程只会在当前任务执行完毕后才执行下一个任务——至关于串行执行。下面来看一下源码里的实现。多线程
public final class SingleScheduler extends Scheduler {
final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
/** The name of the system property for setting the thread priority for this Scheduler. */
//至关于一个key,能够经过设置KEY_SINGLE_PRIORITY对应的值来设置线程优先级
private static final String KEY_SINGLE_PRIORITY = "rx2.single-priority";
//能够经过该参数来判断执行的线程名称
private static final String THREAD_NAME_PREFIX = "RxSingleScheduler";
...
public SingleScheduler() {
this(SINGLE_THREAD_FACTORY);
}
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
//建立一个线程并使用原子变量类AtomicReference来管理该线程
//使用lazySet并不会让值当即对全部线程可见,而set则是当即对全部线程可见的
executor.lazySet(createExecutor(threadFactory));
}
//建立一个线程,SchedulerPoolFactory.create(threadFactory)该方法在上面前面已经讲述
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
//线程中止执行,关于如何中止能够去查看线程池的中止执行
@Override
public void shutdown() {...}
...
}
复制代码
能够看出,在SingleScheduler
的构造方法中就经过createExecutor
建立了一个线程,而SingleScheduler
这个类仅会建立一次。因此当使用Schedulers.single()
时仅会建立一个线程。
在RxJava
中可使用Schedulers.newThread()
来建立一个新线程,该线程不会被重用,线程数量会随着调用次数的增长而增长。
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
//建立一个新的线程
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
复制代码
上面的NewThreadWorker
是一个很是重要类,后面的Schedulers.computation()
及Schedulers.io()
都是根据此类来建立线程的。
Schedulers.computation()
主要用来作一些计算密集型操做,会根据当前设备的CPU数量来建立一组线程。而后给不一样任务分配不一样的线程。下面来看源码的实现。
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
...
static {
//最大线程数量,根据CPU数量计算出的
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
...
}
static int cap(int cpuCount, int paramThreads) {
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}
//我认为这里实现了一个简单的线程池
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
//线程的数量
this.cores = maxThreads;
//建立一个数组,保存对应的线程
this.eventLoops = new PoolWorker[maxThreads];
//建立一组线程
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
//根据索引来给不一样任务分配不一样的线程。
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
public void shutdown() {
for (PoolWorker w : eventLoops) {
w.dispose();
}
}
@Override
public void createWorkers(int number, WorkerCallback callback) {...}
}
public ComputationScheduler() {
this(THREAD_FACTORY);
}
//建立线程
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
...
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
//给任务分配一个线程
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}
@NonNull
@Override
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
//给任务分配一个线程
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
//建立一组要使用的线程
@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//中止线程执行
@Override
public void shutdown() {...}
...
//在NewThreadWorker中建立了线程,这里之因此不直接使用NewThreadWorker是由于这里传递的threadFactory能够根据名称来区分线程
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
}
复制代码
原理仍是比较简单的,用一个数组来保存一组线程,而后根据索引将任务分配给每一个线程,因为每一个线程其实是一个线程池,而这个线程池会把多余的任务放在队列中等待执行,因此每一个线程后面任务的执行须要等待前面的任务执行完毕。
Schedulers.io()
能够说是RxJava
里实现最复杂的,它不只会建立线程,也会清除线程。在IoScheduler
中实现了一个缓存池,当线程执行完毕后会将线程放入缓存池中。下面来看一下源码实现。
public final class IoScheduler extends Scheduler {
...
//线程的存活时间
public static final long KEEP_ALIVE_TIME_DEFAULT = 60;
private static final long KEEP_ALIVE_TIME;
//线程的存活时间单位
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
...
//缓存池
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
//建立一个线程,该线程默认会每60s执行一次,来清除已到期的线程
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
//设置定时任务
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
@Override
public void run() {
//执行清除时间到期的线程操做
evictExpiredWorkers();
}
//每个任务都从队列中获取线程,若是队列中有线程的话
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
//若是缓存池不为空
while (!expiringWorkerQueue.isEmpty()) {
//从缓冲池中得到线程
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
//缓存池为空,须要建立一个新的线程
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//将执行完毕的线程放入缓存队列中
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
//刷新线程的到期时间
threadWorker.setExpirationTime(now() + keepAliveTime);
//将执行完毕的线程放入缓存池中
expiringWorkerQueue.offer(threadWorker);
}
//默认每60s执行一次,主要是清除队列中的已过时线程
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
//若是线程threadWorker已到期就将其从缓存中移除
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
...
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
...
//建立一个新的线程
static final class ThreadWorker extends NewThreadWorker {
//到期时间,若是该线程到期后就会被清除
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//获取线程的到期时间
public long getExpirationTime() {
return expirationTime;
}
//刷新到期时间
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
}
复制代码
CachedWorkerPool
是一个很是重要的类,它内部有一个队列及线程。该队列主要是缓存已经使用完毕的线程,而CachedWorkerPool
中的线程evictor
主要就是作清除操做,默认是每60s就遍历一遍队列,若是线程过时就从队列中将该线程移除。这里的队列没有数量限制,因此理论上能够建立无限多的线程。
Schedulers.trampoline()
用的比较少,官方对于它的解释是:
在当前线程上执行,但不会当即执行。任务会被放入队列并在当前任务完成后执行。注意:是在当前线程执行,也就意味着不会进行线程切换。
经过查看源码能够发现,当Schedulers.trampoline()
没有延迟任务时,Schedulers.trampoline()
使用与没有使用都没区别。但执行延时任务时,就会将当前任务添加进队列中,等待时间到了再执行。
public final class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
public static TrampolineScheduler instance() {
return INSTANCE;
}
@NonNull
@Override
public Worker createWorker() {
return new TrampolineWorker();
}
/* package accessible for unit tests */TrampolineScheduler() {
}
//当不是延时任务时,直接执行该任务
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}
...
//执行延时任务,就会将该任务添加进优先级队列PriorityBlockingQueue中
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<TimedRunnable>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action) {
//将任务压入队列中
return enqueue(action, now(TimeUnit.MILLISECONDS));
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
long execTime = now(TimeUnit.MILLISECONDS) + unit.toMillis(delayTime);
//将任务压入队列中
return enqueue(new SleepingRunnable(action, this, execTime), execTime);
}
//将任务添加进队列中等待执行
Disposable enqueue(Runnable action, long execTime) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);
if (wip.getAndIncrement() == 0) {
int missed = 1;
for (;;) {
for (;;) {
if (disposed) {
queue.clear();
return EmptyDisposable.INSTANCE;
}
//获取一个要执行的任务
final TimedRunnable polled = queue.poll();
if (polled == null) {
break;
}
if (!polled.disposed) {
//执行任务
polled.run.run();
}
}
//重置wip的值
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
return EmptyDisposable.INSTANCE;
} else {
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
return Disposables.fromRunnable(new AppendToQueueTask(timedRunnable));
}
}
...
}
...
}
复制代码
AndroidSchedulers.mainThread()
是RxAndroid
中的的API。因为在android中须要在主线程更新UI,因此须要该API来切换回主线程。在Android中想要切换回主线程,就只有经过Handler
来实现,而AndroidSchedulers.mainThread()
也不例外。很是简单,就是经过Handler
向主线程发送消息。
final class HandlerScheduler extends Scheduler {
//传递进来的Handler已是主线程的Handler了,只要经过该Handler发送消息便可
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
...
//发送消息,切换回主线程
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
//发送消息,切换回主线程
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
...
}
...
}
复制代码
在Android及一些开源库(如OKHttp
、Glide
等)中,多线程之间的数据同步问题通常都是采用synchronized
来实现,由于它是使用最简单也最深刻人心的一种实现方式,也是性能比较高的一种实现方法。但它倒是一种悲观锁——不论是否有线来程竞争都会加锁,这就致使了在线程竞争比较低的状况下,它的性能不如乐观锁——一种经过CAS来实现的锁机制。而RxJava
中大量使用的原子变量类Atomicxxxxxx
就是一种乐观锁,也是CAS
的一种实现。
CAS
全称为Compare And Swap
,即比较并替换。它包含了3个操做数——须要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS
才会经过原子方式用新值B来更新V的值,不然不会执行任何操做。关于更多CAS
能够参考笔者的Java之CAS无锁算法这篇文章。
在RxJava
中都会使用装饰模式将Observer
包裹成与操做符对应的类xxxxxxObserver
,如FlatMap
、merge
等操做符对应的类——MergeObserver
、subscribeOn
操做符对应的类——SubscribeOnObserver
、observeOn
对应的类——ObserveOnObserver
等。而MergeObserver
、SubscribeOnObserver
及ObserveOnObserver
都分别继承自AtomicInteger
、AtomicReference
及AtomicInteger
。也就是经过原子变量类来实现了线程之间的数据同步。
在Flowable
中也是如此,只不过由xxxxxxObserver
变为了xxxxxSubscriber
而已。
生产者——消费者模式其实就是一种线程间协做的思想。在学习多线程时,实现的买票与卖票案例,就是该模型的实现。或许在开发中不多主动使用到该模型,但基本上都会被动使用该模型。如音视频的下载与解码、网络图片的下载与展现、RxJava事件的发送与接收等。到这里,咱们会疑惑,该模型与RxJava有什么关联?是何种联系尼?其实RxJava的异步订阅就是该模型的一种实现,也所以会在上游发送事件的速度超出下游处理事件的速度时,抛MissingBackpressureException
异常。
Backpressure
既是你们所说的背压,可是我认为这个翻译是有一点问题的,没有一目了然的表达Backpressure
,笔者认为扔物线在如何形象的描述反应式编程中的背压(Backpressure)机制?中的回答就很好的阐述了Backpressure
。 产生的缘由——主要是在异步场景下,上游发送事件的速度超过了下游处理事件的速度,使buffer溢出,从而抛出MissingBackpressureException
异常,这里重点在于buffer的溢出(RxJava 2.x中的默认buffer大小为128)。在1.x的版本中,解决该问题的方案不是很完全,但在2.x的版本中则分出一个新类Flowable
来处理这个问题。它与Observable
处理事件的流程恰好相反,Observable
的事件是由被观察者主动发送的,观察者没法控制速度,只能被动接受,而Flowable
则是由观察者主动获取事件,从而解决了MissingBackpressureException
异常。下面来看一个示例。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 200; i++) {
emitter.onNext("str" + i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.w("Flowable", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.w("Flowable", "s:" + s);
}
@Override
public void onError(Throwable t) {
Log.w("Flowable", "error:" + t.toString());
}
@Override
public void onComplete() {
Log.w("Flowable", "onComplete");
}
});
复制代码
Flowable
的create
方法的第二个参数是设置Backpressure
的模式,它有以下几种模式:
BackpressureStrategy.MISSING
:上游不作任何事件缓存及丢弃,所有交给下游处理,若是有溢出的话,上游无论,交给下游处理。BackpressureStrategy.ERROR
:当下游没法及时处理事件从而致使缓存队列已满时,会给出MissingBackpressureException
异常提示,默认是该策略。BackpressureStrategy.BUFFER
:缓存队列无限大,因此不会抛出MissingBackpressureException
异常。直到下游处理完毕全部事件为止,也意味着内存会随着事件的增多而增大。BackpressureStrategy.DROP
:若是下游没法及时处理事件从而当缓存队列已满时,会删除最近的事件。BackpressureStrategy.LATEST
:若是下游没法及时处理事件从而当缓存队列已满时,会保留最新的事件,其余的事件会被覆盖。因此运行上面代码就会给出MissingBackpressureException
异常提示,须要咱们经过request
方法来获取及消费事件及设置Backpressure
策略来解决该问题。在使用其余操做符的时候,没法主动设置Backpressure
策略,则会在缓存池满了之后给出MissingBackpressureException
异常提示。
toFlowable
是Observable
中的一个方法,经过该方法能够主动来设置Backpressure
策略,从而低成本的解决在Observable
中抛出的MissingBackpressureException
异常。
Observable.interval(1000,TimeUnit.MILLISECONDS)
//设置`Backpressure`策略
.toFlowable(BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Long aLong) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
复制代码
相信阅读到这里,就已经对RxJava及源码都有了必定的了解。可是你们有没有想过如下几个问题,也是我在学习RxJava时一直思考的几个问题。
首先来看问题一,RxJava
的应用能够说很是普遍,好比轮询、网络出错重连、网络请求嵌套回调、联合判断、从缓存中获取数据等,但上面的一些场景也能够不用RxJava来实现,这也就致使了在使用时不会第一时间想到RxJava
。因此笔者认为若是想要熟练的使用RxJava,则须要在思想上进行一次转变,由于RxJava是响应式编程的一种实现,它不会像OkHttp
、Glide
、Dbflow
等开源库只会应用在某一领域。
关于学习RxJava的意义,我认为最好就是可以熟练使用并在可使用RxJava
的时候可以第一时间想到RxJava
,固然因为RxJava
学习门槛较高且须要思惟的转变,因此在不能熟练使用时,就须要咱们可以看懂别人写的RxJava
代码了。固然RxJava
的异步切换、Callback hell问题的解决也是很好的学习RxJava
的理由。
那么你们怎么看RxJava
尼???
【参考资料】
关于RxJava最友好的文章——背压(Backpressure)