在项目开发中经常会在一个页面中执行多个任务,多线程异步执行任务时哪一个任务先结束出结果这些并很差控制,譬如要进行几个并发的网络请求在都拿到结果后须要对数据进行处理,但愿的是用户只感知一次数据加载,若是不能作到这样,反馈到用户界面的体验也就很差了。经过 RxJava 的合并操做符咱们可以很方便的应对这些状况。java
咱们先建立以下三个不一样数据类型的 observable 做为示例数据数组
// 先建立三个不一样类型的示例 observable
private val observable = Observable.fromArray(1, 2, 3)
.concatMap(object : Function<Int, ObservableSource<Int>> {
override fun apply(t: Int): ObservableSource<Int> {
return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
}
})
private val observable1 = Observable.just("a", "b", "c")
.concatMap(object : Function<String, ObservableSource<String>> {
override fun apply(t: String): ObservableSource<String> {
return Observable.just(t).delay(400, TimeUnit.MILLISECONDS)
}
})
private val observable2 = Observable.fromArray(1.0f, 2.0f, 3.0f)
.concatMap(object : Function<Float, ObservableSource<Float>> {
override fun apply(t: Float): ObservableSource<Float> {
return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
}
})
复制代码
concat()
操做符将多个 Observable 按前后顺序进行合并,当合并的 Observable 泛型类型不一致时,事件流中的对象类型只能使用 Object
(java),Any
(kotlin)。经过以下示例代码能够直观的看到 concat 后事件的发送顺序:网络
Observable.concat(observable, observable1)
.subscribe(object : AppCallBack<Any>() {
override fun success(data: Any?) {
println("contact----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
merge()
}
})
复制代码
获得的输出结果为: 多线程
多于四个 observable 的合并: 两种方式并发
concat()
传入一个 Iterable<? extends ObservableSource<? extends T>
对象concatArray()
传入一个 Observable 数组。merge()
操做符将多个 Observable 无序合并,当合并的 Observable 泛型类型不一致时,事件流中的对象类型只能使用 Object
(java),Any
(kotlin)。app
Observable.merge(observable, observable1)
.subscribe(object : AppCallBack<Any>() {
override fun success(data: Any?) {
println("merge----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
zip()
}
})
复制代码
获得的输出结果为: 异步
merge()
后事件的发送是并发的无序的,先发送先处理
多于四个 observable 的合并: 两种方式ide
merge()
传入一个 Iterable<? extends ObservableSource<? extends T>
对象mergeArray()
传入一个 Observable 数组。上面的两种合并都是单个事件 item 订阅监听,若是想合并的事件 item 都接收到数据时处理这两个事件数据就须要使用 zip() 操做符。spa
Observable.zip(observable, observable1, object : BiFunction<Int, String, String> {
override fun apply(t1: Int, t2: String): String {
return "t1=$t1 t2=$t2"
}
}).subscribe(object : AppCallBack<String>() {
override fun success(data: String?) {
println("zip----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
}
})
复制代码
获得的输出结果为: 线程
多个 observable 的合并: zip()
的多事件合并就有点厉害了,支持九个 Observable 按数据类型合并,除了两个观察者对象合并时 zipper 是 BiFunction
其余的为 FunctionX
,X 为合并个数。若是多于九个观察者对象合并,与上面两种合并同样可使用 zipArray()
进行合并,可是合并后的观察结果是一个 Object 数组对象,须要本身判断数据类型
除了以上的建立 Observable 对象级别的合并操做符,还有一些事件流中的操做符,譬如第一段代码中的 concatMap
,在事件流前面添加其余事件的 startWith()
等。仍是比较完善的