【译】kotlin 协程官方文档(6)-通道(Channels)

最近一直在了解关于kotlin协程的知识,那最好的学习资料天然是官方提供的学习文档了,看了看后我就萌生了翻译官方文档的想法。先后花了要接近一个月时间,一共九篇文章,在这里也分享出来,但愿对读者有所帮助。我的知识所限,有些翻译得不是太顺畅,也但愿读者能提出意见git

协程官方文档:coroutines-guidegithub

协程官方文档中文翻译:coroutines-cn-guide编程

协程官方文档中文译者:leavesC安全

[TOC]并发

Deferred 值提供了在协程之间传递单个值的方便方法,而通道(Channels)提供了一种传输值流的方法异步

1、通道基础(Channel basics)

通道在概念上很是相似于 BlockingQueue,它们之间的一个关键区别是:通道有一个挂起的 send 函数和一个挂起的 receive 函数,而不是一个阻塞的 put 操做和一个阻塞的 take 操做async

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
复制代码

输出结果是:ide

1
4
9
16
25
Done!
复制代码

2、关闭和迭代通道(Closing and iteration over channels)

与队列不一样,通道能够关闭,以此来代表元素已发送完成。在接收方,使用常规的 for 循环从通道接收元素是比较方便的函数

从概念上讲,close 相似于向通道发送一个特殊的 cloase 标记。一旦接收到这个 close 标记,迭代就会中止,所以能够保证接收到 close 以前发送的全部元素:oop

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
复制代码

3、构建通道生产者(Building channel producers)

协程生成元素序列(sequence )的模式很是常见。这是能够常常在并发编程中发现的生产者-消费者模式的一部分。你能够将这样一个生产者抽象为一个以 channel 为参数的函数,但这与必须从函数返回结果的常识相反

有一个方便的名为 product 的协程构造器,它使得在 producer 端执行该操做变得很容易;还有一个扩展函数 consumerEach,它替换了consumer 端的 for 循环:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
复制代码

4、管道(Pipelines)

管道是一种模式,是一个协程正在生成的多是无穷多个元素的值流

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
复制代码

存在一个或多个协程对值流进行取值,进行一些处理并产生一些其它结果。在下面的示例中,每一个返回值也是入参值(数字)的平方值

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
复制代码

启动并链接整个管道:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
复制代码

建立协程的全部函数都被定义为 CoroutineScope 的扩展,所以咱们能够依赖结构化并发来确保应用程序中没有延迟的全局协程

5、使用管道的素数(Prime numbers with pipeline)

让咱们以一个使用协程管道生成素数的例子,将管道发挥到极致。咱们从一个无限的数字序列开始

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
复制代码

如下管道过滤传入的数字流,删除全部可被给定素数整除的数字:

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
复制代码

如今,咱们经过从2开始一个数字流,从当前通道获取一个质数,并为找到的每一个质数启动新的管道:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 
复制代码

下面的示例代码打印了前十个质数,在主线程的上下文中运行整个管道。由于全部的协程都是在主 runBlocking 协程的范围内启动的,因此咱们没必要保留全部已启动的协程的显式引用。咱们使用扩展函数 cancelChildren 来取消打印前十个质数后的全部子协程

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd 
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
复制代码

运行结果:

2
3
5
7
11
13
17
19
23
29
复制代码

注意,你可使用标准库中的 iterator 协程构造器来构建相同的管道。将 product 替换为 iterator,send 替换为 yield,receive 替换为 next,ReceiveChannel 替换为 iterator,并去掉协程做用域。你也不须要再使用 runBlocking 。可是,使用如上所示的通道的管道的好处是,若是在 Dispatchers.Default 上下文中运行它,它实际上能够利用多个 CPU 来执行代码

但不管如何,如上所述的替代方案也是一个很是不切实际的来寻找素数的方法。实际上,管道确实涉及一些其余挂起调用(如对远程服务的异步调用),而且这些管道不能使用 sequence/iterator 来构建,由于它们不容许任意挂起,而 product 是彻底异步的

6、扇出(Fan-out)

