- 原文地址:RxJava vs. Kotlin Coroutines, a quick look
- 原文做者:Dávid Karnok
- 译文出自:掘金翻译计划
- 本文永久连接:github.com/xitu/gold-m…
- 译者:PhxNirvana
- 校对者:jamweak、jerry-shao
Kotlin 的协程是否让 RxJava 和 响应式编程光辉再也不 了呢?答案取决于你询问的对象。狂信徒和营销者们会坚决果断地是是是。若是真是这样的话,开发者们早晚会将 Rx 代码用协程重写一遍,抑或从一开始就用协程来写。 由于 协程 目前仍是实验性的,因此目前的诸如性能瓶颈之类的不足,都将逐渐解决。所以,相对于原生性能,本文的重点更在于易用性方面。html
假设有两个函数,f1 和 f2,用来模仿不可信的服务,两者都会在一段延迟以后返回一个数。调用这两个函数,将其返回值求和并呈现给用户。然而若是 500ms 以内没有返回的话,就再也不期望它会返回值了,所以咱们会在有限次数内取消并重试,直到超过次数最终放弃请求。前端
协程用起来就像是传统的 基于 ExecutorService 和 Future 的工具套装, 不一样点在于协程的底层是用的挂起、状态机和任务调度来代替线程阻塞的。java
首先,写两个函数来实现延迟操做:react
suspend fun f1(i: Int) {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1;
}
suspend fun f2(i: Int) {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2;
}
复制代码
与协程调度有关的函数须要加上 suspend 关键字并经过协程上下文来调用。为了演示上面的目的,若是传入参数不是 2 的时候,函数会延迟 2s。这样就会让超时检测将其结束掉,并在第三次尝试时在规定时间内成功。android
由于异步总会在结束时离开主线程,咱们须要一个方法来在业务逻辑完成前阻塞它,以防止直接退出 JVM。为了达到目的,可使用 runBlocking 在主线程中调用函数。ios
fun main(arg: Array<string>) = runBlocking <unit>{
coroutineWay()
reactiveWay()
}
suspend func coroutineWay() {
// TODO implement
}
func reactiveWay() {
// TODO implement
}</unit> </string>
复制代码
相比 RxJava 的函数式,用协程写出来的代码逻辑更简洁,并且代码看起来就像是线性和同步的同样。git
suspend fun coroutineWay() {
val t0 = System.currentTimeMillis()
var i = 0;
while (true) { // (1)
println("Attempt " + (i + 1) + " at T=" +
(System.currentTimeMillis() - t0))
var v1 = async(CommonPool) { f1(i) } // (2)
var v2 = async(CommonPool) { f2(i) }
var v3 = launch(CommonPool) { // (3)
Thread.sleep(500)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
val te = TimeoutException();
v1.cancel(te); // (4)
v2.cancel(te);
}
try {
val r1 = v1.await(); // (5)
val r2 = v2.await();
v3.cancel(); // (6)
println(r1 + r2)
break;
} catch (ex: TimeoutException) { // (7)
println(" Crash at T=" +
(System.currentTimeMillis() - t0))
if (++i > 2) { // (8)
throw ex;
}
}
}
println("End at T="
+ (System.currentTimeMillis() - t0)) // (9)
}
复制代码
添加的一些输出是用来观察这段代码如何运行的。程序员
看起来挺简单的,尽管取消机制可能搞个大新闻:若是 v2 由于其余异常(好比网络缘由致使的 IOException)崩溃了呢?固然咱们得处理这些状况来确保任务能够在各类状况下被取消(举个栗子,试试 Kotlin 中的资源?)。然而,这种状况发生的背景是 v1 会及时返回,直到尝试 await 以前都没法取消 v1 或检测 v2 的崩溃。github
不要在乎那些细节,反正程序跑起来了,运行结果以下:编程
Attempt 1 at T=0
Cancelling at T=531
Crash at T=2017
Attempt 2 at T=2017
Cancelling at T=2517
Crash at T=4026
Attempt 3 at T=4026
3
End a
复制代码
一共进行了 3 次尝试,最后一次成功了,值是 3。是否是和剧本如出一辙的?一点都不快(此处有双关(译者并无看出来哪里有双关))! 咱们能够看到取消事件发生的大概时间,两次不成功的请求以后大约 500 ms ,然而异常捕获发生在大约 2000 ms 以后!咱们知道 cancel() 被成功调用是由于咱们捕获了异常。然而,看起来函数中的 Thread.sleep() 并无被打断,或者用协程的说法,没有在打断异常时恢复。这多是 CommonPool 的一部分,对 Future.cancel(false) 的调用处于基础结构中,抑或只是简单的程序限制。
接下来咱们看看 RxJava 2 是如何实现相同操做的。让人失望的是,若是函数前加了 suspended,就没法经过普通方式调用了,因此咱们还得用普通方法重写一下两个函数:
fun f3(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1
}
fun f4(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2
}
复制代码
为了匹配阻塞外部环境的功能,咱们采用 RxJava 2 Extensions 中的 BlockingScheduler 来提供返回到主线程的功能。顾名思义,它阻塞了一开始的调用者/主线程,直到有任务经过调度器来提交并运行。
fun reactiveWay() {
RxJavaPlugins.setErrorHandler({ }) // (1)
val sched = BlockingScheduler() // (2)
sched.execute {
val t0 = System.currentTimeMillis()
val count = Array<Int>(1, { 0 }) // (3)
Single.defer({ // (4)
val c = count[0]++;
println("Attempt " + (c + 1) +
" at T=" + (System.currentTimeMillis() - t0))
Single.zip( // (5)
Single.fromCallable({ f3(c) })
.subscribeOn(Schedulers.io()),
Single.fromCallable({ f4(c) })
.subscribeOn(Schedulers.io()),
BiFunction<Int, Int> { a, b -> a + b } // (6)
)
})
.doOnDispose({ // (7)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS) // (8)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException // (9)
})
.doAfterTerminate { sched.shutdown() } // (10)
.subscribe({
println(it)
println("End at T=" +
(System.currentTimeMillis() - t0)) // (11)
},
{ it.printStackTrace() })
}
}
复制代码
实现起来有点长,对那些不熟悉 lambda 的人来讲看起来可能有点可怕。
可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:
Cancelling at T=4527
Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089
Crash at 1090
Attempt 3 at T=1090
Cancelling at T=1291
3
End at T=1292
复制代码
有趣的是,若是在 main 函数中同时调用两个函数的话,Cancelling at T=4527 是在调用 coroutineWay() 方法时打印出来的:尽管最后根本没有时间消耗,取消事件自身就浪费在没法中止的计算问题上,也所以在取消已经完成的任务上增长了额外消耗。
另外一方面,RxJava 至少及时地取消和重试了函数。然而,实际上也有几乎不必的 Cancelling at T=1291 被打印出来了。呐,没办法,写出来就这样了,或者说我懒吧,在 Single.timeout 中是这样实现的:若是没有延时就完成了的话,不管操做符真实状况如何,内部的 CompositeDisposable 代理了上游的 Disposable 并将其和操做符一块儿取消了。
最后呢,咱们经过一个小小的改进来看一下响应式设计的强大之处:若是只须要重试没有响应的函数的话,为何咱们要重试整个过程呢?改进方法也能够很容易地在 RxJava 中找到:将 doOnDispose().timeout().retry() 放到每个函数调用链中(也许用 transfomer 能够避免代码的重复):
val timeoutRetry = SingleTransformer<Int, Int> {
it.doOnDispose({
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException
})
}
// ...
Single.zip(
Single.fromCallable({ f3(c) })
.subscribeOn(Schedulers.io())
.compose(timeoutRetry)
,
Single.fromCallable({ f4(c) })
.subscribeOn(Schedulers.io())
.compose(timeoutRetry)
,
BiFunction<Int, Int> { a, b -> a + b }
)
// ...
复制代码
欢迎读者亲自动手实践并更新协程的实现来实现相同行为(顺即可以试试各类其余形式的取消机制)。 响应式编程的好处之一是大多数状况下都没必要去理会诸如线程、取消信息的传递和操做符的结构等恼人的东西。RxJava 之类的库已经设计好了 API 并将这些底层的大麻烦封装起来了,一般状况下,程序员只须要使用便可。
那么,协程到底有没有用呢?固然有用啦,但总的来讲,我仍是以为性能对其是极大的限制,同时,我也想知道协程能够怎么作才能总体取代响应式编程。
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 Android、iOS、React、前端、后端、产品、设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划、官方微博、知乎专栏。