关于Observable的源码解析能够看Rxjava2 Observable源码浅析java
关于Subject的源码解析能够看RxJava2 Subject源码浅析缓存
看过Rxjava2 Observable源码浅析的你会发现其实Rxjava的实现套路都差很少,因此其实Flowable
也差很少,只是在实现的细节上稍微有些差别而已。bash
Flowable
的出现其实主要是为了解决在异步模型中上下游数据发送和接收的差别性而存在的。上游发送速度大于下游接收速度时就会产生数据积压致使OOM,而Flowable
就提供了背压(BackPressure) 策略来处理数据积压问题。多线程
从最原始的Flowable#create
开始异步
//FlowableOnSubscribe就是最原始的数据源发生器
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
//将FlowableOnSubscribe转化成了FlowableCreate
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
复制代码
能够看到create
方法也是将数据源进行了一层封装。而subscribe
方法和Observable#subscribe
就是差很少,最终仍是调用的Flowable#subscribeActual
,而这里就是FlowableCreate#subscribeActual
ide
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
//根据不一样的背压策略实现不一样Emitter
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
//通常来讲在Subscriber#onSubscribe,调用emitter.request指定拉取上游多少数据
t.onSubscribe(emitter);
try {
//将上下游关联
//调用Flowable#create一开始建立的FlowableOnSubscribe#subscribe
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
}
复制代码
能够才看到这里核心就是根据不用背压策略实现不一样的Emitter
。通常来讲在Subscriber#onSubscribe
,调用emitter.request
指定拉取上游多少数据,从而经过背压策略对数据下发的策略不一样。post
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;
final Subscriber<? super T> actual;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;
this.serial = new SequentialDisposable();
}
@Override
public void onComplete() {
complete();
}
protected void complete() {
if (isCancelled()) {
return;
}
try {
actual.onComplete();
} finally {
serial.dispose();
}
}
@Override
public final void onError(Throwable e) {
//尝试下发完成缓存数据
if (!tryOnError(e)) {
RxJavaPlugins.onError(e);
}
}
@Override
public boolean tryOnError(Throwable e) {
return error(e);
}
protected boolean error(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
return false;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
return true;
}
@Override
public final void cancel() {
serial.dispose();
onUnsubscribed();
}
@Override
public final boolean isCancelled() {
return serial.isDisposed();
}
@Override
public final void request(long n) {
//记录请求的个数
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
@Override
public final void setDisposable(Disposable s) {
serial.update(s);
}
@Override
public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public final long requested() {
return get();
}
.....
}
复制代码
这里能够看到BaseEmitter
经过自身继承AtomicLong
取记录请求个数,而不是经过锁或者volatile
来提升性能。性能
不作任何处理,由下游自行处理overflow。MissingEmitter
实现很简单。学习
static final class MissingEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 3776720187248809713L;
MissingEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
//这里能够看出,对应数据下发没有任何限制
if (t != null) {
actual.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//request减1
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
}
复制代码
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 2427151001689639875L;
final SpscLinkedArrayQueue<T> queue;///数据缓存列表
Throwable error;
volatile boolean done;//标记是否onComplete或onError
final AtomicInteger wip;//标记调用了多少次drain
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);///数据入队列
drain();//检测并下发数据
}
@Override
public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
return false;
}
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
error = e;
done = true;//标记完成
drain();//检测并下发未完成数据
return true;
}
@Override
public void onComplete() {//仅标记,若队列有数据继续下发完成
done = true;//标记完成
drain();//检测并下发未完成数据
}
@Override
void onRequested() {//#request(long n)后调用
drain();//检测并下发数据
}
@Override
void onUnsubscribed() {
if (wip.getAndIncrement() == 0) {
queue.clear();
}
}
void drain() {
//相似于if(wip++ != 0)
//因此这里屡次调用#drain只有第一次调用才会经过,或者已经清空队列等待一下调用#drain
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final SpscLinkedArrayQueue<T> q = queue;
for (; ; ) {
long r = get();//数据请求数,由#request决定
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
//是否已完成,调用onComplete/onError后会标记done==true
boolean d = done;
//获取队列第一条数据
T o = q.poll();
//用于标记队列是否为空
boolean empty = o == null;
//已标记完成且队列为空,调用onComplete/onError
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
//队列为空,退出获取数据循环
if (empty) {
break;
}
//下发数据
a.onNext(o);
//标记已下发数据
e++;
}
//数据下发量和请求数相符
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
//标记是否完成
boolean d = done;
//标记队列是否为空
boolean empty = q.isEmpty();
//队列为空且已完成,调用onComplete/onError
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
//request数减去已经下发数
if (e != 0) {
BackpressureHelper.produced(this, e);
}
//已处理一次drain,wip-missed避免错过屡次调用drain
//和Observable#observeOn时的ObserveOnObserver#drainNormal处理方式同样
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
复制代码
这里的#drain
下发数据方法和Observable#observeOn
->ObserveOnObserver#drainNormal
的处理方式是有点类似的。经过自己记录request
数和wip
协调下发数据量及正确的下发。在调用Subscriber#onSubscribe
、Emitter#onNext
、Emitter#onComplete
都会触发#drain
尝试去下发缓存的数据。其中Emitter#onNext
时先缓存数据在尝试下发,并且数据还没下发完成前调用onComplete
和onError
(这里重写了tryOnError
)仅先标记完成,还要等数据彻底下发才会真正调用actual
对应方法。ui
其实这里咱们仍是能够学到一些东西的:
- 若是能够的话,使用
Atomic
包下的类代替volatile
和锁提升性能- 使用
missed
和wip
来协调多线程分发任务- 多线程中标志位的判断最好经过临时变量存储判断并屡次判断
BackpressureStrategy.LATEST
当数据背压时只会缓存最后一次下发的数据(经过AtomicReference
来缓存)。具体实现原理和BackpressureStrategy.BUFFER
较为相似就不贴代码了。
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//是否已达请求数
if (get() != 0) {
actual.onNext(t);//未达请求数,下发
BackpressureHelper.produced(this, 1);//请求数减1
} else {
onOverflow();//已超过请求,调用对应策略方法
}
}
//
abstract void onOverflow();
}
复制代码
BackpressureStrategy.DROP
对应的DropAsyncEmitter
和BackpressureStrategy.ERROR
对应的ErrorAsyncEmitter
都是继承于NoOverflowBaseAsyncEmitter
。实现方式也是很简单,仅仅在onNext
判断一下是否已经到达了请求数,未到达就下发,若到达了调用onOverflow()
处理溢出方案。
BackpressureStrategy.DROP
的溢出方案为空实现即舍去溢出数据BackpressureStrategy.ERROR
的溢出方案为调用onError
即溢出时报错
MISS
策略须要下游自行处理背压问题
BUFFER
策略则在还有数据未下发完成时就算上游调用onComplete
或onError
也会等待数据下发完成
LATEST
策略则当产生背压时仅会缓存最新的数据
DROP
策略为背压时抛弃背压数据
ERROR
策略是背压时抛出异常调用onError
在学习源码时获得的一些关于多线程的领悟:
- 若是能够的话,使用
Atomic
包下的类代替volatile
和锁提升性能- 使用
missed
和wip
来协调多线程分发任务- 多线程中标志位的判断最好经过临时变量存储判断并屡次判断