协程虽然是微线程,可是并不会和某一个特定的线程绑定,它能够在A线程中执行,并通过某一个时刻的挂起(suspend),等下次调度到恢复执行的时候,极可能会在B线程中执行。java
与 launch、async、runBlocking 相似 withContext 也属于 Coroutine builders。不过与他们不一样的是,其余几个都是建立一个新的协程,而 withContext 不会建立新的协程。withContext 容许更改协程的执行线程,withContext 在使用时须要传递一个 CoroutineContext 。安全
launch {
val result1 = withContext(CommonPool) {
delay(2000)
1
}
val result2 = withContext(CommonPool) {
delay(1000)
2
}
val result = result1 + result2
println(result)
}
Thread.sleep(5000)
复制代码
执行结果:bash
3
复制代码
withContext 能够有返回值,这一点相似 async。async 建立的协程经过 await() 方法将值返回。而 withContext 能够直接返回。app
launch {
val result1 = async {
delay(2000)
1
}
val result2 = async {
delay(1000)
2
}
val result = result1.await() + result2.await()
println(result)
}
Thread.sleep(5000)
复制代码
执行结果:async
3
复制代码
在上述的例子中,withContext 使用了 CommonPool。CommonPool 继承了 CoroutineDispatcher,表示使用线程池来执行协程的任务。ide
CommonPool 有点相似于 RxJava 的 Schedulers.computation(),主要是用于CPU密集型的计算任务。函数
CommonPool 使用 pool 来执行 block。oop
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}
复制代码
若是 pool 为空,则调用 getOrCreatePoolSync() 方法来建立 pool。post
@Synchronized
private fun getOrCreatePoolSync(): Executor =
pool ?: createPool().also { pool = it }
复制代码
此时,createPool() 方法是正在建立 pool 的方法。ui
首先,安全管理器不为空的话,使用 createPlainPool() 来建立 pool。 不然,尝试建立一个 ForkJoinPool,不行的话仍是使用 createPlainPool() 来建立 pool。
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
if (!usePrivatePool) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.let { return it }
}
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
return createPlainPool()
}
复制代码
createPlainPool() 会使用 Executors.newFixedThreadPool() 来建立线程池。
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
复制代码
CommonPool 的建立原理大体了解以后,经过源码发现 CoroutineContext 默认的 CoroutineDispatcher 就是 CommonPool。
/** * This is the default [CoroutineDispatcher] that is used by all standard builders like * [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. * * It is currently equal to [CommonPool], but the value is subject to change in the future. */
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool
复制代码
常见的 CoroutineDispatcher 还能够经过 ThreadPoolDispatcher 的 newSingleThreadContext()、newFixedThreadPoolContext()来建立,以及Executor 的扩展函数 asCoroutineDispatcher() 来建立。
在 Android 中,还可使用UI。它顾名思义,在 Android 主线程上调度执行。
Job、Deferred 对象均可以取消任务。
使用 cancel() 方法:
val job = launch {
delay(1000)
println("Hello World!")
}
job.cancel()
println(job.isCancelled)
Thread.sleep(2000)
复制代码
执行结果:
true
复制代码
true表示job已经被取消了,并无打印"Hello World!"
使用 cancelAndJoin() 方法:
runBlocking<Unit> {
val job = launch {
repeat(100) { i ->
println("count time: $i")
delay(500)
}
}
delay(2100)
job.cancelAndJoin()
}
复制代码
执行结果:
count time: 0
count time: 1
count time: 2
count time: 3
count time: 4
复制代码
cancelAndJoin() 等价于使用了 cancel() 和 join()。
join() 方法用于等待已启动协程的完成,而且它不会传播其异常。 可是,崩溃的子协程也会取消其父协程,并带有相应的异常。
若是一个协程一直在执行计算,没有去检查取消标记,它就没法取消。即便调用了cancel() 或者 cancelAndJoin()。
runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var tempTime = startTime
var i = 0
while (i < 100) {
if (System.currentTimeMillis() >= tempTime) {
println("count time: ${i++}")
tempTime += 500L
}
}
}
delay(2100)
job.cancelAndJoin()
}
复制代码
上述代码仍然会打印100次。
若是使用isActive
检查取消标记,则Job 或 Deferred 的任务能够被取消:
runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var tempTime = startTime
var i = 0
while (isActive) {
if (System.currentTimeMillis() >= tempTime) {
println("count time: ${i++}")
tempTime += 500L
}
}
}
delay(2100)
job.cancelAndJoin()
}
复制代码
执行结果:
count time: 0
count time: 1
count time: 2
count time: 3
count time: 4
复制代码
isActive 是 CoroutineScope 的属性:
package kotlinx.coroutines.experimental
import kotlin.coroutines.experimental.*
import kotlin.internal.*
/** * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient * and fast access to its own cancellation status via [isActive]. */
public interface CoroutineScope {
/** * Returns `true` when this coroutine is still active (has not completed and was not cancelled yet). * * Check this property in long-running computation loops to support cancellation: * ``` * while (isActive) { * // do some computation * } * ``` * * This property is a shortcut for `coroutineContext.isActive` in the scope when * [CoroutineScope] is available. * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext], * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive]. */
public val isActive: Boolean
/** * Returns the context of this coroutine. * * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext]. */
@Deprecated("Replace with top-level coroutineContext", replaceWith = ReplaceWith("coroutineContext", imports = ["kotlin.coroutines.experimental.coroutineContext"]))
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
public val coroutineContext: CoroutineContext
}
复制代码
本文介绍了三个部分:withContext的使用,CommonPool的建立以及如何取消协程。其中,还捎带介绍了 async 和 await 的使用。
该系列的相关文章:
Java与Android技术栈:每周更新推送原创技术文章,欢迎扫描下方的公众号二维码并关注,期待与您的共同成长和进步。