你们若是已经使用Kotlin语言进行开发,对协程这个概念应该不会很陌生。虽然在网上有不少Kotlin协程相关的文章,但当我开始准备使用的时候,仍是有以下几个疑虑。html
接下来就带着这几个问题一块儿来了解一下Kotlin的协程。java
关于协程,我在网上看到最多的说法是协程是轻量级的线程。那么协程首先应该解决的问题就是程序中咱们经常遇到的 “异步” 的问题。咱们看看官网介绍的几个使用例子。android
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3'
复制代码
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(1000L)
println("World!")
}
println("Hello,") // 主线程中的代码会当即执行
runBlocking { // 可是这个表达式阻塞了主线程
delay(2000L) // ……咱们延迟 2 秒来保证 JVM 的存活
}
}
复制代码
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假设咱们在这里作了一些有用的事
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假设咱们在这里也作了一些有用的事
return 29
}
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
复制代码
结果:git
The answer is 42
Completed in 2015 ms
复制代码
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
复制代码
结果:github
The answer is 42
Completed in 1017 ms
复制代码
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 这里咱们可使用任何喜欢的断言风格来使用挂起函数
}
}
复制代码
更新详细的使用可参考官网示例bash
既然已经有这么多异步处理的框架,那咱们为什么还要使用协程。这里举个例子,看看对同个需求,不一样异步框架的处理方式。微信
如今有一个产品需求,生成一个二维码在页面展现给用户。咱们来对比看看不一样的作法。并发
Thread(Runnable {
try {
val qrCode: Bitmap =
CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
runOnUiThread {
img_qr_code.setImageBitmap(qrCode)
}
} catch (e: WriterException) {
e.printStackTrace()
}
}).start()
}
复制代码
Executors.newSingleThreadExecutor().execute {
try {
val qrCode: Bitmap =
CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
runOnUiThread {
img_qr_code.setImageBitmap(qrCode)
}
} catch (e: WriterException) {
e.printStackTrace()
}
}
复制代码
Observable.just(SHARE_QR_CODE)
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
return CodeCreator.createQRCode(ShareActivity.this, s);
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
img_qr_code.setImageBitmap(bitmap);
}
});
复制代码
val job = GlobalScope.launch(Dispatchers.IO) {
val bitmap = CodeCreator.createQRCode(ShareActivity.this, SHARE_QR_CODE)
launch(Dispatchers.Main) {
img_qr_code.setImageBitmap(bitmap)
}
}
}
复制代码
经过这个例子,能够看出使用协程的很是方便解决 "异步回调" 问题。 相比传统的Thread及Excutors,RxJava将嵌套回调转换成链式调用的形式,提升了代码可读性。协程直接将链式调用转换成了协程内的顺序调用,"代码更加精简"。app
官网上对于协程的有一句介绍。框架
本质上,协程是轻量级的线程
那么协程的执行效率到底怎么样呢?下面咱们采用官网的示例在相同的环境和设备下作下对比。
启动了 1000个协程,而且为每一个协程都输出一个点
var startTime = System.currentTimeMillis()
repeat(times) { i -> // 启动大量的协程
GlobalScope.launch(Dispatchers.IO) {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
复制代码
执行结果:endTime=239 ms
var startTime = System.currentTimeMillis()
repeat(times) { i ->// 启动大量的线程
Thread(Runnable {
Log.d(this@MainActivity.toString(), "$i=.")
}).start()
}
var endTime = System.currentTimeMillis() - startTime;
复制代码
执行结果:endTime=3161 ms
var startTime = System.currentTimeMillis()
var executors = Executors.newCachedThreadPool()
repeat(times) { i -> // 使用线程池
executors.execute {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
复制代码
执行结果:endTime=143 ms
var startTime = System.currentTimeMillis()
repeat(times) { i -> // 启动Rxjava
Observable.just("").subscribeOn(Schedulers.io())
.subscribe {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
复制代码
执行结果:endTime=241 ms
源码工程:CorountineTest
利用AS自带的Profiler对运行时的CPU状态进行检测,咱们能够看到Thread对CPU的消耗比较大,Koroutine、Executor、RxJava的消耗基本差很少。
从执行时间和Profiler上看,Coroutine比使用Thread性能提高了一个量级,但与Excutor和RxJava性能是在一个量级上。
注意这里的例子为了简便,由于异步执行的时间基本和repeat的时间差很少,咱们没有等全部异步执行完再打印时间,这里不追求精确的时间,只为作量级上的对比。
咱们先来看一段简单的Kotlin程序。
GlobalScope.launch(Dispatchers.IO) {
print("hello world")
}
复制代码
咱们接着看下launch的实现代码。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
复制代码
这里注意,咱们经过追踪最后的继承关系发现,DefaultScheduler.IO最后也是一个CoroutineContext。
接着发现继续看coroutine.start的实现,以下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
复制代码
接着继续看CoroutineStart的start策略,以下:
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
复制代码
继续看startCoroutineCancellable方法,以下:
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
复制代码
继续看resumeCancellableWith方法实现:
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
复制代码
最后发现调用的resumeCancellableWith方法实现以下:
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
复制代码
这里关键的触发方法在这个地方
dispatcher.dispatch(context, this)
复制代码
咱们看 DefaultScheduler.IO最后的dispatch方法:
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
复制代码
这里咱们最终发现是调用了CoroutineScheduler的dispatch方法,继续看CoroutineScheduler的实现发现,CoroutineScheduler继承了Executor。
经过dispatch的调用最后能够发现CoroutineScheduler其实就是对Worker的调度,咱们看看Worker的定义。
internal inner class Worker private constructor() : Thread()
复制代码
经过这里咱们发现另一个老朋友Thread,因此到这里也符合上面性能验证的测试结果。
到这里咱们也有结论了,协程异步实现机制本质也就是自定义的线程池。
suspend有什么做用,如何作到异步不用回调?下面先定义一个最简单的suspend方法。
suspend fun hello(){
delay(100)
print("hello world")
}
复制代码
经过Kotlin Bytecode转换为java 代码以下:
@Nullable
public final Object hello(@NotNull Continuation $completion) {
Object $continuation;
label20: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Test.this.hello(this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = this;
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(100L, (Continuation)$continuation) == var6) {
return var6;
}
break;
case 1:
Test var7 = (Test)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String var2 = "hello world";
boolean var3 = false;
System.out.print(var2);
return Unit.INSTANCE;
}
复制代码
这里首先咱们发现方法的参数多了一个Continuation completion而且内部回定义一个 Object continuation,看看Continuation的定义。
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
复制代码
这是一个回调接口,里面有一个关键的方法为resumeWith。 这个方法的具体调用经过上面的协程调用流程能够知道 ,在DispatchedContinuation的resumeCancellableWith会触发。
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
复制代码
那么resumeWith里面作了那些事情?咱们看下具体的实如今ContinuationImpl的父类BaseContinuationImpl中。
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
复制代码
首先咱们发现这里实际上是一个递归的循环,而且会调用invokeSuspend方法触发实际的调用,等待返回结果。经过上面的分析能够看出2点。
欢迎关注个人我的公众号
微信搜索:一码一浮生,或者搜索公众号ID:life2code