进程、线程、协程是啥?网上有不少生动形象的例子,我我的认为,搞这么多花里胡哨的,目的就一个:最大化利用资源。java
注:进程、线程不是我想写的重点,协程才是,但不提一下前面两个,感受就像吃🍔🍟没有🥤同样奇怪。android
本文废话较多,不喜勿喷,耐心看下去,说不定会有意想不到的收获git
对于进程而言,合理利用的资源在本文中是指cpu的时间资源。程序员
完成一件事情须要不少步骤,执行步骤有不少策略,假设你的策略是one by one:必须一个步骤作完再进行下一步。github
若是cpu也像你同样的策略,那在对寄存器,内存,硬盘进行IO操做时,cpu就躺着休息了!由于cpu太快,很是快,而寄存器比较慢,内存更慢,硬盘更更慢,因此cpu只能等数据传输完,时间就浪费在了这里。编程
为了合理利用cpu等待的这段时间。可让cpu先提早作其余步骤。(打工人写到这里,眼眶都湿润了)api
那就把各个步骤,放到不一样的进程去执行,cpu在执行一个步骤,遇到了耗时等待的io时,就去执行另外一个步骤,也就是执行另外一个进程,但这时又发现一个问题,cpu确实把等待的时间利用起来了,可是有一部分时间并无执行进程里面的代码,而是在作进程环境的切换。markdown
举个不恰当的例子:多线程
你去找张三,让张三帮你写个网页,但张三说你提的这个功能技术上须要调研一下 (IO耗时操做),让你等他20分钟。你想,我先去找李四吧,先让李四把后台的协议先定下来,从张三到李四家花了5分钟,和李四的讨论花了10分钟。而后你回去找张三,到张三家里时,张三正好调研完了。闭包
20分钟没有在张三家里白等,可是你花费了10分钟在张三到李四家往返,10分钟你均可以让王五再改几版稿子,赵六再提几个场景了,老子分分钟几十万上下,这张三和李四就不能住一块儿吗?在这里,张三和李四的房子都是属于进程,进程间的切换稍微耗费点时间。
你以为时间都浪费在没有价值的地方,因此你就租了栋楼,让张三李四住一块儿,一人一层,这样找他们的时候就简单了!对了,把王五赵六也叫上吧。在这里,咱们把这栋楼看作进程,张三和李四看作线程。这个时候你找张三找李四都快多了。(打工人写到这里,眼泪已经流下来了)
随着业务越作越大,愈来愈多的应酬,你以为手下的人真烦,每一个人抢着都找你叽叽喳喳(一些操做系统的线程执行是抢占式的),管理时间成为了你的首要问题。因而你请了个秘书,会议的事情让秘书(操做系统)安排一下,每一个人最多只有两分钟的时间,固然,级别高的能够直接找你,超时也容许(一些操做系统是优先级高的能够是抢占式的,而普通线程是非强占式的)。
在秘书的安排下(调度),张三李四们就按秘书的安排来你的办公室,和你开会,一次的时间也就那么长,谈不完就下次再谈。但后来你发现,下一个到张三,但张三从工位到你办公室也要一点点时间。你怎么能忍受这些时间的浪费?这个例子可能举的很差,实际上是资源竞争和线程上下文切换的问题,多个线程在执行时,必然会对同一个资源进行竞争。这就像多个员工去竞争你这个老板同样,因此须要对共享资源进行加锁,而加锁就会涉及到线程状态的切换,这里是会浪费时间的。同时线程切换也浪费了一丢丢时间。
因而你引进了一套在线会议系统,你只须要按照秘书的安排,和对应的人开会就好,这样就减小了他们过来你办公室的时间。秘书通知某个员工到他了,再让其余全部员工等待。这也浪费了一点时间。
那若是容许员工之间提早沟通时间,由于他们知道本身何时有时间,他们能够主动让出这个开会的时间给其余人,并提早定好要开会的时间,一到时间直接开会,省去了秘书的通知,时间又节省了一点。这就是协程的主动让出cpu全部权的行为。固然,因为在线会议的引入,虽然你和一万个员工在开会,但对你而言,你所面对都是同一个屏幕。这里类比的可能也不恰当,但要知道的是,有些时候无论有多少个协程,对cpu而言可能就只有一个线程。因此在这里,张三和李四已经不是线程的概念了,而是协程的概念。
故事差很少就这样,在这里咱们关注到对于进程和线程的资源浪费,本质上是时间的浪费,而时间大部分就是花费在了空间上,同时合理的调度也是能省出一部分时间。
因此进程,线程等概念,我感受能够当作是时间的概念,举个例子:爽子一天挣208万,那你就能够用一爽比做208万,由于从某个层面而言,一爽和208万是等价的。固然你会触类旁通:一东。
从上文中,在这种等待多的状况下,你们自主的让出cpu时间片会更高效。你可能会有一丢丢疑问,那几个员工都想要同一个时间呢?让出还有意义吗?有的,由于不互相协调时间的话,那就最差的状况就是:你们都被安排在本身忙的时间(io),但若是协调好时间的话,就能够减小这种状况。这种就是io密集型的场景,采用协做的方式能大大减小时间的浪费。
你们主动让出的这种行为就是协做,而不是像一开始那样的都去抢占时间片。这里的让出,其实就是挂起的意思,而挂起就意味着某个时间后仍是要恢复的,这里和线程的状态切换很像,但又不必定是真正的切换线程,下文会详细讲。
从上面的故事中能够知道:从张三到李四的会议安排,是由你的秘书决定的,而主动让出和何时开会,是由员工决定的,这里的差异很大。在第二种方式下,你的秘书能够省去不少找员工沟通的时间。
而若是张三李四的工做就是负责和你开会,不须要写代码,不须要画稿子,不须要画原型,那让张三李四上线下线会议的行为反而会影响张三和李四的工做效率,这种就是计算密集型的状况,串行比并行高效。但其实有意思的是,有些语言的协程的调度机制作的不错,在某些状况下,不会频繁的切换上下文。
某天合做伙伴孙总过来你这参观,发现你管理员工的方法特别好,回去后立刻尝试搞一套。也请了一个秘书,市场上不少这种管理专业的人才,这些人才的培养早有体系,找一个很简单,因此孙总的公司很快就实现了经过秘书安排会议。这里是在说不一样操做系统线程的api设计大体相同。
但他遇到了个问题:他发现他的员工的协同能力比较差(也可能他发现他员工的协同能力很强,而后向你得意洋洋的炫耀)。员工的岗位,所掌握的技能,性格,素质水平等方面的不一样,不通过培训,很难一会儿要求全部人作到很好的协同,培训的方式和效果也不同。因此有的公司可能最终协做的方案是A,有的公司协同方案是B。这里实际上是在表达,协程是语言层面上的东西,每种语言的实现方式都不同,效果也不同。
上文是从生活的角度去写的,例子可能不那么的恰当,但至少咱们知道进程为了合理利用资源,作了努力,可是还有提高的空间。因此线程出现了,线程也作了努力,但仍是有提高空间。这时候协程出现了,也作了努力,至于提高空间,抱歉,我还处于热恋期,眼里都是协程的好。
协程也分有栈协程和无栈协程。常说调用栈调用栈,这里的有栈其实就是由于一些协程间可能有调用关系,有个大佬讲的很好:有栈协程和无栈协程
我是一名Android应用开发者,而kotlin中的协程是无栈协程,因此下文是我对于kotlin无栈协程的理解。下文的角度可能和上文不太同样,生活中的例子落实到代码实现,本就有差距,因此说大部分普通人一会儿难以理解程序开发,是由于没有编程思惟嘛。
要想了解协程,就必须了解闭包的概念,固然咱们能够从最熟悉的回调入手。
在第一次接触回调函数的时候,我相信不少人都会以为这是个特别的东西。它真的很灵活,很神奇!咱们看看:
咱们常在一个函数中调用经过参数传入的对象,进而使用这个对象的函数去作某事:
class LogPrinter {
fun print() {
...
}
}
fun printLog(logPrinter:LogPrinter) {
logPrinter.print();
}
复制代码
但你须要提早用类去描述这个行为。可是咱们知道类所能描述的不只仅是一个行为!
同时在事件驱动型的设计中,会有线程在不断的等待事件和消费事件,就是并发中的“生产者消费者”模型,假设只有一个消费者,在一些程序中可能称之为主线程,那主线程便是生产者(存在本身给本身发事件的状况),也是消费者。固然,大多生产者是其余线程:
而事件的行为和规则并非固定的,若是用类去提早描述一个行为,是否是有点大材小用了?是否是要提早声明一堆类?那有没有一种东西能够只针对行为的东西,真正要用的时候再定义?
有啊,那不就是函数吗?这里就体现出了回调函数的灵活之处。
内部类我见过,在函数里面声明和实现函数是什么鬼?就是闭包嘛!
java支持以参数形式传递函数吗?一直都支持,只是java8比较明显!
传递后的函数使用的this对象是谁?这里讲个故事你就明白了:
唐僧赶走了孙猴子,但大圣仍是惧怕师傅出事,便拔下了三根毛交给了八戒,告知八戒一出事就吹一根毛,俺老孙天然会出现。
同时为了保证能持续消费事件,不能在主线程中有耗时的操做,而耗时操做常见的都是在计算数据,获取数据等,主线程怎么作到不等待又能在合适的时间拿到结果进行处理?
一个回调函数能解决上面的种种问题,神奇吧!为何这么神奇?
是的,逼逼了这么多就是想看看你是否了解闭包,若是不了解,下面可能会看的云里雾里,可是若是你都懂,恭喜你,下面的内容对你而言极其简单!
不了解的话,先去查查,高阶函数、闭包、java的匿名内部类吧!
我这里将从你们都头疼的线程同步,从实际例子去讲闭包的神奇特性在协程中的应用,来聊聊回调魔法的强大之处。
有这样的一个场景:加载网页资源并绘制。因为加载网页是个耗时的io操做,通常状况下,咱们会将该耗时操做给另外一个线程去执行,以下图所示:
主线程须要resource去绘制显示,因此只能等待线程B执行完成,不然主线程将没有内容能够paint。虽然主线程没有执行耗时操做,可是主线程会阻塞并等待线程B唤醒本身,这是很简单的线程同步操做。
艺术源于生活,咱们的平常生活中也会有这种状况,咱们的解决办法就是:搞定了通知我。因此主线程应该在线程B通知本身加载资源完成后,才去paint。
但咱们一般在这个期间会去作其余的事情而不是等待,因此咱们但愿主线程去继续消费其余事件,而不是在阻塞。
这个时候咱们可使用回调实现非阻塞的通知。
咱们传入的函数回调,并无实现线程切换的功能,你暂且认为这里是线程B内部在invoke这个回调时作了线程切换,也就是将该回调包装后放到队列中,等待主线程消费,后面会详细讲线程切换。loadResource方法相似这个样子:
fun loadResource(callBack:(Resource) ->Unit) {
thread {
val result = load() //耗时
//包装一下,发送给主线程,让主线程执行callBack.invoke()
sendToMainThread(){
callBack.invoke(result)
}
}
}
复制代码
这段代码作到了:“作完通知我”,而且主线程并无被阻塞!也没有由于执行这段代码发生线程状态的变化!这里就能省下线程状态切换带来的开销。
仔细琢磨执行顺序就能发现特别之处:
在没有回调的代码中,主线程的代码执行顺序是:
有回调的代码中:主线程的执行顺序是
注意!在上面这种状况中的showHtmlPage函数内部总体顺序是不变的,顺序必然是:
但回调这个代码块相对于队列中的其余事件而言(事件其实也是一个个代码块),执行顺序发生了变化。因此你要是以为协程就是一个个待执行的代码块,对的!你摸到门道了!将要执行的代码使用回调 ”包装起来“,是无栈协程实现执行状态流转的重要前提之一!而回调能在合适的时机执行!这是线程使得被包装的代码块之间执行顺序发送变化的最好体现!
咱们再看这样的一个例子:
须要从不一样的地址获取资源,也是先用最简单的同步方法去实现:(图中的UIThread就是上文说的主线程,我画图一时懵逼,写错了)
问题也很明显,主线程也被阻塞两次,两次线程同步形成了线程的状态的切换,假设loadResourceA花10秒,loadResourceB花10秒,那从加载完到绘制,总共的时间,模糊估计花了20.10S,我又搬出了回调大法:
注意:(这里的线程状态切换耗费的时间为0.1s,只是为了好理解瞎编的,若是你很想知道线程切换的时间,能够谷歌一下)
fun showHtmlPage() {
val addrA = "A"
val addrB = "B"
loadResource(addrA) { resourceA ->
loadResource(addrB) { resourceB ->
paint(resourceFromA, resource)
}
}
}
复制代码
解决了阻塞问题,主线程有时间去作其余事情,就算假设cpu等调度都不花时间,但代码块从加载完到绘制总共的时间也花了20S,缩短的都是在线程切换过程当中的时间,那也没缩短多少秒。由于加载资源其实咱们仍是串行的。
这时候咱们继续利用并发和回调去合理利用资源:
注意:(这里的线程状态切换耗费的时间为0.1s,只是为了好理解瞎编的,若是你很想知道线程切换的时间,能够谷歌一下)
fun showHtmlPage() {
val addrA = "A"
val addrB = "B"
var resourceFromA:Resource
var resourceFromB:Resource
loadResource(addrA) {resource ->
if (resourceFromB!=null) {
paint(resource,resourceFromB)
}else {
resourceFromA = resource
}
}
loadResource(addrB) {resource ->
if (resourceFromA!=null) {
paint(resourceFromA,resource)
}else {
resourceFromB = resource
}
}
}
复制代码
我把两个加载数据变成了并行,这时加载完到绘制总共的时间,模糊估计也花了10S,这里缩短的就是串行的时机,由于两个资源的加载是同时在进行的。
这时候你会在想:你两个回调是否会有资源竞争问题?没有!前面已经说了loadResource方法内部作了线程切换操做!!!!
但为了更好的的理解为何要作线程切换,咱们先假设存在资源竞争:也就是回调都是由不一样的线程去invoke的,那你只能加锁:(我就简单粗暴的加锁了,是否锁对了,不要太纠结)
fun showHtmlPage() {
val addrA = "A"
val addrB = "B"
var resourceFromA:Resource
var resourceFromB:Resource
loadResource(addrA) {resource ->
synchronized(obj){
if (resourceFromB!=null) {
paint(resource,resourceFromB)
}else {
resourceFromA = resource
}
}
}
loadResource(addrB) {resource ->
synchronized(obj){
if (resourceFromA!=null) {
paint(resourceFromA,resource)
}else {
resourceFromB = resource
}
}
}
}
复制代码
加了锁后,加载完到绘制总共的时间,大胆一花了11.2s,多了这零点几秒就是由于加了锁,synchronized是互斥锁,会阻塞其余的线程。因此咱们为了解决一个问题,引入了一个新问题。
我作线程切换的缘由:这个loadResource方法中内部就作了线程切换,这两个回调必定是在主线程串行的,不存在多线程同时操做resourceFromA、resourceFromB的状况。
这点也颇有意思对吧!这也是利用协程解决资源竞争的一个思想:经过调整回调代码块的执行环境,将更改公共资源的代码块流转到一个线程去执行,进而解决多线程资源竞争的问题。
但在loadResource方法内实现线程切换显然是很差的设计,因此咱们须要设计一个调度器,去指定回调在哪一个线程执行,是否是在必定程度上能够解决多线程并发的资源竞争问题?同时上文的代码中,加载资源会初始化两个线程,那加载100个资源是否是须要100个线程?线程资源的建立和释放又是一个消耗资源的操做,这个又怎么解决?
若是开发者合理的利用调度器:指定操做公共资源的回调在同一个线程执行,这样就不会有资源竞争问题。那可让程序员在写的时候,就指定该代码段(回调)会在哪一个线程执行:
loadResource(UIThread,addrB) { resourceB ->
paint(resourceFromA, resource)
}
复制代码
因此这个调度器若是针对线程环境实现,会更加灵活。那咱们能够给上文的主线程单独设计一个符合下图模型的调度器,:
我相信不少程序员都知道怎么解决频繁建立和释放线程的问题,是的没错,就是线程池。
我在上文有提到:
若是张三李四的工做就是负责和你开会,不须要写代码,不须要画稿子,不须要画原型,那让张三李四上线下线会议的行为反而会影响张三和李四的工做效率
是否能够根据io密集型或运算密集型,选择不一样的调度器?由于只有开发人员知道是哪一种类型。
因此咱们能够:
针对运算型的调度器的线程池能够根据cpu核心数去建立线程数量。
针对io密集型的调度器的线程池能够建立好多个线程。
那这就有三个基本的调度器了!但在上文提到loadResource方法中内部就作了线程切换,假设有不少相似于loadResource的方法,他们是否是也要本身作线程切换?这不符合单一职责对吧?
注意!如下是我为了好理解思想,用java写出来的代码 ,kotlin具体实现协程的代码不彻底是这样的!
(为何是java?由于我用kotlin写这个例子感受怪怪的)
在java中的回调是匿名内部类,因此咱们声明个接口,用来给使用方实例化,做为通用回调,这个回调供用户包装本身的耗时同步代码,因此须要返回值。
public interface FakerCallBack {
public Object invokeSuspend();
}
复制代码
异步拿结果,必然是经过回调,这里要提一下什么叫续体传递风格,其实就是经过回调将结果传递给调用方的意思,举个简单例子:
通常写法:
public String getWelcomeSpeech() {
return "welcome";
}
public void sayWelcome() {
System.out.println(getWelcomeSpeech());
}
复制代码
续体传递风格:
interface FakerCompletionResult {
public void resumeWith(Object value);
}
public void getWelcomeSpeech(FakerCompletionResult completion) {
completion.resumeWith("welcome");
}
public void sayWelcome() {
getWelcomeSpeech(value -> {
System.out.println(value)
});
}
复制代码
因此要定一个能够传递结果的回调
public interface FakerContinuation {
public void resumeWith(Object obj);
}
复制代码
咱们要切换协程,线程认哪一个回调?是的,Runnable,他也是个接口,这里为了区分开,我定义一个Job(不算屡次一举哈,由于这里就是要和线程的区分开,写一个好理解):
public interface FakerJob {
public void start();
}
复制代码
咱们上面定义的回调接口不具有指定线程这些功能,因此咱们须要定义一个外层回调去包装用户传入的代码块,赋予数线程切换的能力,就像AOP。
public interface FakerInterceptor {
public void dispatch(FakerJob callBack);
}
复制代码
调度器就是拦截器的实现,为何?由于要先拦截了,而后才能在指定线程中执行回调嘛!
我写了两个调度器,
固定20个线程的线程池实现的IO密集型调度器。
使用Handler实现切换到主线程执行的调度器。
你要不是个Android开发者,可能不知道什么是Handler。那你能够写个单线程的线程池的调度器,而后实现本身的消息队列,至于如何实现延迟事件执行,达到模拟线程的sleep功能。只须要记住一个思想:延迟不是阻塞,只是挂起这个代码块,让该线程去消费其余代码块,到时间再回来执行。固然若是线程原本就空闲,那消息队列的消费者也是会有等待的状况,实在不会写能够参考android的handler实现。
public class IODispatchers implements FakerInterceptor {
private ExecutorService executor = Executors.newFixedThreadPool(20);
@Override
public void dispatch(FakerJob callBack) {
executor.execute(callBack::start);
}
}
public class MainDispatchers implements FakerInterceptor {
private Handler executor = new Handler(Looper.getMainLooper());
@Override
public void dispatch(FakerJob callBack) {
executor.post(callBack::start);
}
}
复制代码
写个调度器的声明类:
public class FakerDispatchers {
public static FakerInterceptor IO = new IODispatchers();
public static FakerInterceptor Main = new MainDispatchers();
}
复制代码
咱们还须要定义一个回调构造器,统一对外api,让用户方便使用这个强大的回调,有些不须要结果也不执行耗时操做的,callBack能够直接传入null:
public class FakerCompletionBuilder {
public static void launch(FakerInterceptor dispatchers,FakerCallBack callBack, FakerContinuation fakerContinuation) {
dispatchers.dispatch(new FakerJob() {
@Override
public void start() {
if (callBack!=null) {
fakerContinuation.resumeWith(
callBack.invokeSuspend()
);
}else {
fakerContinuation.resumeWith(null);
}
}
});
}
}
复制代码
是骡子是马,拉出来溜溜!我直接在android真机跑,Activity的代码都是kotlin了懒得改代码了,差很少的,你应该看得懂:
(2021.6.21 修改:)
fun launchTest() {
FakerCompletionBuilder.launch(FakerDispatchers.IO, {
Log.d(">>> MainActivity", "launchTest ${Thread.currentThread().name}")
val r = loadResult("addrA")
}) {
FakerCompletionBuilder.launch(
FakerDispatchers.Main, null
) {
Log.d(">>> MainActivity", "launchTest r = $r ${Thread.currentThread().name}")
}
}
}
private fun loadResult(addr: String): String? {
try {
if (addr == addrA) {
Thread.sleep(1000)
} else {
Thread.sleep(3000)
}
} catch (e: InterruptedException) {
e.printStackTrace()
}
return "load end $addr"
}
复制代码
注意,这里的 Thread.sleep是为了模拟耗时,因此此次不算是线程状态变化哈。运行结果:
>>> MainActivity: launchTest pool-1-thread-1
>>> MainActivity: launchTest result = load end addrA main
复制代码
很顺利就切换了线程!
因为我这是android程序,若是在主线程阻塞等待,结果会有偏差,因此我再加一个单线程的调度器:
public class SingleDispatchers implements FakerInterceptor {
private ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void dispatch(FakerJob callBack) {
executor.execute(callBack::start);
}
}
public class FakerDispatchers {
public static FakerInterceptor IO = new IODispatchers();
public static FakerInterceptor Main = new MainDispatchers();
public static FakerInterceptor Single = new SingleDispatchers();
}
复制代码
使用:
var i = 0
var j = 0
fun testResourceCompetition() {
(1..10000).forEach {
load()
}
Thread.sleep(10000)
println("i $i")
println("j $j")
}
fun load() {
FakerCompletionBuilder.launch(FakerDispatchers.IO, null) {
j++
FakerCompletionBuilder.launch(FakerDispatchers.Single,null) {
i++
}
}
}
复制代码
结果:
I/System.out: i 10000
I/System.out: j 9869
复制代码
j的值每次都是不同,而i的值一直是10000。不须要加锁就解决了资源竞争,有意思吧!
若是利用上文实现的FakerCompletionBuilder,进行并行加载资源:
fun loadDouble() {
var resultA: String? = null
var resultB: String? = null
FakerCompletionBuilder.launch(FakerDispatchers.IO, { loadResource(addrA) }) {result ->
FakerCompletionBuilder.launch(FakerDispatchers.Main, null) {
resultA = result.toString()
if (resultB != null) {
Log.d(">>> MainActivity", "result = $resultA $resultB")
}
}
}
FakerCompletionBuilder.launch(FakerDispatchers.IO, { loadResource(addrB) }) {result->
FakerCompletionBuilder.launch(FakerDispatchers.Main, null) {
resultB = result.toString()
if (resultA != null) {
Log.d(">>> MainActivity", "result = $resultA $resultB")
}
}
}
}
复制代码
实现一个简单的线程切换的回调不难,在最后一个例子中也给出了同步两个异步线程结果的方式,真正的核心代码执行的顺序是这样:
//这两个咱们但愿并行
loadResource(addrA)
loadResource(addrB)
//但最后必定是:
Log.d(">>> MainActivity", "result = $resultA $resultB")
复制代码
我为了实现这种效果,使用回调将三条语句拆开,并分别包装,这里能够说:我给这几条语句放在了不一样的协程里。
而后经过不一样的线程去执行了loadResource所在的协程,最后使用使用控制流,让结果输出的协程在最后执行。
回顾上文可知:能够经过线程影响这些回调代码块的顺序。但从最后这个例子中,还能够看到:想要从顺序不受控的协程执行后,获得指望顺序的执行结果,还可使用控制流去处理!没有阻塞,没有锁!有意思吧!
从上面的代码看到,条件判断语句会有一些重复代码:
resultB = result.toString()
if (resultA != null) {
Log.d(">>> MainActivity", "result = $resultA $resultB")
}
resultA = result.toString()
if (resultB != null) {
Log.d(">>> MainActivity", "result = $resultA $resultB")
}
复制代码
是否是以为,我应该抽出一个函数?不!千万不要!由于我在回调函数那一小结说了:我但愿的是在某个函数中声明咱们想要的函数,而不是在类中又去声明另外一个函数。简单的说:不想为了改变这个函数的内部行为,污染了这个函数所属的类!因此下面我来解决这重复代码。
当有不少条语句控制条件,咱们会用switch语句,控制条件能够给个状态值。是的,就是状态机:
public void test(state : Int) {
switch (state) {
case 0: {
loadResource(addrA);
break;
}
case 1: {
loadResource(addrB)
break;
}
case 2: {
Log.d(">>> MainActivity", "result = $resultA $resultB")
break;
}
}
}
复制代码
经过传入的state的值,能够控制这几句代码的执行顺序。
但谁调用这个方法?谁存储转态值?
但在不声明新函数?不定义新成员变量的状况下,怎么实现?
public void loadDouble() {
//实例化一个匿名内部类,也就是咱们所说的回调,包装整个loadDouble方法内部的代码
FakerContinuation continuation= new FakerContinuation() {
//初始化状态
int state = 0;
//接收两个结果
String resultA = null;
String resultB = null;
@Override
public void resumeWith(Object obj) {
switch (state) {
case 0: {
//将加载A的代码包装起来,丢给子线程执行,这个launch方法前面没看到,下面会讲,注意这里传了个this
FakerCompletionBuilder.launch(FakerDispatchers.Main, FakerDispatchers.IO, this, new FakerCallBack() {
@Override
public Object invokeSuspend() {
//加载资源A
return loadResource("A");
}
}, new FakerContinuation() {
@Override
public void resumeWith(Object obj) {
//加载资源A完成后,存储值
resultA = obj.toString();
}
});
FakerCompletionBuilder.launch(FakerDispatchers.Main, FakerDispatchers.IO, this, new FakerCallBack() {
@Override
public Object invokeSuspend() {
//加载资源B
return loadResource("B");
}
}, new FakerContinuation() {
@Override
public void resumeWith(Object obj) {
//加载资源B完成后,存储值
resultB = obj.toString();
}
});
//将状态切换为1
state = 1;
return; //退出最外层的resumeWith方法
}
case 1: {
//下一次调用resumeWith方法,会进入到该状态, 进行判断,下一次谁调用,下面会讲
if (resultA == null || resultB == null) {
return;
}
}
}
//输出结果
Log.d(">>> ", resultA + " " + resultB);
}
};
//手动执行父闭包
continuation.resumeWith(null);
}
复制代码
利用闭包的特性,声明了闭包内部的成员变量,而后又启动了两个协程进行资源的加载,可是在状态0的时候已经return了,为何会从新进入到switch?
那是由于我新增了一个启动协程的方法:
//传了父闭包的调度器,和父闭包
public static void launch(FakerInterceptor dispatchersParen, FakerInterceptor dispatchers, FakerContinuation continuationParen, FakerCallBack callBack, FakerContinuation continuation) {
dispatchers.dispatch(new FakerJob() {
@Override
public void start() {
Object obj = callBack.invokeSuspend();
continuation.resumeWith(obj);
//注意这里,使用父闭包的调度器执行从新执行了父闭包的resumeWith,并将结果传过去了,因此刚刚的switch能够执行
dispatchersParen.dispatch(new FakerJob() {
@Override
public void start() {
continuationParen.resumeWith(obj);
}
});
}
});
}
复制代码
不是说不加新方法吗?哈哈,我这里加的是一个通用的协程启动方法,要想实现上文说的效果,最好的方式就是加这个方法啦!并且该方法是FakerCompletionBuilder中定义的,并无污染本来的类和函数。
为何作了个父闭包调度器的切换?必须的呀!由于咱们指望的就是下面这样的执行效果:
suspend fun showHtmlPage () {
val resultA = async{loadResource(addrA)} //异步线程执行
val resultB = async{loadResource(addrB)} //异步线程执行
log(resultA,resultB) //主线程执行
}
复制代码
同样没有加锁!判断语句再也不是重复代码!看吧!switch和回调在一块儿,绕是挺绕的,可是颇有意思对吧!这段其实已经很接近协程了,思想其实已经体现出来了,建议再细细品味一下!!!
switch将须要异步执行和同步执行的语句,分别划分在不一样的闭包中,每执行完一个闭包就改变lable的值(改变执行状态),而后经过子闭包从新调用父闭包的回调方法,父闭包的resumeWith方法从新进入后,因为状态改变了,因此执行了不一样与上一次的代码块。就实现了在父闭包中,各个不一样代码块执行状态的流转,进而实现表现上的同步获得结果。
认真想一想,其实就是将一个完整的代码块,根据开发者的声明(例如:launch{}),拆成不一样的代码块,使用闭包包装起来,而后根据各个代码块的状况调整状态,改变了全部代码块的执行顺序。注意!这里强调了开发者的声明,很明显,分块实际上是由开发者决定的!也就是程序决定的!
这里印证了上文的:将要执行的代码使用回调 ”包装起来“,在合适的时机执行!这是线程回调使得代码执行顺序流转的最好体现!也是无栈协程实现执行状态流转的重要前提之一!
这里父闭包,启动子闭包,子闭包又从新调用父闭包,其实能够看作一个特殊的循环!!可是是递归吗?这个问题留给你!
可是有两个地方很郁闷:
1.切换到父闭包的调度器去回调resumeWith方法,而个人主线程调度器是将闭包丢到了队列里面,若是都是在主线程中,又得等一会才能执行,这个等一下是否必要?
2.咱们明明已经将结果经过父闭包的resumeWith方法返回了,为何switch中还要在子闭包中的resumeWith方法赋值?
//传了父闭包的调度器,和父闭包
public static void launch(FakerInterceptor dispatchersParen, FakerInterceptor dispatchers, FakerContinuation continuationParen, FakerCallBack callBack, FakerContinuation continuation) {
....
//注意这里,使用父闭包的调度器执行从新执行了父闭包的resumeWith,并将结果传过去了,因此刚刚的switch能够执行
dispatchersParen.dispatch(new FakerJob() {
@Override
public void start() {
//这里!!!
continuationParen.resumeWith(obj);
}
});
}
});
}
复制代码
...
}, new FakerContinuation() {
@Override
public void resumeWith(Object obj) {
//加载资源A完成后,存储值
resultA = obj.toString();
}
});
...
复制代码
那是由于我不知是谁传进来的结果,没法确认是结果A仍是结果B!简单地说:我不知道A和B的回调顺序!
这里不是Kotlin协程的真正实现,是我本身写的例子,为何会有这么奇怪的实现?由于不想这么早丢出await,客官不要急嘛!等到下文讲到await后,这个问题就迎刃而解了!
切换到了父闭包的调度器去回调resumeWith方法,由于我这里的主线程调度器是将闭包丢到了队列里面,若是都是在主线程中,又得等一会才能执行,这个等一下是否必要?
是的,我感受不必!因此kotlin中有标志声明是可能挂起!而不是必定挂起!如:函数开头中的suspend的关键字、闭包执行的返回值。
咦!不知不觉就告诉了你:挂起,简单的理解就是暂时不执行这个闭包!
因此咱们能够用最简单的方式模拟一下,前提条件是同一个线程,请牢记这个!
因为在kotlin中,launch的返回值是个job,因此我以为使用async更贴切一点,注意!kotlin的实现不是这样的,可是为了让你更加简单的明白为何不须要挂起,更简单的说明挂起的思想,就手写了这个例子,请务必知悉:
//定义一个标志位
final int SUSPEDN = 1;
//增长一个返回值
public static Object async(FakerInterceptor dispatchersParen, FakerInterceptor dispatchers, FakerContinuation continuationParen, FakerCallBack callBack, FakerContinuation continuation) {
//注意这里,若是调度器不等于父协程的调度器,就挂起
if (dispatchers != dispatchersParen) {
dispatchers.dispatch(new FakerJob() {
@Override
public void start() {
Object obj = callBack.invokeSuspend();
continuation.resumeWith(obj);
dispatchersParen.dispatch(new FakerJob() {
@Override
public void start() {
continuationParen.resumeWith(obj);
}
});
}
});
//返回这个标志位
return SUSPEDN;
} else { //不然不必挂起
Object obj = callBack.invokeSuspend();
continuation.resumeWith(obj); //子闭包也传递结果
return obj; //直接执行,直接返回结果
}
}
复制代码
就是返回一个标志位告诉switch是否须要切换状态而已,使用看看就知道了:
public void loadDouble() {
FakerContinuation continuation= new FakerContinuation() {
int state = 0;
String resultA = null;
String resultB = null;
Object resultLaunchB ;
@Override
public void resumeWith(Object obj) {
switch (state) {
case 0: {
Object resultLaunchA = FakerCompletionBuilder.async(FakerDispatchers.Main, FakerDispatchers.Main, this, new FakerCallBack() {
@Override
public Object invokeSuspend() {
return loadResource("A");
}
}, new FakerContinuation() {
@Override
public void resumeWith(Object obj) {
resultA = obj.toString();
}
});
resultLaunchB = FakerCompletionBuilder.async(FakerDispatchers.Main, FakerDispatchers.IO, this, new FakerCallBack() {
@Override
public Object invokeSuspend() {
return loadResource("B");
}
}, new FakerContinuation() {
@Override
public void resumeWith(Object obj) {
resultB = obj.toString();
}
});
state = 1;
//注意这里,决定了要不要跳闭包等待下一次状态切换,仍是继续执行代码
if (resultLaunchA == SUSPEDN) {
return;
}else {
resultA = resultLaunchA.toString();
}
if (resultLaunchB == SUSPEDN) {
return;
}else {
resultB = resultLaunchB.toString();
}
}
case 1: {
if (resultA == null || resultB == null) {
return;
}
break;
}
}
Log.d(">>> ", resultA + " " + resultB);
}
};
continuation.resumeWith(null);
}
复制代码
要不要继续跳出switch,是由async的返回值决定的,因此要不要挂起,其实就是指:
这下咱们知道了,是否挂起,是不必定的,要看具体条件,在kotlin中就是经过返回值声明的哦!
但有个状况是必然会挂起的!那就是delay!delay的意思很明确,就是延迟一下再执行,体现挂起这个行为的最佳例子!
咱们知道Thread.sleep是会阻塞的,delay为何不会阻塞?而是挂起?若是你已经理解了上文的内容,其实你心中已经有答案了!不理解也不要紧,再往下看看说不定就清晰了!
咱们要实现这样的效果:
log(1)
delay(1000) //延时一秒后再往下执行,可是线程不阻塞
log(2)
复制代码
注意! 我这里就使用主线程调度器的实现,由于简单,为何简单?android开发者看到后,必定会直呼内行!
先改改主线程的调度器:
public class MainDispatchers implements FakerInterceptor {
private Handler executor = new Handler(Looper.getMainLooper());
@Override
public void dispatch(FakerJob callBack) {
executor.post(callBack::start);
}
//是的,就是postDelayed,惊不惊喜意不意外?,因此说生产者消费者模型真的很厉害!
@Override
public void dispatch(Long time,FakerJob callBack) {
executor.postDelayed(callBack::start,time);
}
}
复制代码
再建立一个delay的构造器:
public static Object delay(Long time, FakerInterceptor dispatchersParen, FakerContinuation continuationParen) {
dispatchersParen.dispatch(time, new FakerJob() {
@Override
public void start() {
//延迟结束
continuationParen.resumeWith(SUSPEDN_FINISH);
}
});
//延迟
return SUSPEDN;
}
复制代码
使用:
public void test() {
FakerContinuation continuation = new FakerContinuation() {
int state = 0;
Object resultLaunchB;
@Override
public void resumeWith(Object obj) {
switch (state) {
case 0: {
Log.d(">>> ", "1");
state = 1;
//注意这个this,就是父闭包的引用,因此延时任务是从新回调父闭包的resumeWith方法
Object result = FakerCompletionBuilder.delay(1000L, FakerDispatchers.Main, this);
if (result == SUSPEND) {
return;
}
}
case 1: {
Log.d(">>> ", "2");
}
}
}
};
continuation.resumeWith(null);
}
复制代码
简单吧,就是加入队列,而后排队去了!到了时间后才回到父闭包的resumeWith,从而进入到状态2。这里就像本文一开始的故事中写的同样:
log2主动让出了1秒!而后主线程去消费队列中的执行其余事件!
来看看上文提到的另外一个问题:
咱们明明已经将结果经过父闭包的resumeWith方法返回了,也就是咱们说的续体传递风格,为何switch中还要在子闭包中的resumeWith方法赋值?
由于在不给返回结果添加标志位的状况下,很差控制加载资源A和B的返回结果!那我换个思路,给个对象去存储异步的结果,当结果到了时候,通知父闭包。什么意思呢?
java程序员大多知道Future,由于runnable没有结果返回,正如咱们上文的Job同样:
public interface FakerJob {
public void start();
}
复制代码
因此java提供了个Future来获取runnable执行完的结果,但Future是阻塞的,而协咱们要实现的是非阻塞的Future:
public interface LightFuture {
public Object await(FakerContinuation continuation);
}
复制代码
是否是有deferred的味道了?看看具体实现吧,都在注释里面了:
public class LightFutureImpl implements LightFuture, FakerContinuation {
//状态,异步协程是否执行完毕
public boolean isCompleted = false;
//存储异步协程执行的结果
public Object result;
//父闭包的引用
public FakerContinuation continuation;
//这个方法很熟悉了,传入父闭包,若是结果已经有了,直接返回,若是没有,告知父闭包这里应该挂起!
public Object await(FakerContinuation continuation) {
this.continuation = continuation;
if (isCompleted) {
return result;
}
return FakerContinuation.SUSPEDN;
}
//异步协程通知该Future结果已经获取了
@Override
public void resumeWith(Object obj) {
isCompleted = true;
result = obj;
//父协程没有,将结果存下来,可是不回调
if (continuation != null) {
//通知父协程结果已经拿到,能够进入到下一个状态了
continuation.resumeWith(obj);
}
}
}
复制代码
定义咱们的async构造器:
public static LightFuture async(FakerInterceptor dispatchersParen, FakerInterceptor dispatchers, FakerCallBack callBack) {
//实例化一个LightFutureImpl
LightFutureImpl lightFuture = new LightFutureImpl();
dispatchers.dispatch(new FakerJob() {
@Override
public void start() {
Object result = callBack.invokeSuspend();
//注意这是切换回了父协程的调度器
dispatchersParen.dispatch(new FakerJob() {
@Override
public void start() {
//注意看这里,在异步协程执行完成后,将结果存到LightFutureImpl中
lightFuture.resumeWith(result);
}
});
}
});
return lightFuture;
}
复制代码
使用:
public void test() {
FakerContinuation continuation = new FakerContinuation() {
int state = 0;
//用来获取结果B的Future
LightFuture futureB;
Object resultB;
@Override
public void resumeWith(Object obj) {
//注意,这里定义一个做用域lable17
lable17:
{
//存储其余协程传递的结果
resultB = obj;
switch (state) {
case 0: {
//启动协程,加载资源A
LightFuture futureA = FakerCompletionBuilder.async(FakerDispatchers.Single, FakerDispatchers.IO, new FakerCallBack() {
@Override
public Object invokeSuspend() {
return loadResource("A");
}
});
//启动协程,加载资源B
futureB = FakerCompletionBuilder.async(FakerDispatchers.Single, FakerDispatchers.IO, new FakerCallBack() {
@Override
public Object invokeSuspend() {
return loadResource("B");
}
});
//状态改成1
state = 1;
//尝试去取结果,注意这里传入了父闭包的引用
Object result = futureA.await(this);
//取不到结果,直接挂起,退出这个父闭包,等待下一个状态的到来
if (result == FakerContinuation.SUSPEDN) {
return;
}
}
//协程A执行完成,进入了这个状态,但咱们啥也不干直接跳出switch,为何必定是协程A,稍后讲
case 1: {
break;
}
case 2: {
//直接跳出lable17,注意是lable17!
break lable17;
}
}
//从状态1到这里,输出协程A执行的结果
System.out.println(">>>"+obj.toString());
//状态切到2
state = 2;
//尝试取出结果,注意这里传入了父闭包的引用
resultB = futureB.await(this);
//尝试取不到,就挂起,等待协程B回调该父闭包,进入状态2
if (resultB == FakerContinuation.SUSPEDN) {
return;
}
}// lable17 end
//这句必定是从状态2过来的,因此直接输出B结果
System.out.println(">>>"+resultB.toString());
}
};
continuation.resumeWith(null);
}
private String loadResource(String addr) {
try {
if (addr.equals("A")) {
Thread.sleep(1000);
} else {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "load end " + addr;
}
复制代码
很神奇是吗?为何必定是协程A进入状态1?
由于咱们在状态1前都没有调用过:
//尝试取出结果,注意这里传入了父闭包的引用
resultB = futureB.await(this);
复制代码
因此协程B就算执行完成了,也不会回调父闭包的resumeWith方法,由于咱们有个判空还记得吗?:
public class LightFutureImpl implements LightFuture, FakerContinuation {
public boolean isCompleted = false;
public Object result;
public FakerContinuation continuation;
public Object await(FakerContinuation continuation) {
this.continuation = continuation;
if (isCompleted) {
return result;
}
return FakerContinuation.SUSPEDN;
}
@Override
public void resumeWith(Object obj) {
isCompleted = true;
result = obj;
//这里
if (continuation != null) {
continuation.resumeWith(obj);
}
}
}
复制代码
因此状态1必定是协程A进入的!
其实就将结果暂时存到了别的地方,你取的时候多是直接取到,也可能被挂起,等到父闭包传递进来。实现告终果的同步,可是又不阻塞线程。一个简易版的async就实现啦!
注意!kotlin的await实现很复杂,网上有不少种关于阻塞仍是不阻塞的讨论
@Override
public void resumeWith(@NotNull Object result) {
synchronized (this){
this.result = result;
notifyAll(); // 协程已经结束,通知下面的 wait() 方法中止阻塞
}
}
public void await() throws Throwable {
synchronized (this){
while (true){
Object result = this.result;
if(result == null) wait(); // 调用了 Object.wait(),阻塞当前线程,在 notify 或者 notifyAll 调用时返回
else if(result instanceof Throwable){
throw (Throwable) result;
} else return;
}
}
}
复制代码
其实kotlin的LightFuture也就是Deferred,真正实现是CompletableDeferredImpl,是继承自Job的,由于Job有状态,而LightFuture也有个完成(isCompleted)状态。但怕信息太复杂,你一下看懵了,因此我没搞那么复杂去继承FakerJob。
kotlin关于await的源码超级复杂,看着看着会懵逼,并且因为黑魔法的存在,直接看kotlin层的源码我感受会看傻人。这里你可能有两个疑问:
回答问题1 :
我怎么证实本文中的await实现是对的?我不保证我必定正确,我是阅读了kotlin代码decompile后的java代码,认为实现方式多是这样,下文会给出decompile后的java代码和kotlin代码的对比。
回答问题2:
没办法,java自己就不支持协程,但由于协程是语言层面的东西,因此我才能彻底经过java代码实现协程。下文会给出decompile后的java代码,你就知道为何kotlin写个协程这么简单了。
黑魔法是指编译器对咱们写下的kotlin代码作了处理,例如咱们写下的是这样的代码:
suspend fun showHtmlPage() = runBlocking {
val resultA = async { loadResource("addrA") }
val resultB = async { loadResource("addrB") }
Log.d(">>>", resultA.await().toString()+resultB.await().toString())
}
复制代码
decompile后的java代码是这样的:
若是你理解了上文的全部思想,相信看懂这部分代码必定不难,给点耐心:
public final Object showHtmlPage(@NotNull Continuation $completion) {
return BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
// $FF: synthetic field
private Object L$0;
Object L$1;
Object L$2;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000;
String var5;
StringBuilder var6;
Object var7;
label17: {
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Deferred resultB;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineScope $this$runBlocking = (CoroutineScope)this.L$0;
//注意这
Deferred resultA = BuildersKt.async$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
return MainActivity.this.loadResource("addrA");
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
//注意这里
resultB = BuildersKt.async$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
return MainActivity.this.loadResource("addrB");
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
var6 = new StringBuilder();
var5 = ">>>";
this.L$0 = resultB;
this.L$1 = var5;
this.L$2 = var6;
this.label = 1;
//注意这里!
var10000 = resultA.await(this);
if (var10000 == var8) {
return var8;
}
break;
case 1:
var6 = (StringBuilder)this.L$2;
var5 = (String)this.L$1;
resultB = (Deferred)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
var6 = (StringBuilder)this.L$1;
var5 = (String)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
//注意这里
break label17;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var7 = var10000;
var6 = var6.append(String.valueOf(var7));
var5 = var5;
this.L$0 = var5;
this.L$1 = var6;
this.L$2 = null;
//注意这里
this.label = 2;
var10000 = resultB.await(this);
if (var10000 == var8) {
return var8;
}
}
var7 = var10000;
//注意这里
return Boxing.boxInt(Log.d(var5, var6.append(String.valueOf(var7)).toString()));
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.L$0 = value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 1, (Object)null);
}
private final String loadResource(String addr) {
try {
if (Intrinsics.areEqual(addr, "A")) {
Thread.sleep(2000L);
} else {
Thread.sleep(1000L);
}
} catch (InterruptedException var3) {
var3.printStackTrace();
}
return "load end " + addr;
}
复制代码
能够看到不管协程里面作了啥,都会生成switch语句,我估计是为了统一辈子成吧。
和我上文实现的是否是差很少?因此到底async到底如何实现的,我就是经过这里得出的结论,而且写出了测试代码,得出的结果也比较符合kotlin的async。我工做不到一年,水平有限,因此真正的实现方式仍是得你们伙分析论证。
若是你不纠结实现方式,而是思想,我很高兴你get到了我但愿读者get到的思想:将结果存储,在合适的时候回调父协程,经过父协程的状态机从新执行对应状态的的代码块。
我就是经过这样的思想,实现了几个协程的流转,以及执行结果的同步,并且没有阻塞!
kotlin的协程还有不少我没提到的强大功能,由于写本文的想法很简单:就是想知道kotlin的协程是如何使用闭包实现的。我以为上文写的很简单了,就算表述不行,你跑一下代码就能恍然大悟!我真的尽力了铁子们😭,但愿大家读完这一万多个字能有收获!