本文已收录 【修炼内功】跃迁之路
在上一篇文章 JVM 细说线程中已经介绍了应用程序常见的一些线程模型,本篇就上篇说起的协程作简单的介绍
谈到并发/异步,首先想到的可能即是线程/进程,Java在近20年的发展中从JDK1.2以后便采用1:1线程模型,Java在核心类库中提供了众多异步API,可使多线程应用发挥强大的并发能力并得到不错的性能javascript
现在,在不少高并发的场景下(如I/O密集型)操做系统的线程调度成为了性能的瓶颈,每每cpu使用率及内存使用率还稳如泰山,但系统load已经堵到不行html
那,协程可以为I/O密集型的场景带来什么帮助?本篇就从Node.js的异步API聊起前端
总有人会说,协程其实就是线程,只不过是换了一种写法的语法糖,就如同Java8中的Lambda表达式,也总有人会说Lambda表达式只不过是匿名类的语法糖而已(见Java8 Lambda到底是不是匿名类的语法糖),然,非也vue
如上篇文章所述,N:M线程模型能够解决N:1模型中阻塞问题,同时也能充分利用CPU的多核优点,这也是大部分协程实现的基础java
N
能够理解为用户线程数,其数量根据业务逻辑须要而定,M
能够理解为内核线程数,其数量固定(或相对固定),每个用户线程都须要放到内核线程中才能执行,用户线程的调度由应用程序管理(甚至能够交由编程人员经过编写程序管理)。而协程则能够理解为上述的用户线程,一种更为轻量级的线程node
为何恰恰选Node.js聊协程?这要从Node.js的架构特色提及~ git
Node.js是单线程么?是,也不是~es6
Node.js使用事件驱动及非阻塞I/O实现异步模型github
Node.js is a platform built on Chrome's JavaScript runtime for easily building fast, scalable network applications. Node.js uses an event-driven, non-blocking I/O model that makes it lightweight and efficient, perfect for data-intensive real-time applications that run across distributed devices.
APPLICATION为咱们所编写的应用层,其JS的解释执行由V8引擎负责,这里的执行线程只有一个
,这也就是一般所说Node.js为单线程的缘由,在编写Node.js程序时没有办法建立子线程,同时若是有部分逻辑阻塞或者长时间运行,则会影响整个运行时npm
Node.js的高并发则依赖事件驱动(EVENT LOOP)及非阻塞I/O(LIBUV),在进行I/O操做时Node.js将任务交由UV线程池
中的线程执行,并在事件队列中注册回调,在I/O操做完成时触发回调继续后续的动做,在整个I/O操做的过程当中并不会阻塞JS的解释执行
Node.js的工做线程数固定(可经过环境变量UV_THREADPOOL_SIZE
指定),每一个工做线程对应一个内核线程,工做线程数能够理解为N:M线程模型中的M
Node.js应用层的异步任务由开发人员编写,每一个异步任务能够理解为用户线程,任务数对应于N:M线程模型中的N
因为Node.js上述的特色(单执行线程,多工做线程),没有过多的干扰,很是适合用来说述协程的概念及应用
为了按部就班地理解协程的概念,咱们从异步的常规实现方式一一提及,来说述协程的演变过程(对经常使用异步编程无感的可直接跳到协程一节)
将函数作为另外一个函数的入参传入,并由调用方在合适的时机(处理完成或失败)进行调用
// 获取个人信息 let request = $.get( "user/info/me", // 回调函数 function(data) { /* do something */ } ); // 2秒后取消请求(若是请求仍未返回) setTimeout( // 回调函数 function() { if (!!request) { request.abort() } }, 2000 );
回调能够在必定程度上将主流程与异步逻辑分离,异步逻辑的处理不会阻塞主流程的执行,但回调也带了一些问题
Future.get
以阻塞方式等待异步结果)Callback Hell
如,将文件夹下的全部图片拼接成一张图片
fs.readdir(source, function(err, files) { if (err) { console.error('Error finding files: ', err); } else { files.forEach(function(filename, fileIndex) { console.info(filename); gm(source + filename).size(function(err, values) { if (err) { console.error('Error identifying file size: ', err); } else { console.info(filename + ' : ' + values); aspect = values.width / values.height; widths.forEach(function(width, widthIndex) { height = Math.round(width / aspect); console.info('resizing ' + filename + 'to ' + height + 'x' + height); let destFile = dest + "w" + width + '_' + filename; this.resize(width, height).write(destFile, function(err) { if (err) { console.error('Error writing file: ', err); } else { console.info('Writing file to: ', destFile); } }); }); } }); }); } });
大量的回调函数嵌套在一块儿,可阅读性和可维护性都并不高
再如,多个http请求存在先后依赖关系,前一个请求的返回值做为后一个请求的参数
$.get("step/1", (data1) => { $.get(`step/2/${data1}`, (data2) => { $.get(`step/3/${data2}`, (data3) => { /* do the final thing */ }) }) })
化解Callback Hell的方法有不少,其中最简单的方式即是将代码模块化、扁平化
function step(url, then) { $.get(url, (data) => { then(data); }); } function doFinalThing(data) { /* do the final thing */ } // Thunk 化 function proxyStep(url, then) { return (data) => { step(`${url}/${data}`, then); } } let step3Proxy = proxyStep('step/3', doFinalThing); let step2Proxy = proxyStep('step/2', step3Proxy); step('step/1', step2Proxy);
Thunk
的概念在下文会有介绍,在将逻辑进行抽象化、模块化以后,代码则会变得清晰起来
事件是回调的另外一种形式,在代码逻辑的分离上作的更为完全
回调的事件化
// 定义事件 class EventGet { constructor(url = '') { this.url = url; // 事件完成时的回调 this.onComplete = (data) => {} } // 触发事件 emitGet() { $.get(this.url, this.onComplete) } } let step1 = new EventGet('step/1') let step2 = new EventGet('step/2') let step3 = new EventGet('step/3') step1.onComplete = (data) => { // step1完成时触发step2 step2.url += `/${data}`; step2.emitGet(); } step2.onComplete = (data) => { // step2完成时触发step3 step3.url += `/${data}`; step3.emitGet(); } step3.onComplete = (data) => { /* do the final thing */ } // 触发step1 step1.emitGet();
事件相比于单纯的回调更为语义化,也更容易表达程序所要执行的逻辑
这里是否让你想起了Java中的CompletableFuture
?但请再次注意,Node.js中应用层的解释执行只有一个线程,每次GET请求并不是建立了一个内核线程去执行,而是交给了UV线程池,由事件机制来回调onComplete函数处理请求的结果
其实,若是作过前端开发,随处均可以看到回调或事件的使用
<nz-button-group> <button nz-button nzType="default" (click)="cancle()">取消</button> <button nz-button nzType="danger" (click)="delete()">删除</button> </nz-button-group>
function clickHandler() { /* do something when clicking */ } var btn = document.getElementById("btn"); btn.onclick = clickHandler; btn.addEventListener("click", clickHandler, false);
Promise 是异步编程的一种解决方案,比传统的解决方案(回调函数和事件)更合理、更强大
Promise有三个状态,pending
(进行中)、fulfilled
(已成功)和rejected
(已失败),pending
能够转换为fulfilled
或rejected
其中之一,且状态一旦转换就不会再变
promise的使用详见ex6-promise,这里不赘述,若是将上述回调或事件的示例转为promise的方式,能够编写以下
function promiseGet(url) { return new Promise((resolve, reject) => { $.get(url, resolve, reject); }); } // 回调嵌套改成链式调用 promiseGet( "step/1" ).then((data1) => { return promiseGet(`step/2/${data1}`) }).then((data2) => { return promiseGet(`step/3/${data2}`) }).then((data3) => { /* do the final thing*/ }).catch((e) => { /* handle exception */ }).finally(() => { /* do finnaly */ });
promise可将先后有依赖关系的异步处理转换为链式调用的形式,一样是回调,却能够大大避免Callback Hell,并使调用逻辑更加清晰
同时,promise还能够轻松编写并行代码
function promiseGet(url) { return new Promise((resolve, reject) => { $.get(url, resolve, reject); }); } Promise.all([ promiseGet("user/info/张三"), promiseGet("user/info/李四"), promiseGet("user/info/赵五") ]).then((users) => { /* do something */ }).catch((e) => { /* handle exception */ }).finally(() => { /* do finnaly */ })
响应式编程做为近几年很火的一种编程范式,以一种流(Stream)的方式处理数据,响应式的概念十分庞大,这里不作详述,如下以一个rxjs示例展现响应式编程如何解耦异步逻辑
function getOnObserver(url, observer) { $.get( 'user/info/张三', (data) => { observer.next(data); observer.complete(); }, (error) => { observer.error(error) } ); } // 建立流 let observable = Observable.create((observer) => { getOnObserver('user/info/张三', observer); getOnObserver('user/info/李四', observer); getOnObserver('user/info/赵五', observer); }); // 订阅/消费流 observable.subscribe((data) => { /* do something for each result */ });
响应式编程的威力远不止此,流式处理有丰富的api(可简单参考Java8中Stream API)、背压保护策略等等,经过其事件回调机制能够在I/O密集型应用中一展身手
在粗略了解了几种常规的异步编程方式以后,从本节内容开始真正进入协程的范畴
子程序(或者称为函数),在全部语言中都是层级调用,严格遵循线程栈的入栈出栈,子程序调用老是一个入口一个返回,调用顺序是明确的
而协程的调用和子程序不一样,协程看上去也是子程序,但执行过程当中协程内部可中断,而后转而执行别的子程序/协程,在适当的时候再返回来接着执行
generator 函数是 ES6 提供的一种异步编程解决方案,语法行为与传统函数彻底不一样
function* helloGenerator() { yield console.log('hello'); yield console.log('I\'m'); } function* worldGenerator() { yield console.log('world'); yield console.log('ManerFan'); } let hello = helloGenerator(); let world = worldGenerator(); // 交替执行helloGenerator及worldGenerator hello.next(); world.next(); hello.next(); world.next();
运行结果
hello world I'm ManerFan
按照常理,在同一个线程中顺序调用helloGenerator
及worldGenerator
,两个函数均会按照调用顺序完整的执行,按预期应该输出
hello I'm world ManerFan
在使用generator
时,其next
方法会在方法体内遇到yield
关键字时暂停执行,交回该函数的执行权,相似于线程的挂起,所以generator也被称之为暂停函数
generator函数能够在内部使用yield关键字标识暂停点,generator函数的暂停、恢复执行可由应用程序灵活控制(内核线程的调度由系统控制),这与传统函数的执行规则彻底不一样,generator函数的调度权彻底交给了应用层
yield
关键字除了标识暂停点以外,还能够在恢复执行的时候传值进来(generator更高阶的用法详见es6-generator)
function* foo(x) { var y = 2 * (yield (x + 1)); var z = yield (y / 3); return (x + y + z); } let f = foo(5); let step1 = f.next(); console.log(step1); // { value:6, done:false } let step2 = f.next(12); console.log(step2); // { value:8, done:false } let step3 = f.next(13); console.log(step3); // { value:42, done:true }
不论事件仍是Promise亦或响应式都离不开回调,事件将主流程与异步回调分离,Promise将异步回调转为链式回调,响应式将异步回调转为流式回调,当generator遇到异步回调会发生什么?
如下,模拟定义$.get
函数以下
let $ = { get(url, callback) { setTimeout(() => callback(url.substring(5)), 500); } }
以上文回调嵌套为例
// 回调方式 $.get("step/1", (data1) => { $.get(`step/2/${data1}`, (data2) => { $.get(`step/3/${data2}`, (data3) => { /* do the final thing */ }) }) })
利用generator可暂停、可恢复的能力,可在异步回调逻辑中触发恢复下一步的动做,并将当前的异步处理结果带回,以此将回调嵌套拉平,将异步回调逻辑写出同步的顺滑感,咱们称之为异步逻辑的“同步化”(同步的写法,异步的执行)
// 封装异步调用 function get(url) { $.get(url, (data) => { // 触发后续流程,并将数据代入后续流程 req.next(data) }) } // generator 异步逻辑同步化 function* asyncAsSync() { // 同步的写法,异步的执行 let result1 = yield get('step/1'); let result2 = yield get(`step/2/${result1}`); let result3 = yield get(`step/3/${result2}`); console.log(result3); /* do the final thing */ } // 生成generator var req = asyncAsSync(); // 触发一次 req.next(); // do something at the same time console.log('do something at the same time when excute gets');
输出
do something at the same time when excute gets 3/2/1
asyncAsSync
函数中看似是同步的逻辑,实则每个yield get()
都是一次异步调用,异步的结果经过req.next()
带回,而且asyncAsSync
函数的调用并不会阻塞最后一行console.log
的执行
在使用generator的过程当中其实并非很方便,generator函数的暂停与恢复须要使用程序控制,这对于编写程序来讲门槛会提升,那有没有一种方法能够自动的执行generator的next函数呢?
首先介绍generator自动执行使用的一种函数变形方式,柯里化(Thunk)
thunk函数的定义
// fn(arg1, arg2, arg3, ..., callback) const thunk = function(fn) { return function (...args) { return function (callback) { return fn.call(this, ...args, callback); } }; };
首先,被柯里化函数的入参知足如下规则
柯里化将函数拆解成了两部分,一部分为只要参数入参的函数,一部分为只有回调函数入参的函数,其使用方法以下
// $.get为一函数,其入参为url及回调 // 对$.get进行柯里化 // getThunk为一函数,其入参为url const getThunk = thunk($.get); // getInfoMe为一函数,其入参为回调函数 const getInfoMe = getThunk('user/info/me'); getInfoMe(function(data) => { /* do something */ })
对thunk实现比较完美的库参见npm-thunkify
使用thunk实现generator的自动执行器
// 自动执行器 function co(fn) { var gen = fn(); // nextCallBack(data)函数的执行,来自 // 1. 下方的首次显示触发 // 2. yield值的回调触发 function nextCallBack(data) { // result的值为一个接收回调函数的函数 var result = gen.next(data); if (result.done) return; // 执行result值中的函数,并将回调参数设置为nextCallBack(data),异步递归回调 // 当回调的时候继续执行nextCallBack(data),并将执行结果代入 result.value(nextCallBack); } nextCallBack(); }
let getThunk = thunk($.get); // generator 异步逻辑同步化 function* asyncAsSync() { let result1 = yield getThunk('step/1'); let result2 = yield getThunk(`step/2/${result1}`); let result3 = yield getThunk(`step/3/${result2}`); console.log(result3); /* do the final thing */ } co(asyncAsSync); // do something at the same time console.log('do something at the same time when excute gets');
输出
do something at the same time when excute gets 3/2/1
getThunk(url)
的执行结果为一个函数,咱们记为getStep(callback)
,该函数的入参为一个回调函数callback,而callbak即$.get
的回调函数(这里很重要,参见thunk定义)
关键在于co
中的nextCallBack(data)
,咱们记为co#nextCallBack(data)
co#nextCallBack(data)
会触发一次yield
,yeild
的结果即getThunk(url)
的执行结果getStep(callback)
,而getStep(callback)
的入参又被设为co#nextCallBack(data)
,执行异步递归回调
上述代码的执行逻辑为
gen.next(data)
执行yield getThunk('step/1')
获得result,其值为函数getStep(callback)
result.value(nextCallBack)
执行getStep(callback)
,并将callback入参设置为co#nextCallBack
getStep(co#nextCallBack)
执行$.get('step/1')
,当数据返回时执行回调函数co#nextCallBack(data)
,其中data为$.get('step/1')
的结果1
gen.next(1)
执行yield getThunk('step/2/1')
获得函数getStep(callback)
result.value(nextCallBack)
执行getStep(callback)
,并将callback入参设置为co#nextCallBack(data)
getStep(co#nextCallBack)
执行$.get('step/2/1')
,当数据返回时执行回调函数co#nextCallBack(data)
,其中data为$.get('step/2/1')
的结果2/1
generator的自动执行关键在于如下几点
co#nextCallBack
中触发generator的nextco#nextCallBack
,执行并递归回调co#nextCallBack
并将执行结果代入将generator的精髓用到极致的还要当属koa(koa2已经使用async改写,再也不使用generator),它将http server端异步middleware的书写体验整个提高了一个层级
middleware相似于java servlet中的filter,其执行过程相似于剥洋葱
而当全部的middleware(包括核心core)都是异步的话,整个处理逻辑在各middleware之间的跳转就变得复杂起来
koa使用generator的特性,巧妙实现了请求处理逻辑在各异步middleware间的灵活跳转执行
如下,简单模拟koa-middleware的实现逻辑
// 定义app let app = { middlewares: [], core: function* (next) { console.log("excute core!"); // yield 异步操做 yield* next; }, // 将多个middleware组合成链式结构 compose(middlewares) { function* noop() {} return function* (next){ var i = middlewares.length; var prev = next || noop(); var curr; while (i--) { curr = middlewares[i]; prev = curr.call(this, prev); } yield* prev; } }, // 添加middleware use(middleware) { this.middlewares.push(middleware); }, run() { let chain = this.compose([...this.middlewares, this.core]); co(chain); } }
app.use(function* (next) { console.log("before middleware1"); // yield 异步操做 yield* next; console.log("after middleware1"); // yield 异步操做 }); app.use(function* (next) { console.log("before middleware2"); // yield 异步操做 yield* next; console.log("after middleware2"); // yield 异步操做 }); app.run();
输出
before middleware1 before middleware2 excute core! after middleware2 after middleware1
以上,generator的自动执行依赖于thunk化,而thunk又很是生涩难懂,若是将generator与Promise结合,或许会更容易理解一些
function promiseGet(url) { return new Promise((resolve, _) => { $.get(url, resolve); }); } function * asyncAsSync() { let result1 = yield promiseGet('step/1'); let result2 = yield promiseGet(`step/2/${result1}`); let result3 = yield promiseGet(`step/3/${result2}`); console.log(result3); /* do the final thing */ } function co(fn) { var g = fn(); // nextCallBack(data)函数的执行,来自 // 1. 下方的首次显示触发 // 2. yield值的回调触发 function nextCallBack(data){ // result的值为Promise对象 var result = g.next(data); if (result.done) return result.value; result.value.then(function(data){ // Promise的then回调中,递归执行nextCallBack(data),并将执行结果代入 nextCallBack(data); }); } nextCallBack(); } co(asyncAsSync); // do something at the same time console.log('do something at the same time when excute gets');
对generator自动执行封装较好的看tj大神的tj-co
generator(function*)? yield? thunk? co? 想使用generator实在不要再复杂,不过好在es2017开始提供了async/await
function promiseGet(url) { return new Promise((resolve, _) => { $.get(url, resolve); }); } // 异步代码同步化 async function asyncAsSync() { let result1 = await promiseGet('step/1'); let result2 = await promiseGet(`step/2/${result1}`); let result3 = await promiseGet(`step/3/${result2}`); console.log(result3); /* do the final thing */ } asyncAsSync(); // do something at the same time console.log('do something at the same time when excute gets');
输出
do something at the same time when excute gets 3/2/1
没有了生涩的thunk,没有了烧脑的自动执行器,代码得以变得更加清爽
简单来说,async
其实就是generator
的语法糖
async
替代generator的标星函数function*
await
替代yield
await
后可跟普通值、普通函数及Promise对象async
自带自动执行器async/await
相比generator + thunk/Promise + co
的方案,更加语义化,也更容易理解
借助Promise的能力,还能够异步并行处理数据
function promiseGet(url) { return new Promise((resolve, reject) => { $.get(url, resolve, reject); }); } async function asyncRun() { let names = await Promise.all([ promiseGet("user/info/张三"), promiseGet("user/info/李四"), promiseGet("user/info/赵五") ]) names.forEach((name) => console.log(name)); } asyncRun().catch((e) => { /* handle exception */}) // do something at the same time console.log('do something at the same time when excute gets');
输出
do something at the same time when excute gets 张三 李四 赵五
更多async
的用法详见async/await
使用async/await
的koa
middleware处理逻辑能够简单模拟以下
// 定义app let app = { middlewares: [], core: async (next) => { console.log("excute core!"); // await 异步操做 await next; }, // 将多个middleware组合成链式结构 compose(middlewares) { var i = middlewares.length; var prev = Promise.resolve(); var curr; while (i--) { curr = middlewares[i]; prev = curr.bind(this, prev); } return prev; }, // 添加middleware use(middleware) { this.middlewares.push(middleware); }, run() { let chain = this.compose([...this.middlewares, this.core]); chain(); } }
app.use(async (next) => { console.log("before middleware1"); // await 异步操做 await next(); console.log("after middleware1"); // await 异步操做 }); app.use(async (next) => { console.log("before middleware2"); // await 异步操做 await next(); console.log("after middleware2"); // await 异步操做 }); app.run();
输出
before middleware1 before middleware2 excute core! after middleware2 after middleware1