【译】kotlin 协程官方文档(8)-共享可变状态和并发性(Shared mutable state and concurrency)

最近一直在了解关于kotlin协程的知识,那最好的学习资料天然是官方提供的学习文档了,看了看后我就萌生了翻译官方文档的想法。先后花了要接近一个月时间,一共九篇文章,在这里也分享出来,但愿对读者有所帮助。我的知识所限,有些翻译得不是太顺畅,也但愿读者能提出意见java

协程官方文档:coroutines-guidegit

协程官方文档中文翻译:coroutines-cn-guidegithub

协程官方文档中文译者:leavesC安全

[TOC]bash

可使用多线程调度器(如 Dispatchers.Default)并发执行协程,它呈现了全部常见的并发问题。主要问题是对共享可变状态的同步访问。在协程做用域中解决这个问题的一些方法相似于多线程世界中的方法,但有一些其它方法是独有的数据结构

1、问题(The problem)

让咱们启动一百个协程,都作一样的操做一千次。咱们还将计算它们的完成时间,以便进一步比较:多线程

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}
复制代码

咱们从一个很是简单的操做开始,该操做使用多线程调度器 Dispatchers.Default,并增长一个共享的可变变量并发

import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
复制代码

最后会打印出什么呢?不太可能打印出 “Counter=100000”,由于100个协程从多个线程并发地递增 counter 而不进行任何同步。ide

2、Volatiles 是没有做用的(Volatiles are of no help)

有一种常见的误解是:将变量标记为 volatile 能够解决并发问题。让咱们试试:函数

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
复制代码

这段代码运行得比较慢,可是咱们在最后仍然没有获得“Counter=100000”,由于 volatile 变量保证了可线性化(这是“atomic”的一个技术术语)对相应变量的读写,但不提供更大行为的原子性(在咱们的例子中指递增操做)

3、线程安全的数据结构(Thread-safe data structures)

对线程和协程都有效的一个解决方案是使用线程安全的(也称为同步、可线性化或原子)数据结构,该结构为须要在共享状态上执行的相应操做提供全部必要的同步保障。对于一个简单的计数器,咱们可使用 AtomicInteger 类,该类具备保证原子性的 incrementAndGet 方法

import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
var counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
复制代码

这是解决这个特殊问题的最快方法。它适用于普通计数器、集合、队列和其余标准数据结构及其基本操做。可是,它不容易扩展到复杂的状态或没有实现好了的线程安全的复杂操做

4、以细粒度限制线程(Thread confinement fine-grained)

线程限制是解决共享可变状态问题的一种方法,其中对特定共享状态的全部访问都限制在一个线程内。它一般用于 UI 应用程序,其中全部的 UI 状态都限制在“单个事件分派”或“应用程序线程”中。经过使用单线程上下文,能够很容易地使用协程来实现上述的计数器

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd      
复制代码

这段代码运行得很是缓慢,由于它执行细粒度的线程限制。每一个单独的增值操做都使用 withContext(counterContext) 从多线程 Dispatchers.Default 上下文切换到单线程上下文

5、以粗粒度限制线程(Thread confinement coarse-grained)

在实践中,线程限制是在比较大的范围内执行的,例如,更新状态的逻辑的范围被限制在单个线程中。下面的示例就是这样作的,首先在单线程上下文中运行每一个协程

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
复制代码

如今这段代码的运行速度会快得多,并产生了正确的结果

6、互斥(Mutual exclusion)

互斥问题的解决方案是保护共享状态的全部修改操做,其中的关键代码永远不会同时执行。在一个阻塞的世界中,一般会使用 synchronizedReentrantLock。协程的替换方案称为互斥(Mutex)。它具备 lockunlock 函数以划定一个关键位置。关键的区别在于 Mutex.lock() 是一个挂起函数。它不会阻塞线程

还有一个扩展函数 withLock 能够方便地来实现 mutex.lock(); try {...} finally { mutex.unlock() }

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
复制代码

本例中的锁是细粒度的,所以它也付出了某些代价(消耗)。可是,在某些状况下这是一个很好的选择,好比你必须按期修改某些共享状态,但不具有修改共享状态所需的原生线程

7、Actors

actor 是一个实体,由一个协程、被限制并封装到这个协程中的状态以及一个与其它协程通讯的通道组成。简单的 actor 能够写成函数,但具备复杂状态的 actor 更适合类

有一个 actor 协程构造器,它能够方便地将 actor 的 mailbox channel 合并到其接收的消息的做用域中,并将 send channel 合并到生成的 job 对象中,以即可以将对 actor 的单个引用做为其句柄引有

使用 actor 的第一步是定义一类 actor 将要处理的消息。kotlin 的密封类很是适合这个目的。在 CounterMsg 密封类中,咱们用 IncCounter 消息来定义递增计数器,用 GetCounter 消息来获取其值,后者须要返回值。为此,这里使用 CompletableDeferred communication primitive,它表示未来已知(通讯)的单个值

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
复制代码

而后,咱们定义一个函数,该函数使用 actor 协程构造器来启动 actor:

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
复制代码

代码很简单:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

//sampleStart
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd 
复制代码

在什么上下文中执行 actor 自己并不重要(为了正确)。actor 是一个协程,而且协程是按顺序执行的,所以将状态限制到特定的协程能够解决共享可变状态的问题。实际上,actors 能够修改本身的私有状态,但只能经过消息相互影响(避免须要任何锁)

actor 比使用锁更为有效,由于在这种状况下,它老是有工做要作,根本不须要切换到不一样的上下文

注意,actor 协程构造器是一个双重的 product 协程构造器 。actor 与它接收消息的通道相关联,而 producer 与向其发送元素的通道相关联

相关文章
相关标签/搜索