转载请以连接形式标明出处: 本文出自:103style的博客react
base on RxJava 2.xgit
官方介绍github
Backpressure is when in an Flowable processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down. 背压是在Flowable处理事件流中,某些异步阶段没法足够快地处理这些值,而且须要一种方法来告诉上游生产商减速。缓存
因此 RxJava 的背压策略(Backpressure)是指处理上述上游流速过快现象的一种策略。 相似 Java中的线程池 中的饱和策略 RejectedExecutionHandler。bash
咱们先使用 Observable 看看是什么状况:异步
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(integer);
}
});
复制代码
输出:async
I/art: Background partial concurrent mark sweep GC freed 7(224B) AllocSpace objects, 0(0B) LOS objects, 27% free, 43MB/59MB, paused 528us total 106.928ms
I/System.out: 0
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 20% free, 62MB/78MB, paused 1.065ms total 327.346ms
I/System.out: 1
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 16% free, 82MB/98MB, paused 1.345ms total 299.700ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 13% free, 103MB/119MB, paused 1.609ms total 377.432ms
I/System.out: 2
...
I/art: Alloc concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.574ms total 818.037ms
I/art: WaitForGcToComplete blocked for 2.539s for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
W/art: Throwing OutOfMemoryError "Failed to allocate a 12 byte allocation with 4109520 free bytes and 3MB until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)"
复制代码
咱们能够从上图中看到,内存在逐步上升,在必定的时间后,到达 256M 以后会触发GC,最后抛出 OutOfMemoryError。由于上游的事件发送太快而下游的消费者消耗的比较慢。ide
那致使内存暴增的源头是什么呢 ?函数
咱们对上面的代码作一点点修改,注释了 observeOn(AndroidSchedulers.mainThread()),会发现内存显示很正常,不会存在上述问题。源码分析
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.computation())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(integer);
}
});
复制代码
因此内存暴增的源头就在 observeOn(AndroidSchedulers.mainThread()).
咱们来看看 observeOn 的源码,经过 RxJava subscribeOn和observeOn源码介绍,咱们知道在 ObservableObserveOn.ObserveOnObserver 的 onSubscribe 中构建了一个容量默认为 128 的 SpscLinkedArrayQueue。
queue = new SpscLinkedArrayQueue<T>(bufferSize);
复制代码
上游每发送一个事件都会经过 queue.offer(t) 保存到 SpscLinkedArrayQueue 中。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
复制代码
咱们能够写个测试代码来看看,由于生产比消费快的多,至关于一直添加元素,以下:
private void test(){
SpscLinkedArrayQueue<Integer> queue = new SpscLinkedArrayQueue<>(128);
for (int i = 0; ; i++) {
queue.offer(i);
}
}
复制代码
运行会发现内存变化和 Observable 同样迅速暴增。
SpscLinkedArrayQueue 的详细介绍后面再说。如今能够大体理解为 一直狂吃,而后最后撑破肚皮,而后裂开。
咱们来看看 Flowable 的用法:
Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
复制代码
BackpressureStrategy 包含五种模式:MISSING、ERROR、BUFFER、DROP、LATEST。
下面对这五种 BackpressureStrategy 分别介绍其用法以及 发送事件速度 > 接收事件速度 时的处理方式:
BackpressureStrategy.MISSING 处理方式:抛出异常 MissingBackpressureException,并提示 缓存区满了 代码示例:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
复制代码
输出结果:
System.out: onNext: 0
System.err: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
复制代码
BackpressureStrategy.ERROR 处理方式:直接抛出异常MissingBackpressureException 修改上述代码的 BackpressureStrategy.MISSING 为 BackpressureStrategy.ERROR:
Flowable
.create(new FlowableOnSubscribe<Object>() {
...
}, BackpressureStrategy.ERROR)
...
复制代码
输出结果:
System.out: onNext: 0
System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
复制代码
BackpressureStrategy.BUFFER 处理方式:相似 Observable 同样 扩充缓存区大小 修改上述代码的 BackpressureStrategy.MISSING 为 BackpressureStrategy.BUFFER:
Flowable
.create(new FlowableOnSubscribe<Object>() {
...
}, BackpressureStrategy.BUFFER)
...
复制代码
输出结果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 253
System.out: onNext: 254
System.out: onNext: 255
System.out: onComplete
复制代码
BackpressureStrategy.DROP 处理方式:丢弃缓存区满后处理缓冲区数据期间发送过来的事件 示例代码:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
复制代码
输出结果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 124
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 1070801
System.out: onNext: 1070802
System.out: onNext: 1070803
System.out: onNext: 1070804
System.out: onNext: 1070805
...
复制代码
BackpressureStrategy.LATEST 处理方式:丢弃缓存区满后处理缓冲区数据期间发送过来的非最后一个事件。下面示例代码输出了 129 个事件,下面的源码分析会介绍。 示例代码:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
复制代码
输出结果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 124
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 255
System.out: onComplete
复制代码
经过以前 RxJava之create操做符源码解析 的介绍。咱们知道 Flowable.create(new FlowableOnSubscribe(){...}, BackpressureStrategy.LATEST) 返回的是一个 FlowableCreate 对象。
分别对不一样的背压策略建立了不一样的 Emitter .
public final class FlowableCreate<T> extends Flowable<T> {
//...
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
//...
}
复制代码
MissingEmitter
static final class MissingEmitter<T> extends BaseEmitter<T> {
MissingEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
downstream.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
}
复制代码
经过上面的代码咱们能够看到 MissingEmitter 基本上没作什么操做,因此 BackpressureStrategy.MISSING 示例中的代码其实是调用了 ObserveOn 中返回对象的 FlowableObserveOn.ObserveOnSubscriber 的 onNext:
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
复制代码
上面代码中咱们看到了背压状况下出现的报错信息,出现的前提是 queue.offer(t) 返回 false 。这里的 queue 是 onSubscribe 中构造的容量为 Flowable.bufferSize() 的 SpscArrayQueue .
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
//...
queue = new SpscArrayQueue<T>(prefetch);
downstream.onSubscribe(this);
s.request(prefetch);
}
}
复制代码
SpscArrayQueue 的 offer 方法,咱们能够看到当 SpscArrayQueue 数据 “满了” 的时候即返回 false .
public boolean offer(E e) {
//...
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
producerLookAhead = index + step;
} else if (null != lvElement(offset)) {
return false;
}
}
soElement(offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}
复制代码
因此 BackpressureStrategy.MISSING 在缓冲区满了以后再发射事件即会抛出 message 为 "Queue is full?!" 的 MissingBackpressureException .
ErrorAsyncEmitter
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
//...
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
//...
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
//...
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
ErrorAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
复制代码
经过在 onSubscribe 中调用 request(Flowable.bufferSize()) 设置当前 AtomicLong 的 value 值。 而后 onNext 中每传递一个事件就经过 BackpressureHelper.produced(this, 1) 将 value 减 1 . 当发送了 Flowable.bufferSize() 个事件, get() != 0 不成立,调用 onOverflow() 方法抛出 MissingBackpressureException 异常。
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// nothing to do
}
}
复制代码
和 ErrorAsyncEmitter 相似,只不过当发送超过超过 Flowable.bufferSize() 的事件时,啥也没作,即实现丢弃的功能。LatestAsyncEmitter
static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
final AtomicReference<T> queue;
//...
LatestAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
this.queue = new AtomicReference<T>();
//...
}
@Override
public void onNext(T t) {
//...
queue.set(t);
drain();
}
//...
}
复制代码
咱们能够看到每次调用 onNext 都会更新传过来的值到 queue 中,因此 queue 中保存了最新的值。
接着来看 drain 方法: 上面咱们知道在 onSubscribe 中调用 request() 设置当前 AtomicLong 的 value 值。
void drain() {
//...
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
//...
boolean d = done;
T o = q.getAndSet(null);
boolean empty = o == null;
if (d && empty) {
//...
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
//...
boolean d = done;
boolean empty = q.get() == null;
if (d && empty) {
//...
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
//...
}
}
复制代码
最后一个事件是怎么发出的? 咱们在上面的 drain() 中调用 a.onNext(o) 最终是调用 observeOn 构建对象中的 ObserveOnSubscriber 的 onNext ,即调用 runAsync(); 。
public final void onNext(T t) {
//...
trySchedule();
}
final void trySchedule() {
//...
worker.schedule(this);
}
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
复制代码
runAsync():
void runAsync() {
//...
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
//...
return;
}
//...
a.onNext(v);
e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}
//...
}
}
复制代码
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
//...
this.limit = prefetch - (prefetch >> 2);
}
复制代码
request方法:
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
System.out.println("n = " + n);
BackpressureHelper.add(this, n);
onRequested();
}
}
@Override
void onRequested() {
drain();
}
复制代码
即继续执行 drain() 方法,由于 queue 中还保存最新的值事件。因此会经过 a.onNext(o) 发送这个最新的事件。
若是在执行完等待队列 3/4 的事件以后,上游的事件还没发送结束,下游即会再次缓存上游发送过来的容量的 3/4 个事件。 示例代码:
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
Thread.sleep(10 * Flowable.bufferSize());
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(Flowable.bufferSize() * 2 + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
复制代码
输出结果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
//....
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 255
System.out: onNext: 256
System.out: onNext: 257
//...
System.out: onNext: 349
System.out: onNext: 350
System.out: onNext: 511
System.out: onComplete
复制代码
能够看到输出结果中 255-350 即为容量 128 的 3/4 个元素。
BufferAsyncEmitter
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
final SpscLinkedArrayQueue<T> queue;
//...
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
//...
queue.offer(t);
drain();
}
void drain() {
//...
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
//...
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
//...
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
//...
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
//...
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
复制代码
若是以为不错的话,请帮忙点个赞呗。
以上
扫描下面的二维码,关注个人公众号 Android1024, 点关注,不迷路。