Outlinejava
[TOC]react
很久不见朋友们,最近一段时间在忙工做上的事情,没来得及写文章,这两天正好有点时间,赶忙写下了这篇教程,省得你们说我太监了。android
先来回顾一下上上节,咱们讲Flowable的时候,说它采用了响应式拉
的方式,咱们还举了个叶问打小日本
的例子,再来回顾一下吧,咱们说把上游
当作小日本
, 把下游
看成叶问
, 当调用Subscription.request(1)
时, 叶问
就说我要打一个!
而后小日本
就拿出一个鬼子
给叶问, 让他打, 等叶问打死这个鬼子以后, 再次调用request(10)
, 叶问就又说我要打十个!
而后小日本又派出十个鬼子
给叶问, 而后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打。app
可是不知道你们有没有发现,在咱们前两节中的例子中,咱们口中声称的响应式拉
并无彻底体现出来,好比这个例子:异步
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
mSubscription.request(1);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
虽然咱们在下游中是每次处理掉了一个事件以后才调用request(1)去请求下一个事件,也就是说叶问的确是在打死了一个鬼子以后才继续打下一个鬼子,但是上游呢?上游真的是每次当下游请求一个才拿出一个吗?从上上篇文章中咱们知道并非这样的,上游仍然是一开始就发送了全部的事件,也就是说小日本并无等叶问打死一个才拿出一个,而是一开始就拿出了全部的鬼子,这些鬼子从一开始就在这儿排队等着被打死。ide
有个故事是这么说的:oop
楚人有卖盾与矛者,先誉其盾之坚,曰:“吾盾之坚,物莫能陷也。”俄而又誉其矛之利,曰:“吾矛之利,万物莫不陷也。”市人诘之曰:"以子之矛陷子之盾,何如?”其人弗能应也。众皆笑之。学习
没错,咱们先后所说的就是自相矛盾了,这说明了什么呢,说明咱们的实现并非一个完整的实现,那么,究竟怎样的实现才是完整的呢?spa
咱们先本身来想想,在下游中调用Subscription.request(n)就能够告诉上游,下游可以处理多少个事件,那么上游要根据下游的处理能力正确的去发送事件,那么上游是否是应该知道下游的处理能力是多少啊,对吧,否则,一个巴掌拍不响啊,这种事情得你情我愿才行。线程
那么上游从哪里得知下游的处理能力呢?咱们来看看上游最重要的部分,确定就是FlowableEmitter
了啊,咱们就是经过它来发送事件的啊,来看看它的源码吧(别紧张,它的代码灰常简单):
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
/** * The current outstanding request amount. * <p>This method is thread-safe. * @return the current outstanding request amount */
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}复制代码
FlowableEmitter是个接口,继承Emitter,Emitter里面就是咱们的onNext(),onComplete()和onError()三个方法。咱们看到FlowableEmitter中有这么一个方法:
long requested();复制代码
方法注释的意思就是当前外部请求的数量
,哇哦,这好像就是咱们要找的答案呢. 咱们仍是实际验证一下吧.
先来看同步
的状况吧:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
这个例子中,咱们在上游中打印出当前的request数量,下游什么也不作。
咱们先猜想一下结果,下游没有调用request(),说明当前下游的处理能力为0,那么上游获得的requested也应该是0,是否是呢?
来看看运行结果:
D/TAG: onSubscribe
D/TAG: current requested: 0复制代码
哈哈,结果果真是0,说明咱们的结论基本上是对的。
那下游要是调用了request()呢,来看看:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //我要打十个!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
此次在下游中调用了request(10),告诉上游我要打十个,看看运行结果:
D/TAG: onSubscribe
D/TAG: current requested: 10复制代码
果真!上游的requested的确是根据下游的请求来决定的,那要是下游屡次请求呢?好比这样:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //我要打十个!
s.request(100); //再给我一百个!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
下游先调用了request(10), 而后又调用了request(100),来看看运行结果:
D/TAG: onSubscribe
D/TAG: current requested: 110复制代码
看来屡次调用也没问题,作了加法
。
诶加法?对哦,只是作加法,那何时作减法
呢?
固然是发送事件啦!
来看个例子吧:
public static void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "before emit, requested = " + emitter.requested());
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "after emit 1, requested = " + emitter.requested());
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "after emit 2, requested = " + emitter.requested());
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "after emit 3, requested = " + emitter.requested());
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "after emit complete, requested = " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //request 10
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
代码很简单,来看看运行结果:
D/TAG: onSubscribe
D/TAG: before emit, requested = 10
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: after emit 1, requested = 9
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: after emit 2, requested = 8
D/TAG: emit 3
D/TAG: onNext: 3
D/TAG: after emit 3, requested = 7
D/TAG: emit complete
D/TAG: onComplete
D/TAG: after emit complete, requested = 7复制代码
你们应该能看出端倪了吧,下游调用request(n) 告诉上游它的处理能力,上游每发送一个next事件
以后,requested就减一,注意是next事件,complete和error事件不会消耗requested
,当减到0时,则表明下游没有处理能力了,这个时候你若是继续发送事件,会发生什么后果呢?固然是MissingBackpressureException
啦,试一试:
public static void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "before emit, requested = " + emitter.requested());
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "after emit 1, requested = " + emitter.requested());
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "after emit 2, requested = " + emitter.requested());
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "after emit 3, requested = " + emitter.requested());
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "after emit complete, requested = " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(2); //request 2
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
仍是这个例子,只不过此次只request(2), 看看运行结果:
D/TAG: onSubscribe
D/TAG: before emit, requested = 2
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: after emit 1, requested = 1
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: after emit 2, requested = 0
D/TAG: emit 3
W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
at zlc.season.rxjava2demo.demo.ChapterNine$4.subscribe(ChapterNine.java:80)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:12218)
at zlc.season.rxjava2demo.demo.ChapterNine.demo2(ChapterNine.java:89)
at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
at android.view.View.performClick(View.java:4780)
at android.view.View$PerformClick.run(View.java:19866)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
D/TAG: after emit 3, requested = 0
D/TAG: emit complete
D/TAG: after emit complete, requested = 0复制代码
到目前为止咱们一直在说同步的订阅,如今同步说完了,咱们先用一张图来总结一下同步的状况:
这张图的意思就是当上下游在同一个线程中的时候,在下游
调用request(n)就会直接改变上游
中的requested的值,屡次调用便会叠加这个值,而上游每发送一个事件以后便会去减小这个值,当这个值减小至0的时候,继续发送事件便会抛异常了。
咱们再来讲说异步
的状况,异步和同步会有区别吗?会有什么区别呢?带着这个疑问咱们继续来探究。
一样的先来看一个基本的例子:
public static void demo3() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
此次是异步的状况,上游啥也不作,下游也啥也不作,来看看运行结果:
D/TAG: onSubscribe
D/TAG: current requested: 128复制代码
哈哈,又是128,看了我前几篇文章的朋友确定很熟悉这个数字啊!这个数字为何和咱们以前所说的默认的水缸大小同样啊,莫非?
带着这个疑问咱们继续来研究一下:
public static void demo3() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(1000); //我要打1000个!!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
此次咱们在下游调用了request(1000)告诉上游我要打1000个,按照以前咱们说的,此次的运行结果应该是1000,来看看运行结果:
D/TAG: onSubscribe
D/TAG: current requested: 128复制代码
卧槽,你肯定你没贴错代码?
是的,真相就是这样,就是128,蜜汁128。。。
为了答疑解惑,我就直接上图了:
能够看到,当上下游工做在不一样的线程里时,每个线程里都有一个requested,而咱们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。
如今咱们就能理解为何没有调用request,上游中的值是128了,由于下游在一开始就在内部调用了
request(128)去设置了上游中的值,所以即便下游没有调用request(),上游也能发送128个事件,这也能够解释以前咱们为何说Flowable中默认的水缸大小是128,其实就是这里设置的。
刚才同步的时候咱们说了,上游每发送一个事件,requested的值便会减一,对于异步来讲一样如此,那有人确定有疑问了,一开始上游的requested的值是128,那这128个事件发送完了不就不能继续发送了吗?
刚刚说了,设置上游requested的值的这个内部调用会在合适的时候
自动触发,那到底何时是合适的时候呢?是发完128个事件才去调用吗?仍是发送了一半才去调用呢?
带着这个疑问咱们来看下一段代码:
public static void request() {
mSubscription.request(96); //请求96个事件
}
public static void demo4() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "First requested = " + emitter.requested());
boolean flag;
for (int i = 0; ; i++) {
flag = false;
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "Oh no! I can't emit value!");
flag = true;
}
}
emitter.onNext(i);
Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
此次的上游稍微复杂了一点点,首先仍然是个无限循环发事件,可是是有条件的,只有当上游的requested != 0的时候才会发事件,而后咱们调用request(96)去消费96个事件(为何是96而不是其余的数字先不要管),来看看运行结果吧:
D/TAG: onSubscribe
D/TAG: First requested = 128
D/TAG: emit 0 , requested = 127
D/TAG: emit 1 , requested = 126
D/TAG: emit 2 , requested = 125
...
D/TAG: emit 124 , requested = 3
D/TAG: emit 125 , requested = 2
D/TAG: emit 126 , requested = 1
D/TAG: emit 127 , requested = 0
D/TAG: Oh no! I can't emit value!复制代码
首先运行以后上游便会发送完128个事件,以后便不作任何事情,从打印的结果中咱们也能够看出这一点。
而后咱们调用request(96),这会让下游去消费96个事件,来看看运行结果吧:
D/TAG: onNext: 0
D/TAG: onNext: 1
...
D/TAG: onNext: 92
D/TAG: onNext: 93
D/TAG: onNext: 94
D/TAG: onNext: 95
D/TAG: emit 128 , requested = 95
D/TAG: emit 129 , requested = 94
D/TAG: emit 130 , requested = 93
D/TAG: emit 131 , requested = 92
...
D/TAG: emit 219 , requested = 4
D/TAG: emit 220 , requested = 3
D/TAG: emit 221 , requested = 2
D/TAG: emit 222 , requested = 1
D/TAG: emit 223 , requested = 0
D/TAG: Oh no! I can't emit value!复制代码
能够看到,当下游消费掉第96个事件以后,上游又开始发事件了,并且能够看到当前上游的requested的值是96(打印出来的95是已经发送了一个事件减一以后的值),最终发出了第223个事件以后又进入了等待区,而223-127 正好等于 96。
这是否是说明当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值啊!没错,就是这样,而这个新的值就是96。
朋友们能够手动试试请求95个事件,上游是不会继续发送事件的。
至于这个96是怎么得出来的(确定不是我猜的蒙的啊),感兴趣的朋友能够自行阅读源码寻找答案,对于初学者而言应该没什么必要,管它内部怎么实现的呢对吧。
好了今天的教程就到这里了!经过本节的学习,你们应该知道如何正确的去实现一个完整的响应式拉取了,在某一些场景
下,能够在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则须要等待,例以下面这个例子。
这个例子是读取一个文本文件,须要一行一行读取,而后处理并输出,若是文本文件很大的时候,好比几十M的时候,所有先读入内存确定不是明智的作法,所以咱们能够一边读取一边处理,实现的代码以下:
public static void main(String[] args) {
practice1();
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void practice1() {
Flowable
.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
try {
FileReader reader = new FileReader("test.txt");
BufferedReader br = new BufferedReader(reader);
String str;
while ((str = br.readLine()) != null && !emitter.isCancelled()) {
while (emitter.requested() == 0) {
if (emitter.isCancelled()) {
break;
}
}
emitter.onNext(str);
}
br.close();
reader.close();
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(1);
}
@Override
public void onNext(String string) {
System.out.println(string);
try {
Thread.sleep(2000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onComplete() {
}
});
}复制代码
运行的结果即是:
好了,本次的教程就到这里了,谢谢你们捧场!下节见,敬请期待!(PS: 我这么用心的写文章, 大家也不给个赞吗?)