Outlinejava
[TOC]程序员
在上一节中, 咱们提到了Flowable
和Backpressure
背压, 原本这一节的确是想讲这两个东西的,但是写到一半感受仍是差点火候,感受时机未到, 所以,这里先来作个准备工做, 先带你们学习zip
这个操做符, 这个操做符也是比较牛逼的东西了, 涉及到的东西也比较多, 主要是一些细节上的东西太多, 经过学习这个操做符,能够为咱们下一节的Backpressure
作个铺垫.api
照惯例咱们仍是先贴上一下比较正式的解释吧.服务器
Zip
经过一个函数将多个Observable发送的事件结合到一块儿,而后发送这些组合到一块儿的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable同样多的数据。app
咱们再用通俗易懂的图片来解释一下:ide
从这个图中能够看见, 此次上游和以往不一样的是, 咱们有两根水管了.函数
其中一根水管负责发送圆形事件
, 另一根水管负责发送三角形事件
, 经过Zip操做符, 使得圆形事件
和三角形事件
合并为了一个矩形事件
. 学习
下面咱们再来看看分解动做:字体
经过分解动做咱们能够看出:spa
分别从
两根水管里各取出一个事件
来进行组合, 而且一个事件只能被使用一次,
组合的顺序是严格按照事件发送的顺利
来进行的, 也就是说不会出现圆形1
事件和三角形B
事件进行合并, 也不可能出现圆形2
和三角形A
进行合并的状况.下游收到的事件数量
是和上游中发送事件最少的那一根水管的事件数量
相同. 这个也很好理解, 由于是从每一根水管
里取一个事件来进行合并, 最少的
那个确定就最早取完
, 这个时候其余的水管尽管还有事件
, 可是已经没有足够的事件来组合了, 所以下游就不会收到剩余的事件了.分析了大概的原理, 咱们仍是劳逸结合, 先来看看实际中的代码怎么写吧:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
咱们分别建立了两个上游水管, 一个发送1,2,3,4,Complete, 另外一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:
D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit 4
D/TAG: emit complete1
D/TAG: emit A
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete复制代码
结果彷佛是对的... 可是总感受什么地方不对劲...
哪儿不对劲呢, 为何感受是水管一发送完了以后, 水管二才开始发送啊? 究竟是不是呢, 咱们来验证一下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
此次咱们在每发送一个事件以后加入了一秒钟的延时, 来看看运行结果吧, 注意这是个GIF图:
(贴心的我怕你们看不清楚, 特地调成了老年字体呢)
阿西吧, 好像真的是先发送的水管一再发送的水管二呢, 为何会有这种状况呢? 由于咱们两根水管都是运行在同一个线程里, 同一个线程里执行代码确定有前后顺序呀.
所以咱们来稍微改一下, 不让他们在同一个线程, 不知道怎么切换线程的, 请掉头看前面几节.
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
好了, 此次咱们让水管都在IO线程里发送事件, 再来看看运行结果:
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete复制代码
GIF图:
诶! 这下就对了嘛, 两根水管同时开始发送, 每发送一个, Zip就组合一个, 再将组合结果发送给下游.
不对呀! 可能细心点的朋友又看出端倪了, 第一根水管明明发送了四个数据+一个Complete, 以前明明还有的, 为啥到这里没了呢?
这是由于咱们以前说了, zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里咱们第二根水管只发送了三个事件而后就发送了Complete, 这个时候尽管第一根水管还有事件4
和事件Complete
没有发送, 可是它们发不发送还有什么意义呢? 因此本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了.
至于前面的例子为何会发送, 刚才不是已经说了是!在!同!一!个!线!程!里!吗!!!!再问老子打死你!
有好事的程序员可能又要问了, 那我不发送Complete呢? 答案是显然的, 上游会继续发送事件, 可是下游仍然收不到那些多余的事件. 不信你能够试试.
学习了Zip的基本用法, 那么它在Android有什么用呢, 其实不少场景均可以用到Zip. 举个例子.
好比一个界面须要展现用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了以后才能进行展现, 这个时候就能够用Zip了:
首先分别定义这两个请求接口:
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}复制代码
接着用Zip来打包请求:
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo, UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});复制代码
好了, 本次的教程就到这里吧. 又到周末鸟, 下周见.