[译] 管中窥豹:RxJava 与 Kotlin 协程的对比

引言

Kotlin 的协程是否让 RxJava 和 响应式编程光辉再也不 了呢?答案取决于你询问的对象。狂信徒和营销者们会坚决果断地是是是。若是真是这样的话,开发者们早晚会将 Rx 代码用协程重写一遍,抑或从一开始就用协程来写。 由于 协程 目前仍是实验性的,因此目前的诸如性能瓶颈之类的不足,都将逐渐解决。所以,相对于原生性能,本文的重点更在于易用性方面。html

方案设计

假设有两个函数,f1f2,用来模仿不可信的服务,两者都会在一段延迟以后返回一个数。调用这两个函数,将其返回值求和并呈现给用户。然而若是 500ms 以内没有返回的话,就再也不期望它会返回值了,所以咱们会在有限次数内取消并重试,直到超过次数最终放弃请求。前端

协程的方式

协程用起来就像是传统的 基于 ExecutorServiceFuture 的工具套装, 不一样点在于协程的底层是用的挂起、状态机和任务调度来代替线程阻塞的。java

首先,写两个函数来实现延迟操做:react

suspend fun f1(i: Int) {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1;
}

suspend fun f2(i: Int) {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2;
}
复制代码

与协程调度有关的函数须要加上 suspend 关键字并经过协程上下文来调用。为了演示上面的目的,若是传入参数不是 2 的时候,函数会延迟 2s。这样就会让超时检测将其结束掉,并在第三次尝试时在规定时间内成功。android

由于异步总会在结束时离开主线程,咱们须要一个方法来在业务逻辑完成前阻塞它,以防止直接退出 JVM。为了达到目的,可使用 runBlocking 在主线程中调用函数。ios

fun main(arg: Array<string>) = runBlocking <unit>{

     coroutineWay()

     reactiveWay()
}

suspend func coroutineWay() {
    // TODO implement
}

func reactiveWay() {
    // TODO implement
}</unit> </string>
复制代码

相比 RxJava 的函数式,用协程写出来的代码逻辑更简洁,并且代码看起来就像是线性和同步的同样。git

suspend fun coroutineWay() {
    val t0 = System.currentTimeMillis()

    var i = 0;
    while (true) {                                       // (1)
        println("Attempt " + (i + 1) + " at T=" +
            (System.currentTimeMillis() - t0))

        var v1 = async(CommonPool) { f1(i) }             // (2)
        var v2 = async(CommonPool) { f2(i) }

        var v3 = launch(CommonPool) {                    // (3)
            Thread.sleep(500)
            println(" Cancelling at T=" +
                (System.currentTimeMillis() - t0))
            val te = TimeoutException();
            v1.cancel(te);                               // (4)
            v2.cancel(te);
        }

        try {
            val r1 = v1.await();                         // (5)
            val r2 = v2.await();
            v3.cancel();                                 // (6)
            println(r1 + r2)
            break;                                       
        } catch (ex: TimeoutException) {                 // (7)
            println(" Crash at T=" +
                (System.currentTimeMillis() - t0))
            if (++i > 2) {                               // (8)
                throw ex;
            }
        }
    }
    println("End at T=" 
        + (System.currentTimeMillis() - t0))             // (9)

}
复制代码

添加的一些输出是用来观察这段代码如何运行的。程序员

  1. 一般线性编程的状况下,是没有直接重试某个操做的快捷方法的,所以,咱们须要创建一个循环以及重试计数器 i
  2. 经过 async(CommonPool) 来执行异步操做,该函数能够在一些后台线程当即启动并执行函数。该函数会返回一个 Deferred,稍后会用到这个值。 若是用 await() 来获得 v1 做为最终值的话,当前线程将会挂起,另外,对 v2 的计算也不会开始,除非前一个恢复执行。除此之外,咱们还须要在超时的状况下取消当前操做的方法。参考步骤 3 和 5。
  3. 若是想让两个操做都超时的话,看起来咱们只能在另外一个异步线程中执行等待操做。launch(CommonPool) 方法会返回一个能够用在这种状况下的 Job 对象。 与 async 的区别是,这样执行没法返回值。之因此保存返回的 Job 是由于先前的异步操做可能及时返回,就再也不须要取消操做了。
  4. 在超时的任务中,咱们用 TimeoutException 来取消 v1v2 ,这将恢复任何已经挂起来等待两者返回的操做。
  5. 等待两个函数运行结果。若是超时,await 将从新扔出在第四步中使用的异常。
  6. 若是没有异常,则取消再也不须要执行的超时任务,并跳出循环。
  7. 若是有超时,则走老一套捕获异常并执行状态检查来肯定下一步操做。注意任何其余异常都会直接被抛出并退出循环。
  8. 万一是第三次或更屡次的尝试,直接扔出异常,什么都不作。
  9. 若是一切按剧本走,打印运行的总时间,而后退出当前函数。

