系列文章第三篇
承接上文:RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?
(顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)java
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的状况下,一种告诉上游的被观察者下降发送速度的策略react
简而言之,背压是流速控制的一种策略。git
须要强调两点:github
首先咱们回忆以前那篇《关于Rxjava最友好的文章》,里面其实提到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。面试
结构示意图以下:
观察者能够根据自身实际状况按需拉取数据,而不是被动接收(也就至关于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。缓存
public class FlowableOnBackpressureBufferStategy{ ... @Override public void onNext(T t) { if (done) { return; } boolean callOnOverflow = false; boolean callError = false; Deque<T> dq = deque; synchronized (dq) { if (dq.size() == bufferSize) { switch (strategy) { case DROP_LATEST: dq.pollLast(); dq.offer(t); callOnOverflow = true; break; case DROP_OLDEST: dq.poll(); dq.offer(t); callOnOverflow = true; break; default: // signal error callError = true; break; } } else { dq.offer(t); } } if (callOnOverflow) { if (onOverflow != null) { try { onOverflow.run(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); onError(ex); } } } else if (callError) { s.cancel(); onError(new MissingBackpressureException()); } else { drain(); } } ... }
在这段源码中,根据不一样的背压策略进行了不一样的处理措施,固然这只是列举了一段关于buffer背压策略的例子。网络
产生背压问题的根源就是上游发送速度与下游的处理速度不均致使的,因此若是想要解决这个问题就须要经过匹配两个速率达到解决这个背压根源的措施。
一般有两个策略可供使用:app
- 从数量上解决,对数据进行采样
- 从速度上解决,下降发送事件的速率
- 利用flowable和subscriber
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR); //增长了一个参数 Subscriber<Integer> downstream = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); //注意这句代码 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; upstream.subscribe(downstream);
咱们注意到此次和 Observable 有些不一样. 首先是建立 Flowable 的时候增长了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里咱们直接用 BackpressureStrategy.ERROR
这种方式,这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException
. 其他的策略后面再来说解.异步
另外的一个区别是在下游的 onSubscribe
方法中传给咱们的再也不是 Disposable 了, 而是 Subscription
, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 以前咱们说调用 Disposable.dispose()
方法能够切断水管, 一样的调用 Subscription.cancel()
也能够切断水管, 不一样的地方在于 Subscription
增长了一个 void request(longn)
方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:ide
s.request(Long.MAX_VALUE);
这是由于 Flowable
在设计的时候采用了一种新的思路也就是 响应式拉取 的方式来更好的解决上下游流速不均衡的问题, 与咱们以前所讲的 控制数量 和 控制速度 不太同样, 这种方式用通俗易懂的话来讲就比如是 叶问打鬼子 , 咱们把 上游当作 小日本 , 把 下游 看成 叶问 , 当调用 Subscription.request(1)
时, 叶问 就说 我要打一个! 而后 小日本 就拿出 一个鬼子 给叶问, 让他打, 等叶问打死这个鬼子以后, 再次调用 request(10)
, 叶问就又说 我要打十个! 而后小日本又派出 十个鬼子 给叶问, 而后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...
因此咱们把request当作是一种能力, 当成 下游处理事件 的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会形成一窝蜂的发出一堆事件来, 从而致使OOM
. 这也就完美的解决以前咱们所学到的两种方式的缺陷, 过滤事件会致使事件丢失, 减速又可能致使性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 emitter.onNext(i); } } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });
当上下游工做在 同一个线程 中时, 这时候是一个 同步 的订阅关系, 也就是说 上游 每发送一个事件 必须 等到 下游 接收处理完了之后才能接着发送下一个事件.
同步与异步的区别就在于有没有缓存发送事件的缓冲区。
经过subscribeOn
和observeOn
来肯定对应的线程,达到异步的效果,异步时会有一个对应的缓存区来换从从上游发送的事件。
public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers <em>all</em> onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST }
背压策略:
- error, 缓冲区大概在128
- buffer, 缓冲区在1000左右
- drop, 把存不下的事件丢弃
- latest, 只保留最新的
- missing, 缺省设置,不作任何操做
上游从哪里得知下游的处理能力呢?咱们来看看上游最重要的部分,确定就是 FlowableEmitter
了啊,咱们就是经过它来发送事件的啊,来看看它的源码吧(别紧张,它的代码灰常简单):
public interface FlowableEmitter<T> extends Emitter<T> { void setDisposable(Disposable s); void setCancellable(Cancellable c); /** * The current outstanding request amount. * <p>This method is thread-safe. * @return the current outstanding request amount */ long requested(); boolean isCancelled(); FlowableEmitter<T> serialize(); }
FlowableEmitter
是个接口,继承Emitter,Emitter里面就是咱们的onNext(),onComplete()
和onError()
三个方法。咱们看到FlowableEmitter
中有这么一个方法:
long requested();
这张图的意思就是当上下游在同一个线程中的时候,在 下游 调用request(n)
就会直接改变 上游 中的requested
的值,屡次调用便会叠加这个值,而上游每发送一个事件以后便会去减小这个值,当这个值减小至0的时候,继续发送事件便会抛异常了
能够看到,当上下游工做在不一样的线程里时,每个线程里都有一个requested
,而咱们调用request(1000
)时,实际上改变的是下游主线程中的requested
,而上游中的requested
的值是由RxJava
内部调用request(n)
去设置的,这个调用会在合适的时候自动触发。
- 网络请求处理(轮询,嵌套,出错重连)
- 功能防抖
- 从多级缓存获取数据
- 合并数据源
- 联合判断
- 与 Retrofit,RxBinding,EventBus结合使用
- Scheduler线程切换工做原理
- 数据的发送与接收(观察者模式)
- lift的工做原理
- map的工做原理
- flatMap的工做原理
- merge的工做原理
- concat的工做原理
(顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)