多个协程能够从同一个通道接收数据,在它们之间分配任务。让咱们从一个周期性地生成整数(每秒10个数)的 producer 协程开始:

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
复制代码

而后咱们能够有多个处理器(processor)协程。在本例中,他们只需打印他们的 id 和接收的数字:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}
复制代码

如今让咱们启动5个处理器,让它们工做几乎一秒钟。看看会发生什么:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}
复制代码

尽管接收每一个特定整数的处理器 id 可能不一样,但运行结果将相似于如下输出:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
复制代码

请注意,取消 producer 协程会关闭其通道,从而最终终止 processor 协程正在执行的通道上的迭代

另外,请注意咱们如何使用 for 循环在通道上显式迭代以在 launchProcessor 代码中执行 fan-out。与 consumeEach 不一样,这个 for 循环模式在多个协程中使用是彻底安全的。若是其中一个 processor 协程失败,则其余处理器仍将处理通道,而经过 consumeEach 写入的处理器老是在正常或异常完成时消费(取消)底层通道

7、扇入(Fan-in)

多个协程能够发送到同一个通道。例如,有一个字符串通道和一个挂起函数,函数以指定的延迟将指定的字符串重复发送到此通道:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
复制代码

如今,让咱们看看若是启动两个协程来发送字符串会发生什么状况(在本例中,咱们将它们做为主协程的子协程,在主线程的上下文中启动):

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
复制代码

运行结果:

foo
foo
BAR!
foo
foo
BAR!
复制代码

8、带缓冲的通道(Buffered channels)

到目前为止显示的通道都没有缓冲区。无缓冲通道在发送方和接收方同时调用发送和接收操做时传输元素。若是先调用 send,则在调用 receive 以前会将其挂起;若是先调用 receive ,则在调用 send 以前会将其挂起

Channel() 工厂函数和 produce 构建器都采用可选的参数 capacity 来指定缓冲区大小。 缓冲用于容许发送者在挂起以前发送多个元素,相似于具备指定容量的 BlockingQueue,它在缓冲区已满时才阻塞

查看如下代码的效果:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd 
}
复制代码

使用了容量为4的缓冲通道,因此将打印五次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
复制代码

前四个元素被添加到缓冲区内,sender 在尝试发送第五个元素时挂起

9、通道是公平的(Channels are fair)

对通道的发送和接收操做,对于从多个协程调用它们的顺序是公平的。它们按先入先出的顺序提供,例如,先调用 receive 的协程先获取到元素。在下面的示例中,两个协程 “ping” 和 “pong” 从共享的 “table” 通道接收 “ball” 对象

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
复制代码

“ping” 协程首先开始运行,因此它是第一个接收到 ball 的。即便 “ping” 协程在将 ball 从新送回给 table 后又当即开始进行 receive,但 ball 仍是会被 “pong” 接收到,由于它已经先在等待接收了:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
复制代码

请注意,有时因为所使用的执行者的性质,通道可能会产生看起来不公平的执行效果。有关详细信息,请参阅此 issue

10、计时器通道(Ticker channels)

计时器通道是一种特殊的会合(rendezvous)通道,自该通道的最后一次消耗以来,每次给定的延迟时间结束后都将返回 Unit 值。尽管它看起来是无用处的,但它是一个有用的构建块,能够建立复杂的基于时间的 produce 管道和进行窗口化操做以及其它时间相关的处理。计时器通道可用于 select 执行 “on tick” 操做

要建立这样的通道,请使用工厂方法 ticker。若是不须要通道发送更多元素了,请对其使用 ReceiveChannel.cancel 取消发送

如今让咱们看看它在实践中是如何工做的:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
复制代码

运行结果:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
复制代码

请注意,ticker 能感知到消费端可能处于暂停状态,而且在默认的状况下,若是发生暂停,将会延迟下一个元素的生成,尝试保持生成元素的固定速率

可选的,ticker 函数的 mode 参数能够指定为 TickerMode.FIXED_DELAY,以保证元素之间的固定延迟

相关文章
相关标签/搜索