看起来挺简单的,尽管取消机制可能搞个大新闻:若是 v2 由于其余异常(好比网络缘由致使的 IOException)崩溃了呢?固然咱们得处理这些状况来确保任务能够在各类状况下被取消(举个栗子,试试 Kotlin 中的资源?)。然而,这种状况发生的背景是 v1 会及时返回,直到尝试 await 以前都没法取消 v1 或检测 v2 的崩溃。github

不要在乎那些细节,反正程序跑起来了,运行结果以下:编程

Attempt 1 at T=0
    Cancelling at T=531
         Crash at T=2017
Attempt 2 at T=2017
    Cancelling at T=2517
         Crash at T=4026
Attempt 3 at T=4026
3
End a
复制代码

一共进行了 3 次尝试,最后一次成功了,值是 3。是否是和剧本如出一辙的?一点都不快(此处有双关(译者并无看出来哪里有双关))! 咱们能够看到取消事件发生的大概时间,两次不成功的请求以后大约 500 ms ,然而异常捕获发生在大约 2000 ms 以后!咱们知道 cancel() 被成功调用是由于咱们捕获了异常。然而,看起来函数中的 Thread.sleep() 并无被打断,或者用协程的说法,没有在打断异常时恢复。这多是 CommonPool 的一部分,对 Future.cancel(false) 的调用处于基础结构中,抑或只是简单的程序限制。

响应式

接下来咱们看看 RxJava 2 是如何实现相同操做的。让人失望的是,若是函数前加了 suspended,就没法经过普通方式调用了,因此咱们还得用普通方法重写一下两个函数:

fun f3(i: Int) : Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1
}

fun f4(i: Int) : Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2
}
复制代码

为了匹配阻塞外部环境的功能,咱们采用  RxJava 2 Extensions 中的 BlockingScheduler 来提供返回到主线程的功能。顾名思义,它阻塞了一开始的调用者/主线程,直到有任务经过调度器来提交并运行。

fun reactiveWay() {
    RxJavaPlugins.setErrorHandler({ })                         // (1)

    val sched = BlockingScheduler()                            // (2)
    sched.execute {
        val t0 = System.currentTimeMillis()
        val count = Array<Int>(1, { 0 })                       // (3)

        Single.defer({                                         // (4)
            val c = count[0]++;
            println("Attempt " + (c + 1) +
                " at T=" + (System.currentTimeMillis() - t0))

            Single.zip(                                        // (5)
                    Single.fromCallable({ f3(c) })
                        .subscribeOn(Schedulers.io()),
                    Single.fromCallable({ f4(c) })
                        .subscribeOn(Schedulers.io()),
                    BiFunction<Int, Int> { a, b -> a + b }               // (6)
            )
        })
        .doOnDispose({                                         // (7)
            println(" Cancelling at T=" + 
                (System.currentTimeMillis() - t0))
        })
        .timeout(500, TimeUnit.MILLISECONDS)                   // (8)
        .retry({ x, e ->
            println(" Crash at " + 
                (System.currentTimeMillis() - t0))
            x < 3 && e is TimeoutException                     // (9)
        })
        .doAfterTerminate { sched.shutdown() }                 // (10)
        .subscribe({
            println(it)
            println("End at T=" + 
                (System.currentTimeMillis() - t0))             // (11)
        },
        { it.printStackTrace() })
    }
}
复制代码

