2019年5月5日发现官方有更新,我也更新一下。html
基于版本:java
<properties> <kotlin.version>1.3.30</kotlin.version> </properties> <dependency> <groupId>org.jetbrains.kotlinx</groupId> <artifactId>kotlinx-coroutines-core</artifactId> <version>1.2.1</version> </dependency>
2018.02.23才写完,竟然用了三个月,简直差劲,并且还有不少机器翻译的东西。我还会慢慢磨砺这篇文章。git
-------------------------------------------如下原文-------------------------------------------------------github
本文基于官方文档翻译所得,才疏学浅,请多指正。文章较长,但愿我能坚持写完,放在这里也是对本身的一种鞭策。express
-------------------------------------------正片开始-------------------------------------------------------编程
本文经过一系列例子简单介绍了kotlinx.coroutines的核心功能小程序
Kotlin做为一种编程语言,只提供最小的底层API的标准库来支持其余库使用协程。和其余拥有相似功能(指协程)的语言不同,async 和 await 不是语言的关键字,甚至还不是标准库的一部分。后端
kotlinx.coroutines 就是一个这样功能丰富的库,它包含了一些高级的关键字对协程的支持,包括async 和 await,你须要添加kotlinx-coroutines-core这个jar在你的项目中开启对协程的支持。api
本节涵盖了协程的基本概念。安全
fun main(args: Array<String>) { GlobalScope.launch{ // 开启一个协程 delay(1000L) // 延迟一秒,非阻塞,和主线程并行的(默认时间单位是毫秒) println("World!") // 延迟以后打印 } println("Hello,") // 主程序继续执行,由于上面协程是不阻塞的,因此这里会当即执行 Thread.sleep(2000L) // 让主程序休眠2秒,保持虚拟机,给协程执行完毕留出时间 }
执行结果以下:
Hello, World!
实际上、协程是一种轻量的线程,经过launch关键字启动。在这里,咱们将在GlobalScope中启动一个新的协同程序,这意味着新协程的生命周期仅受整个应用程序的生命周期的限制。
你能够用Thread{...}替换GlobalScope.launch{...},用Thread.sleep{...}替换delay{...}来实现相同的功能。若是你直接替换GlobalScope.launch为Thread,系统会报错以下:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
由于delay是一种特殊的暂停方法,它不会阻塞线程,可是会暂停协程,且只能在协程中使用。
第一个例子在主进程中混合了非阻塞的delay(...)和阻塞的Thread.sleep(...),可能让人混乱。如今让咱们用runBlocking协程生成器来明确阻塞。
fun main(args: Array<String>) { GlobalScope.launch { //在后台开启一个新的协程,并执行 delay(1000L) println("World!") } println("Hello,") // 主线程会在这里当即执行 runBlocking { // 可是这个表达式会阻塞主线程 delay(2000L) // 延迟2秒,保证虚拟机可以等到子协程跑完 } }
运行结果是相同的,不过这里只使用了非阻塞方法delay。
调用runblocking的主线程会被阻塞,直到runblocking内部的协程完成。
这个例子还有一种惯用的写法,用runBlocking包装执行主程序。
fun main(args: Array<String>) = runBlocking<Unit> { // 开启主协程 GlobalScope.launch { // 启动一个新的协程 delay(1000L) println("World!") } println("Hello,") // 主协程当即执行到这里 delay(2000L) // 延迟2秒,保证虚拟机可以等到子协程跑完 }
这里runBlocking<Unit> { ... }做为一种适配器,开启一个顶级协程。咱们明确地指定了它的返回类型Unit,由于kotlin中一个格式良好的主函数必须返回Unit。
这是也能够对暂停程序进行单元测试。
class MyTest { @Test fun testMySuspendingFunction() = runBlocking<Unit> { // 这里你能够用你喜欢的断言方式,来测试暂停程序 } }
经过延迟一小段时间来等待后台的协程结束,并非一个好的方法。(指delay以保证虚拟机没有死掉)。让咱们以一种非阻塞的方式明确的等待一个后台协程结束。
fun main(args: Array<String>) = runBlocking<Unit> { val job = GlobalScope.launch { // 开启一个协程,并赋给它的一个引用 delay(1000L) println("World!") } println("Hello,") job.join() // 等待直到子协程结束 }
结果是同样的,但主协程的代码不以任何方式与后台做业的持续时间相关联,好多了!
实际使用协程仍然须要一些东西,当咱们使用 GlobalScope.launch时
, 咱们建立了一个顶级的协程。即便它是轻量的,可是仍然会消耗内存资源。
让咱们提取launch{...}里的代码块放进一个单独的方法里,这时候你须要一个使用suspend修饰的新方法,这是你的第一个暂停方法。暂停方法能够和其余方法同样在协程中使用,可是不一样之处在于,暂停方法能够调用其余的暂停方法,好比下面例子中的delay,来暂停一个协程。
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch { doWorld() } println("Hello,") job.join() } // 这是你的第一个暂停方法 suspend fun doWorld() { delay(1000L) println("World!") }
执行下面的代码:
fun main(args: Array<String>) = runBlocking<Unit> { val jobs = List(100_000) { // 开启大量协程,并对她们赋值到每个Job中 launch { delay(1000L) print(".") } } jobs.forEach { it.join() } //等待全部的Job结束 }
它会启动10万个协程,一秒后每个协程都打印一个点。你能够尝试用Thread来重写这段代码,会发生什么呢?(极可能会触发内存溢出异常,固然和你的电脑配置有关,反正个人是崩了=。=)
下面的代码会启动一段长时间运行的协程,每隔两秒打印一句“我在睡觉”,而后在一段延迟以后从主程序退出。
fun main(args: Array<String>) = runBlocking<Unit> { launch { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } delay(1300L) // 延迟以后退出 }
运行以后能够看到,结果是执行了三行打印就退出了。
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ...
因此说,激活的协程,不会保持进程存活,她们更像后台线程。
这一节讲协程的取消和超时。
在一个小程序里,在main方法里return看上去是一种隐式关闭协程的好方法。在一个大的,长期执行的程序里,你须要更细粒度的控制。launch方法返回一个Job能够控制取消正在执行的协程:
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } delay(1300L) // 延迟一下 println("main: I'm tired of waiting!") job.cancel() // 取消Job job.join() // 等待Job完成 println("main: Now I can quit.") }
输出为:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.
当主协程执行cancel,咱们再也不看到子协程的打印,由于它被取消了。还有一个方法cancelAndJoin方法,包含了cancel和join这两个操做。
协程的取消是须要配合的,协程代码必须配合才能被取消!kotlinx.coroutines中的全部暂停函数都是能够取消的。她们检查协程的取消操做,并在取消的时候抛出CancellationException异常。可是,若是一个协程正在执行计算工做,而且没有检查取消,那么它不能被取消,以下面的例子所示:
fun main(args: Array<String>) = runBlocking<Unit> { val startTime = System.currentTimeMillis() val job = launch { var nextPrintTime = startTime var i = 0 while (i < 5) { // 循环计算,只是浪费CPU // 每秒打印2次信息 if (System.currentTimeMillis() >= nextPrintTime) { println("I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
运行它,你会发现即便在取消以后它仍继续打印“我正在睡觉”,直到五次迭代完成后才自行完成。
有两种方法让计算代码可取消。第一个是按期调用一个检查取消的挂起方法。有一个yield函数是一个很好的选择。另外一个是明确检查取消状态。让咱们尝试后面这种方法。
用isActive替换前面的例子中的 i < 5 并从新运行。
fun main(args: Array<String>) = runBlocking<Unit> { val startTime = System.currentTimeMillis() val job = launch { var nextPrintTime = startTime var i = 0 while (isActive) { // cancellable computation loop // print a message twice a second if (System.currentTimeMillis() >= nextPrintTime) { println("I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
正如你所看到的,如今这个循环被取消了。 isActive是一个CoroutineScope对象在协程代码中的属性。
可取消的暂停函数在取消时抛出CancellationException,能够利用这一点,在取消协程的时候,处理一些资源问题。例如,try{...}finally {...},Kotlin的use函数,当协程被取消时,会正常执行它们的finally操做。
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch { try { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } finally { println("I'm running finally") } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
无论是cancel仍是cancelAndJoin都会等待finally里的代码执行完毕。因此上面的例子运行结果是:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... main: I'm tired of waiting! I'm running finally main: Now I can quit.
任何试图在前面的例子的finally块中使用暂停函数的操做都会致使CancellationException,由于运行这个代码的协程被取消。一般,这不是问题,由于全部关闭操做(关闭文件,取消做业或关闭任何类型的通讯通道)一般都是非阻塞的,不涉及任何挂起功能。可是,在极少数状况下,当您须要在取消的协程中暂停时,您可使用run函数和NonCancellable上下文来运行相应的代码(NonCancellable){...},以下例所示:
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch { try { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } finally { run(NonCancellable) { println("I'm running finally") delay(1000L) println("And I've just delayed for 1 sec because I'm non-cancellable") } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
在实践中取消协同执行的最显著的缘由是超时。虽然您能够对相应做业的引用手动跟踪,并启动一个单独的协程在延迟以后取消所跟踪的协程,可是可使用Timeout功能来执行此操做。看下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> { withTimeout(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } }
它产生如下输出:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
withTimeout引起的TimeoutCancellationException是CancellationException的子类,咱们以前没有看到它在控制台上打印的堆栈跟踪。这是由于在协程的取消中,CancellationException被认为是协程完成的正常缘由。然而,在这个例子中,咱们已经在主函数内部使用了Timeout。
因为取消只是一个例外,全部的资源将以一般的方式关闭。你能够在try {...} catch(e:TimeoutCancellationException){...}中使用timeout来封装代码,若是你须要在任何类型的超时
内作一些额外的操做,或者使用与withTimeout相似的TimeoutOrNull函数,但在超时时返回null,而不是抛出异常:
fun main(args: Array<String>) = runBlocking<Unit> { val result = withTimeoutOrNull(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } "Done" // 在产生这个结果以前会被取消 } println("Result is $result") }
运行此代码时不会触发异常:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Result is null
这一节介绍各类编写挂起程序的方法
假设咱们在别处定义了两个挂起函数,它们执行某种相似远程服务调用或计算的有用操做。咱们只是伪装他们是有用的,但实际上每一个只是为了这个例子的目的而拖延一秒钟:
suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // pretend we are doing something useful here, too return 29 }
若是须要依次调用它们,咱们该怎么办?首先执行doSomethingUsefulOne,而后执行doSomethingUsefulTwo,而后计算他们结果的总和?在实践中,若是咱们使用第一个函数的结果来决定是否须要调用第二个函数或决定如何调用它,那么咱们会这样作。咱们只是使用正常的顺序调用,由于协程中的代码与常规代码同样,默认状况下是连续的。如下示例经过测量执行两个挂起功能所需的总时间来演示它:
fun main(args: Array<String>) = runBlocking<Unit> { val time = measureTimeMillis { val one = doSomethingUsefulOne() val two = doSomethingUsefulTwo() println("The answer is ${one + two}") } println("Completed in $time ms") }
结果像这样:
The answer is 42 Completed in 2017 ms
若是doSomethingUsefulOne和doSomethingUsefulTwo的调用之间没有依赖关系,而且咱们但愿经过同时执行这两个方法来更快地获得答案?使用async吧
从概念上讲,async就像launch。它启动一个单独的协程,它是一个与全部其余协程同时工做的轻量级线程。不一样之处在于启动会返回一个Job,而不会带来任何结果值,而异步返回Deferred - 一个轻量级的非阻塞对象,表示稍后提供结果。你可使用延迟值的.await()来获得它的最终结果,可是Deferred也是一个Job,因此你能够根据须要取消它。
fun main(args: Array<String>) = runBlocking<Unit> { val time = measureTimeMillis { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") }
运行结果以下:
The answer is 42 Completed in 1017 ms
这是两倍的速度,由于咱们同时执行两个协程。请注意,协程的并发老是显式的。
有一个惰性选项,使用可选的启动参数启动异步,值为CoroutineStart.LAZY。它只有在须要某个结果的时候才启动协程,或者启动了一个start()或者await()函数。对应的例子以下:
fun main(args: Array<String>) = runBlocking<Unit> { val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() } val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") }
结果以下:
The answer is 42 Completed in 2017 ms
因此,咱们又回到顺序执行,由于咱们先开始await()一个,而后开始await()第二个。这不是懒加载的预期用例。在计算值涉及暂停功能的状况下,它被设计为替代标准的懒加载函数。
咱们能够定义使用异步协程生成器异步调用doSomethingUsefulOne和doSomethingUsefulTwo的异步风格函数。使用“Async”后缀或“async”前缀来命名这些函数是一种很好的风格,以突出显示这样的事实,即它们只启动异步计算,而且须要使用获得的延迟值来得到结果。
// The result type of asyncSomethingUsefulOne is Deferred<Int> fun asyncSomethingUsefulOne() = async { doSomethingUsefulOne() } // The result type of asyncSomethingUsefulTwo is Deferred<Int> fun asyncSomethingUsefulTwo() = async { doSomethingUsefulTwo() }
请注意,这些asyncXXX函数不是暂停函数。他们能够从任何地方使用。然而,它们的使用老是意味着它们的行为与调用代码的异步(这里意味着并发)执行。
如下示例显示了它们在协程以外的用法:
//注意,在这个例子中咱们没有在`main`的右边有'runBlocking` fun main(args: Array<String>) { val time = measureTimeMillis { //咱们能够在协程以外启动异步操做 val one = asyncSomethingUsefulOne() val two = asyncSomethingUsefulTwo() //但等待结果必须涉及暂停或挂起。 //这里咱们使用`runBlocking {...}`在等待结果的同时阻塞主线程 runBlocking { println("The answer is ${one.await() + two.await()}") } } println("Completed in $time ms") }
协程老是在一些上下文中执行,它由Kotlin标准库中定义的CoroutineContext类型的值表示。
协程的上下文是一组元素。主要元素是咱们之前见过的协程的Job,以及本节中介绍的调度程序
协程上下文包括协程调度程序(参见CoroutineDispatcher),该协程肯定相应协程执行的一个线程或多个线程。协程调度程序能够将协程执行限制在一个特定的线程中,调度它到一个线程池中,或者让它无限制的运行。
全部协程构建器,如launch和async 接受一个可选的CoroutineContext参数,可用于显式指定调度程序的新协程和其余上下文元素
试试下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> { val jobs = arrayListOf<Job>() jobs += launch(Unconfined) { //无限制 和主线程一块儿运行 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}") } jobs += launch(coroutineContext) { //父级的上下文,runBlocking协程 println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}") } jobs += launch(CommonPool) { //将被分派到ForkJoinPool.commonPool(或等同的) println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}") } jobs += launch(newSingleThreadContext("MyOwnThread")) { //将得到本身的新线程 println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}") } jobs.forEach { it.join() } }
它会产生如下输出(可能顺序不一样):
'Unconfined': I'm working in thread main 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1 'newSTC': I'm working in thread MyOwnThread 'coroutineContext': I'm working in thread main
咱们在前面部分中使用的默认调度程序由DefaultDispatcher表示,这与当前实现中的CommonPool相同。所以,launch {...}与launch(DefaultDispatcher){...}以及launch(CommonPool){...}相同。
父coroutineContext和Unconfined上下文之间的区别将在稍后显示。
请注意,newSingleThreadContext建立一个新的线程,这是很耗费资源的。在真正的应用程序中,它必须被释放,再也不须要时,使用close函数,或者存储在顶层变量中,并在整个应用程序中重用。
Unconfined协程调度程序在调用者线程中启动协程,但仅在第一个暂停点以前。暂停后,它将在被调用的暂停功能彻底肯定的线程中恢复。协程不消耗CPU时间,也不更新限于特定线程的任何共享数据(如UI)时,无限制的分派器是合适的。
另外一方面,经过CoroutineScope接口在任何协程的块内可用的coroutineContext属性是对此特定协程的上下文的引用。这样,能够继承父上下文。 runBlocking协同程序的默认调度程序特别限于调用程序线程,所以继承它的做用是经过可预测的FIFO调度将执行限制在该线程中。
fun main(args: Array<String>) = runBlocking<Unit> { val jobs = arrayListOf<Job>() jobs += launch(Unconfined) { // not confined -- will work with main thread println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}") delay(500) println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}") } jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}") delay(1000) println("'coroutineContext': After delay in thread ${Thread.currentThread().name}") } jobs.forEach { it.join() } }
执行结果:
'Unconfined': I'm working in thread main 'coroutineContext': I'm working in thread main 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor 'coroutineContext': After delay in thread main
所以,继承了runBlocking {...}的coroutineContext的协程在主线程中继续执行,而非限制的协程在延迟函数使用的默认执行程序线程中恢复。
协程能够在一个线程上挂起,并在带有Unconfined dispatcher的另外一个线程上或使用默认的多线程调度程序恢复。即便使用单线程调度程序,也很难弄清楚协程在什么地方作什么,在什么地方,何时作什么。使用线程调试应用程序的经常使用方法是在每一个日志语句的日志文件中打印线程名称。日志框架一般支持此功能。在使用协程时,单独的线程名称不会提供不少上下文,所以kotlinx.coroutines包含调试工具以使其更容易
使用-Dkotlinx.coroutines.debug JVM选项运行如下代码:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun main(args: Array<String>) = runBlocking<Unit> { val a = async(coroutineContext) { log("I'm computing a piece of the answer") 6 } val b = async(coroutineContext) { log("I'm computing another piece of the answer") 7 } log("The answer is ${a.await() * b.await()}") }
这里有三个协程,主协程(#1),runBlocking的这个,还有两个计算延迟值a(#2)和b(#3)的协程,它们都在runBlocking的上下文中执行,而且被限制在主线程中。执行结果以下:
[main @coroutine#2] I'm computing a piece of the answer [main @coroutine#3] I'm computing another piece of the answer [main @coroutine#1] The answer is 42
log函数在方括号中打印线程的名称,您能够看到它是主线程,后面跟着当前正在执行的协程的标识符。打开调试模式时,此标识符将连续分配给全部建立的协程。
你能够在newCoroutineContext函数的文档中阅读更多关于调试工具的信息。
使用-Dkotlinx.coroutines.debug JVM选项运行如下代码:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun main(args: Array<String>) { newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { log("Started in ctx1") run(ctx2) { log("Working in ctx2") } log("Back to ctx1") } } } }
它演示了几种新技术。一个是使用带有明确指定的上下文的runBlocking,另外一个是使用run函数来改变协程的上下文,而仍然保持在同一个协程中,你能够在下面的输出中看到:
[Ctx1 @coroutine#1] Started in ctx1 [Ctx2 @coroutine#1] Working in ctx2 [Ctx1 @coroutine#1] Back to ctx1
请注意,该示例也使用Kotlin标准库中的use函数来释放在再也不须要的状况下使用newSingleThreadContext建立的线程。
协程的工做(Job)是其上下文的一部分。协程可使用coroutineContext [Job]表达式从它本身的上下文中检索它:
fun main(args: Array<String>) = runBlocking<Unit> { println("My job is ${coroutineContext[Job]}") }
它在调试模式下运行时会产生如下相似的结果:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
所以,在 CoroutineScope中的isActive只是一个coroutineContext [Job] !! isActive的快捷方式。
当协程的coroutineContext被用来启动另外一个协程时,新协程的Job就成为了父协程的子Job。当父协程被取消时,它的全部子协程也被递归地取消
fun main(args: Array<String>) = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { // it spawns two other jobs, one with its separate context val job1 = launch { println("job1: I have my own context and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } // and the other inherits the parent context val job2 = launch(coroutineContext) { delay(100) println("job2: I am a child of the request coroutine") delay(1000) println("job2: I will not execute this line if my parent request is cancelled") } // request completes when both its sub-jobs complete: job1.join() job2.join() } delay(500) request.cancel() // cancel processing of the request delay(1000) // delay a second to see what happens println("main: Who has survived request cancellation?") }
输出以下:
job1: I have my own context and execute independently! job2: I am a child of the request coroutine job1: I am not affected by cancellation of the request main: Who has survived request cancellation?
协程的上下文可使用+运算符来组合。右侧的上下文替换左侧上下文的相关条目。例如,父协程的Job能够被继承,而调度器被替换:
fun main(args: Array<String>) = runBlocking<Unit> { // start a coroutine to process some kind of incoming request val request = launch(coroutineContext) { // use the context of `runBlocking` // spawns CPU-intensive child job in CommonPool !!! val job = launch(coroutineContext + CommonPool) { println("job: I am a child of the request coroutine, but with a different dispatcher") delay(1000) println("job: I will not execute this line if my parent request is cancelled") } job.join() // request completes when its sub-job completes } delay(500) request.cancel() // cancel processing of the request delay(1000) // delay a second to see what happens println("main: Who has survived request cancellation?") }
这个代码的预期结果是:
job: I am a child of the request coroutine, but with a different dispatcher main: Who has survived request cancellation?
父协程老是等待全部的子协程完成。 父协程没必要显式地跟踪它启动的全部子节点,而且没必要使用Job.join等待它们到最后:
fun main(args: Array<String>) = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { repeat(3) { i -> // launch a few children jobs launch(coroutineContext) { delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms println("Coroutine $i is done") } } println("request: I'm done and I don't explicitly join my children that are still active") } request.join() // wait for completion of the request, including all its children println("Now processing of the request is complete") }
结果以下:
request: I'm done and I don't explicitly join my children that are still active Coroutine 0 is done Coroutine 1 is done Coroutine 2 is done Now processing of the request is complete
当你只关心来自同一个协程的日志记录,自动分配的ID是很好的。可是,当协程与特定请求的处理或执行一些特定的后台任务相关联时,为了调试目的,最好明确地命名它。 CoroutineName上下文元素提供与线程名称相同的功能。在打开调试模式时,它将显示在执行此协程的线程名称中。
下面的例子演示了这个概念:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun main(args: Array<String>) = runBlocking(CoroutineName("main")) { log("Started main coroutine") // run two background value computations val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") 252 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") 6 } log("The answer for v1 / v2 = ${v1.await() / v2.await()}") }
它使用-Dkotlinx.coroutines.debug JVM选项生成的输出相似于:
[main @main#1] Started main coroutine [ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1 [ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2 [main @main#1] The answer for v1 / v2 = 42
让咱们把关于上下文,子协程和Job的知识放在一块儿。假设咱们的应用程序有一个生命周期对象,可是这个对象不是一个协程。例如,咱们正在编写一个Android应用程序,并在Android活动的上下文中启动各类协同程序,以执行异步操做以获取和更新数据,执行动画等。全部这些协程必须在活动被销毁时被取消,以免内存泄漏。
咱们能够经过建立与咱们活动的生命周期相关的Job实例来管理协同程序的生命周期。Job实例是使用Job()工厂函数建立的,如如下示例所示。为了方便起见,咱们能够编写launch(coroutineContext,parent = job)(貌似在最新的1.2版本上,这个方法有更新,这样用会报错,译者注),而不是使用launch(coroutineContext+job)表达式来明确父Job正在被使用。
如今,一个Job.cancel的调用取消了咱们启动的全部子项。并且,Job.join等待全部这些完成,因此咱们也能够在这个例子中使用cancelAndJoin 。
fun main(args: Array<String>) = runBlocking<Unit> { val job = Job() // create a job object to manage our lifecycle // now launch ten coroutines for a demo, each working for a different time val coroutines = List(10) { i -> // they are all children of our job object launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc println("Coroutine $i is done") } } println("Launched ${coroutines.size} coroutines") delay(500L) // delay for half a second println("Cancelling the job!") job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete }
这个例子的输出是:
Launched 10 coroutines Coroutine 0 is done Coroutine 1 is done Cancelling the job!
正如你所看到的,只有前三个协程已经打印了一条消息,而其余的则被一个job.cancelAndJoin()调用取消了。因此咱们在咱们假设的Android应用程序中须要作的是在建立活动时建立父做业对象,将其用于子协程,并在活动被销毁时将其取消。咱们不能在Android生命周期的状况下加入它们,由于它是同步的,可是当构建后端服务以确保有限的资源使用时,这种加入能力是有用的 。
延迟值提供了在协程之间传递单个值的简便方法。通道提供了一种方式来传递数据流。
通道在概念上与阻塞队列很是类似。一个关键的区别是,用send替换了队列的put,用receive替换了队列的take。
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>() launch { // this might be heavy CPU-consuming computation or async logic, we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!") }
结果以下:
1 4 9 16 25 Done!
不像一个队列,一个通道能够关闭,表示没有更多的元素来了。在接收端,使用常规的for循环来接收来自通道的元素是很方便的。
从概念上讲, close就像发送一个特殊的令牌给该通道。一旦接收到这个关闭标记,迭代就会中止,这样就保证了在关闭以前能收到全部先前发送的元素。
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() // we're done sending } // here we print received values using `for` loop (until the channel is closed) for (y in channel) println(y) println("Done!") }
协程产生一系列元素的模式是经常使用的,这是一般在并发代码中的生产者 - 消费者模式的一部分。你能够把这样一个生产者抽象成一个以通道为参数的函数,可是必须从函数返回结果,这与常识是相反的。
有一个命名为produce的便利的协程生成器,能够很容易地在生产者端作到这一点,而且有一个扩展方法consumeEach,能够取代消费者端的for循环:
fun produceSquares() = produce<Int> { for (x in 1..5) send(x * x) } fun main(args: Array<String>) = runBlocking<Unit> { val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") }
管道是协程的一种生成模式,多是无线的数据流:
fun produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // infinite stream of integers starting from 1 }
另外一个协程或多个协程则消费这个数据流,作一些处理,并产生一些其余的结果。在下面的例子中,计算了数字的平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> { for (x in numbers) send(x * x) }
主程序启动并链接整个管道:
fun main(args: Array<String>) = runBlocking<Unit> { val numbers = produceNumbers() // produces integers from 1 and on val squares = square(numbers) // squares integers for (i in 1..5) println(squares.receive()) // print first five println("Done!") // we are done squares.cancel() // need to cancel these coroutines in a larger app numbers.cancel() }
本例中咱们不须要cancel这些协程,由于前文提到的《协程像是一种守护线程》,可是在一个更大的应用程序中,若是咱们再也不须要它,咱们须要中止咱们的管道。或者,咱们能够将管道协程做为《主协程的子程序》运行,如如下示例所示:
让咱们经过一个使用一系列协程来生成素数的例子,将管道推向极致。咱们从无限的数字序列开始。此次咱们引入一个明确的上下文参数并传递给生成器,以便调用者能够控制咱们的协程运行的位置
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) { var x = start while (true) send(x++) // infinite stream of integers from start }
如下管道阶段过滤输入的数字流,移除可由给定质数整除的全部数字:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) { for (x in numbers) if (x % prime != 0) send(x) }
如今咱们经过从2开始编号的流来创建咱们的管道,从当前通道取一个素数,而且为每一个找到的素数启动新的管道阶段
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
如下示例打印前十个素数,在主线程的上下文中运行整个管道。因为全部的协程都是在其coroutineContext中做为主要runBlocking协程的子进程启动的,因此咱们没必要保留咱们开始的全部协程的明确列表。咱们使用cancelChildren扩展函数来取消全部的子协同程序:
fun main(args: Array<String>) = runBlocking<Unit> { var cur = numbersFrom(coroutineContext, 2) for (i in 1..10) { val prime = cur.receive() println(prime) cur = filter(coroutineContext, cur, prime) } coroutineContext.cancelChildren() // cancel all children to let main finish }
输出以下:
2 3 5 7 11 13 17 19 23 29
请注意,您可使用标准库中的buildIterator协同构建器来构建相同的管道。用 buildIterator
替换 produce
,用yield替换send,用next代替reveive,使用Iterator的ReceiveChannel,摆脱上下文。你也不须要runBlocking。可是,如上所示使用通道的管道的好处是,若是在CommonPool上下文中运行它,它实际上可使用多个CPU内核。
不管如何,这是一个很是不切实际的方式来找到素数。实际上,管道确实涉及到一些其余暂停调用(如异步调用远程服务),而且这些管道没法使用buildSeqeunce / buildIterator进行构建,由于它们不容许任意暂停,与异步的 produce
是不一样的。
多个协程能够从同一个通道接收,在他们之间分配工做。让咱们从一个按期生成整数的生产者协程开始(每秒十个数字):
fun produceNumbers() = produce<Int> { var x = 1 // start from 1 while (true) { send(x++) // produce next delay(100) // wait 0.1s } }
那么咱们能够有几个处理器协同程序。在这个例子中,他们只是打印他们的ID和收到的号码:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { channel.consumeEach { println("Processor #$id received $it") }
}
如今让咱们启动五个处理程序,让他们执行一秒钟。看看结果:
fun main(args: Array<String>) = runBlocking<Unit> { val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // cancel producer coroutine and thus kill them all }
输出将相似于下面,尽管接收每一个特定整数的处理器ID可能不一样:
Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10
请注意,取消生产者协同程序会关闭其通道,从而最终终止对处理器协程正在进行的通道的迭代。
多个协程可能会发送到同一个通道。例如,咱们有一个字符串的通道,以及一个暂停方法,该方法以指定的延迟向该通道重复发送指定的字符串:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }
如今,让咱们看看若是咱们启动一些发送字符串的协程,会发生什么(在这个例子中,咱们将它们做为主协程的子节点在主线程的上下文中启动):
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<String>() launch(coroutineContext) { sendString(channel, "foo", 200L) } launch(coroutineContext) { sendString(channel, "BAR!", 500L) } repeat(6) { // receive first six println(channel.receive()) } coroutineContext.cancelChildren() // cancel all children to let main finish }
输出以下:
foo foo BAR! foo foo BAR!
目前的频道没有缓冲区。当发送者和接收者彼此相遇(又名会合)时,无缓冲的信道传送元素。若是发送先被调用,那么它被挂起直到接收被调用,若是接收被首先调用,它被挂起直到发送被调用。
Channel()工厂函数和产生构建器都使用一个可选的容量参数来指定缓冲区大小。缓冲区容许发送者在挂起以前发送多个元素,相似于具备指定容量的阻塞队列,当缓冲区已满时阻塞队列被阻塞。
看看下面的代码的行为:
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>(4) // create buffered channel val sender = launch(coroutineContext) { // launch sender coroutine repeat(10) { println("Sending $it") // print before sending each element channel.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender.cancel() // cancel sender coroutine }
它使用四个容量的缓冲通道打印“发送”五次:
Sending 0 Sending 1 Sending 2 Sending 3 Sending 4
前四个元素被添加到缓冲区,而且发送者在尝试发送第五个元素时挂起。
对频道的发送和接收操做对于从多个协同程序调用的顺序是公平的。它们以先入先出的顺序被服务,例如,调用接收的第一个协程获取元素。在如下示例中,两个协程“ping”和“pong”正从共享的“表格”通道接收“球”对象:
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> { val table = Channel<Ball>() // a shared table launch(coroutineContext) { player("ping", table) } launch(coroutineContext) { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them }
suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ println("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back } }
“ping”协程首先启动,因此它是第一个接收球。即便“ping”协同程序在将球发回桌面后当即开始接收球,球被“pong”协程接收,由于它已经在等待它了:
ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4)
请注意,因为正在使用的执行者的性质,有时渠道可能会产生看起来不公平的处决。this issue查看细节:
协同程序可使用多线程调度程序(如CommonPool)同时执行。它提出了全部常见的并发问题。主要的问题是同步访问共享可变状态。在协程的领域中,这个问题的一些解决方案与多线程世界中的解决方案相似,可是其余解决方案是独特的:
让咱们启动一千次协程,所有作一样的动做千次(共执行一百万次)。咱们还会测量他们的完成时间,以便进一步比较:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) { val n = 1000 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { val jobs = List(n) { launch(context) { repeat(k) { action() } } } jobs.forEach { it.join() } } println("Completed ${n * k} actions in $time ms")
}
咱们从一个很是简单的动做开始,使用多线程的CommonPool上下文来增长一个共享的可变变量:
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter++ } println("Counter = $counter") }
最后打印什么?打印“counter = 1000000”的可能性很是小,由于千位协同程序从多个线程同时递增计数器而没有任何同步。
注意:若是你有一个旧的系统有2个或更少的cpu,那么你将一直看到1000000,由于CommonPool在这种状况下只在一个线程中运行。要重现问题,您须要进行如下更改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(mtContext) { // use it instead of CommonPool in this sample and below counter++ } println("Counter = $counter") }
有一个常见的误解认为,使用volatile变量能够解决并发问题。让咱们尝试一下:
@Volatile // in Kotlin `volatile` is an annotation var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter++ } println("Counter = $counter") }
这段代码的工做速度较慢,但最终仍是没有获得“counter = 1000000”,由于volatile变量保证可线性化(这是“原子”的技术术语)读写相应的变量,但不提供原子性更大的行动(在咱们的状况下增长)
对于线程和协程都适用的通用解决方案是使用线程安全(又名同步,线性或原子)数据结构,为须要在共享状态上执行的相应操做提供全部必需的同步。在简单计数器的状况下,咱们可使用原子增量和原子操做的原子整数类:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter.incrementAndGet() } println("Counter = ${counter.get()}") }
这是解决这个问题的最快解决方案。它适用于普通计数器,集合,队列和其余标准数据结构以及它们的基本操做。然而,它不容易扩展到复杂的状态或复杂的操做,没有现成的线程安全实现。
线程约束是一种解决共享可变状态问题的方法,其中对特定共享状态的全部访问都局限于单个线程。它一般用于UI应用程序中,其中全部UI状态都局限于单个事件派发/应用程序线程。经过使用一个与协程一块儿使用单线程的上下文:
val counterContext = newSingleThreadContext("CounterContext") var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { // run each coroutine in CommonPool withContext(counterContext) { // but confine each increment to the single-threaded context counter++ } } println("Counter = $counter") }
这段代码的工做速度很是缓慢,由于它会执行细粒度的线程约束。每一个单独的增量使用withcontext块从多线程共用上下文切换到单线程上下文。
实际上,粗粒度的线程约束是以大块(例如,大部分状态更新业务逻辑都局限于单线程。下面的例子就是这样作的,以单线程上下文中运行每一个协程开始:
val counterContext = newSingleThreadContext("CounterContext") var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(counterContext) { // run each coroutine in the single-threaded context counter++ } println("Counter = $counter") }
这如今工做得更快,并产生正确的结果。
互斥问题的解决方案是用一个永远不会同时执行的关键部分来保护共享状态的全部修改。在一个阻塞的世界中,你一般使用synchronized或reentrantlock。协程的替代方法称为互斥体。它具备锁定和解锁功能来界定关键部分。关键的区别是,mutex.lock是一个暂停功能。它不会阻塞线程。
还有一个方便表示mutex.lock()的锁定扩展函数。try{...}finally{mutex.unlock()}模式
val mutex = Mutex() var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { mutex.withLock { counter++
} } println("Counter = $counter") }
这个例子中的锁定是细粒度的。然而,对于一些绝对必须按期修改某些共享状态的状况,这是一个很好的选择,可是没有天然线程将此状态限制为。
一个actors是一个协同程序的组合,这个协程被封装在这个协程中,而且是一个与其余协程通讯的通道。一个简单的actors能够写成一个方法,可是具备复杂状态的actors更适合一个类。
有一个actors协同创做者能够方便地将actors的消息频道合并到其做用域中,以便接收来自发送频道的信息并将其组合到做业对象中,以便对actors的单个引用能够随其句柄一块儿传送。
使用actors的第一步是定义actors将要处理的一类消息。kotlin的密封类很是适合这一目的。咱们用inccounter消息定义countermsg密封类来增长计数器和getcounter消息以得到它的值。后者须要发送回复。这里使用了一个可补充的可用通讯原语,它表示将来将被知道(传递)的单个值,这里用于此目的
// Message types for counterActor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
那么咱们定义一个使用actor协程构建器启动actor的函数:
// This function launches a new counter actor fun counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }
主要代码很简单:
fun main(args: Array<String>) = runBlocking<Unit> { val counter = counterActor() // create the actor massiveRun(CommonPool) { counter.send(IncCounter) } // send a message to get a counter value from an actor val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // shutdown the actor }
(对于正确性)执行者自己执行的是什么上下文可有可无。一个actor是一个协程而且一个协程是按顺序执行的,所以将该状态限制到特定的协程能够解决共享可变状态的问题
actor在负载下比锁定更高效,由于在这种状况下,它老是有工做要作,并且根本不须要切换到不一样的上下文。
请注意,actor协同程序生成器是生成协程生成器的双重对象。一个actor与它接收消息的频道相关联,而一个制做者与它发送元素的频道相关联。
选择表达式能够同时等待多个暂停功能,并选择第一个可用的暂停功能
让咱们有两个字符串生产者:fizz和buzz。fizz每300毫秒产生一个“fizz”字符串:
fun fizz(context: CoroutineContext) = produce<String>(context) { while (true) { // sends "Fizz" every 300 ms delay(300) send("Fizz") } }
buzz产生“buzz!”每500毫秒一个字符串:
fun buzz(context: CoroutineContext) = produce<String>(context) { while (true) { // sends "Buzz!" every 500 ms delay(500) send("Buzz!") } }
使用接收暂停功能,咱们能够从一个频道或另外一个频道接收。但选择表达式容许咱们同时使用它的接受子句接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { select<Unit> { // <Unit> means that this select expression does not produce any result fizz.onReceive { value -> // this is the first select clause println("fizz -> '$value'") } buzz.onReceive { value -> // this is the second select clause println("buzz -> '$value'") } } }
让咱们所有运行七次:
fun main(args: Array<String>) = runBlocking<Unit> { val fizz = fizz(coroutineContext) val buzz = buzz(coroutineContext) repeat(7) { selectFizzBuzz(fizz, buzz) } coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
结果以下:
fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' buzz -> 'Buzz!'
当通道关闭时,select中的onreceive子句失败,而且相应的select引起异常。咱们可使用onreceiveornull子句在通道关闭时执行特定的操做。如下示例还显示select是一个返回其所选子句结果的表达式:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> { a.onReceiveOrNull { value -> if (value == null) "Channel 'a' is closed" else "a -> '$value'" } b.onReceiveOrNull { value -> if (value == null) "Channel 'b' is closed" else
"b -> '$value'" } }
让咱们用四次产生“hello”字符串的通道a和四次产生“world”的通道b使用它:
fun main(args: Array<String>) = runBlocking<Unit> { // we are using the context of the main thread in this example for predictability ... val a = produce<String>(coroutineContext) { repeat(4) { send("Hello $it") } } val b = produce<String>(coroutineContext) { repeat(4) { send("World $it") } } repeat(8) { // print first eight results println(selectAorB(a, b)) } coroutineContext.cancelChildren()
}
这段代码的结果很是有趣,因此咱们会详细分析它的模式:
a -> 'Hello 0' a -> 'Hello 1' b -> 'World 0' a -> 'Hello 2' a -> 'Hello 3' b -> 'World 1' Channel 'a' is closed Channel 'a' is closed
有几个观察结果能够从中得出:
首先,选择偏向于第一个条款。当同时选择几个子句时,其中的第一个被选中。在这里,两个频道都在不断地产生字符串,因此一个频道成为选择的第一个条目,赢了。可是,由于咱们使用的是无缓冲的频道,因此a会不时暂停发送调用,而且也给b发送一个机会
第二个观察结果是,当通道已经关闭时,onreceiveornull当即被选中
选择表达式有onsend子句,能够用于一个很棒的好处与偏见的选择性质相结合。
让咱们编写一个整数生成器的例子,当其主通道的用户不能跟上它时,它将其值发送到辅助通道:
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) { for (num in 1..10) { // produce 10 numbers from 1 to 10 delay(100) // every 100 ms select<Unit> { onSend(num) {} // Send to the primary channel side.onSend(num) {} // or to the side channel
} } }
消费者将会很是缓慢,须要250毫秒来处理每一个号码
fun main(args: Array<String>) = runBlocking<Unit> { val side = Channel<Int>() // allocate side channel launch(coroutineContext) { // this is a very fast consumer for the side channel side.consumeEach { println("Side channel has $it") } } produceNumbers(coroutineContext, side).consumeEach { println("Consuming $it") delay(250) // let us digest the consumed number properly, do not hurry } println("Done consuming") coroutineContext.cancelChildren()
}
让咱们看看发生了什么:
Consuming 1 Side channel has 2 Side channel has 3 Consuming 4 Side channel has 5 Side channel has 6 Consuming 7 Side channel has 8 Side channel has 9 Consuming 10 Done consuming
可使用onawait子句选择延迟值。让咱们从一个异步函数开始,它在随机延迟以后返回一个延迟字符串值:
fun asyncString(time: Int) = async { delay(time.toLong()) "Waited for $time ms" }
让咱们以随机延迟启动其中的十几个
fun asyncStringsList(): List<Deferred<String>> { val random = Random(3) return List(12) { asyncString(random.nextInt(1000)) } }
如今主函数正在等待第一个完成并计算仍然活动的延迟值的数量。请注意,咱们在这里使用了select expression是kotlin dsl的事实,因此咱们可使用任意代码为它提供子句。在这种状况下,咱们遍历延迟值列表,为每一个延迟值提供onawait子句
fun main(args: Array<String>) = runBlocking<Unit> { val list = asyncStringsList() val result = select<String> { list.withIndex().forEach { (index, deferred) -> deferred.onAwait { answer -> "Deferred $index produced answer '$answer'" } } } println(result) val countActive = list.count { it.isActive } println("$countActive coroutines are still active") }
输出以下:
Deferred 4 produced answer 'Waited for 128 ms' 11 coroutines are still active
让咱们编写一个使用延迟字符串值通道的通道生成器函数,等待每一个接收到的延迟值,但直到下一个延迟值结束或通道关闭。这个例子将onreceiveornull和onawait子句放在同一个select中
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select<Deferred<String>?> { // return next deferred value from this select or null input.onReceiveOrNull { update -> update // replaces next value to wait } current.onAwait { value ->
send(value) // send value that current deferred has produced input.receiveOrNull() // and use the next deferred from the input channel } } if (next == null) { println("Channel was closed") break // out of loop } else { current = next } } }
为了测试它,咱们将使用一个简单的异步函数,在指定的时间后解析为指定的字符串
fun asyncString(str: String, time: Long) = async { delay(time) str }
主函数只是启动一个协程来打印switchmapdeferreds的结果并发送一些测试数据给它
fun main(args: Array<String>) = runBlocking<Unit> { val chan = Channel<Deferred<String>>() // the channel for test launch(coroutineContext) { // launch printing coroutine for (s in switchMapDeferreds(chan)) println(s) // print each received string } chan.send(asyncString("BEGIN", 100)) delay(200) // enough time for "BEGIN" to be produced chan.send(asyncString("Slow", 500)) delay(100) // not enough time to produce slow chan.send(asyncString("Replace", 100)) delay(500) // give it time before the last one chan.send(asyncString("END", 500)) delay(1000) // give it time to process chan.close() // close the channel ... delay(500) // and wait some time to let it finish }
结果以下:
BEGIN Replace END Channel was closed