实现起来有点长,对那些不熟悉 lambda 的人来讲看起来可能有点可怕。

  1. 众所周知 RxJava 2 不管如何都会传递异常。在 Android 上,没法传递的异常会使应用崩溃,除非使用 RxJavaPlugins.setErrorHandler 来捕获。在此,由于咱们知道取消事件会打断 Thread.sleep() ,调用栈打出来的结果只会是一团乱麻,咱们也不会去注意这么多的异常。
  2. 设置 BlockingScheduler 并分发第一个执行的任务,以及剩下的主线程执行逻辑。 这是因为一旦锁住, start() 将会给主线程增长一个活锁状态,直到有任何随后事件打破锁定,主线程才会继续执行。
  3. 设置一个堆变量来记录重试次数。
  4. 一旦有经过 Single.defer 的订阅,计数器加一并打印 “Attempt” 字符串。该操做符容许保留每一个订阅的状态,这正是咱们在下游执行的 retry() 操做符所指望的。
  5. 使用 zip 操做符来异步执行两个元素的计算,两者都在后台线程执行本身的函数。
  6. 当两者都完成时,将结果相加。
  7. 为了让超时取消,使用 doOnDispose 操做符来打印当前状态和时间。
  8. 使用 timeout 操做符定义求和的超时。若是超时则会发送 TimeoutException(例如该场景下没有反馈时)。
  9. retry 操做符的重载提供了重试时间以及当前错误。打印错误后,应该返回 true ——也就是说必须执行重试——若是重试次数小于三而且当前错误是 TimeoutException 的话。任何其余错误只会终止而不是触发重试。
  10. 一旦完成,咱们须要关闭调度器,来让释放主线程并退出JVM。
  11. 固然,在完成前咱们须要打印求和结果以及整个操做的耗时。

可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:

Cancelling at T=4527

Attempt 1 at T=72
    Cancelling at T=587
         Crash at 587
Attempt 2 at T=587
    Cancelling at T=1089
         Crash at 1090
Attempt 3 at T=1090
    Cancelling at T=1291
3
End at T=1292
复制代码

有趣的是,若是在 main 函数中同时调用两个函数的话,Cancelling at T=4527 是在调用 coroutineWay() 方法时打印出来的:尽管最后根本没有时间消耗,取消事件自身就浪费在没法中止的计算问题上,也所以在取消已经完成的任务上增长了额外消耗。

另外一方面,RxJava 至少及时地取消和重试了函数。然而,实际上也有几乎不必的 Cancelling at T=1291 被打印出来了。呐,没办法,写出来就这样了,或者说我懒吧,在 Single.timeout 中是这样实现的:若是没有延时就完成了的话,不管操做符真实状况如何,内部的 CompositeDisposable 代理了上游的 Disposable 并将其和操做符一块儿取消了。

结论

最后呢,咱们经过一个小小的改进来看一下响应式设计的强大之处:若是只须要重试没有响应的函数的话,为何咱们要重试整个过程呢?改进方法也能够很容易地在 RxJava 中找到:将 doOnDispose().timeout().retry() 放到每个函数调用链中(也许用 transfomer 能够避免代码的重复):

val timeoutRetry = SingleTransformer<Int, Int> { 
    it.doOnDispose({
        println(" Cancelling at T=" + 
            (System.currentTimeMillis() - t0))
    })
    .timeout(500, TimeUnit.MILLISECONDS)
    .retry({ x, e ->
        println(" Crash at " + 
            (System.currentTimeMillis() - t0))
        x < 3 && e is TimeoutException
    })
}

// ...

Single.zip(
    Single.fromCallable({ f3(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    Single.fromCallable({ f4(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    BiFunction<Int, Int> { a, b -> a + b }
)
// ...
复制代码

欢迎读者亲自动手实践并更新协程的实现来实现相同行为(顺即可以试试各类其余形式的取消机制)。 响应式编程的好处之一是大多数状况下都没必要去理会诸如线程、取消信息的传递和操做符的结构等恼人的东西。RxJava 之类的库已经设计好了 API 并将这些底层的大麻烦封装起来了,一般状况下,程序员只须要使用便可。

那么,协程到底有没有用呢?固然有用啦,但总的来讲,我仍是以为性能对其是极大的限制,同时,我也想知道协程能够怎么作才能总体取代响应式编程。


掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 AndroidiOSReact前端后端产品设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划官方微博知乎专栏

相关文章
相关标签/搜索