轻量级 JS 任务调度工具,容许并行数控制,超时,重试,错误抓取,运行状态统计等,同时支持多种调度模式,包括当即调度、按频率调度等。npm
为何说任务执行队列对 JS 来讲比其它热门语言更为重要?因为 JS 是事件驱动型的动态脚本语言,咱们最多见到的就是在代码里各类各样的异步回调,NodeJs 标榜的轻量高效也是基于 Javascript 非阻塞 I/O 模型而谈的,能够说,基于回调的编程思惟是 JS 的一大特点,但有时它会出问题,例如在 for 循环中执行异步任务,因为每一个任务都是瞬间返回的,也就是说,循环会很快遍历完,但任务倒是堆积的,轻则服务繁忙,重则直接把服务压垮,如何轻巧地控制任务并行量不少时候是一个痛点。编程
npm i bobolink
建立一个默认配置的Bobolinkpromise
按照须要建立一个Bobolink实例(Bobolink实例之间互不影响, 因此能够多种场景使用多个Bobolink,甚至能够经过多个Bobolink合从而应对一些复杂场景)异步
const Bobolink = require('bobolink'); // 每一个Bobolink实例都有一个大的队列用于存听任务,因此能够很放心地将任务扔给它,适当的时机下Bobolink会很可靠地调度这些任务。 let q = new Bobolink();
put单个任务函数
因为Promise的执行代码在建立的时刻就已经被执行(then和catch内的代码则经过回调执行),因此简单把Promise扔进Bobolink是不可行的工具
// 下面的打印序是 1 2 3 new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) console.log(2)
经过将Promise扔进一个function能够达到延期执行的效果ui
function p() { // 返回promise任务 return new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) } console.log(2);
此时p必须等待调用才会执行内部的Promise代码,且p返回的是该Promise,值即可以继续传递。 每一个放置到Bobolink的Promise任务都应该以这种方式封装spa
function p() { return new Promise(resolve => { console.log(1); resolve(2) }).then(res => { console.log(res); return 3; }) } // 因为队列很空闲, 能够当即调度本任务, // 因此很快就成功打印出了1, 以后的then则须要等待合适的时机回调, // 若是Promise及其上面的全部then都执行完了, 最终会传递到put.then q.put(p).then(task => { // 打印最终值3 console.log(task.res) });
固然,若是在put的时候,队列执行中的任务数已经到达最大并行量,则须要等待有任务执行完成时腾出空间,而且排在当前任务以前的任务已经都被调度完了才会获得执行。队列
put一组任务事件
Bobolink容许同时put多个任务,且put.then会在该组任务都被执行完毕时才被调用
function getP(flag) { return function p() { return new Promise(resolve => { resolve(flag) }); } } q.put([getP(1), getP(2), getP(3)]).then(tasks => { // 打印每一个任务的返回值, 按放入顺序一一对应 for (let i = 0; i < tasks.length; i++) { console.log(tasks[i].res); } })
配置
目前支持的参数以下:
let q = new Bobolink({ // 最大并行数,最小为1 concurrency: 5, // 任务超时时间ms,0不超时 timeout: 15000, // 任务失败重试次数,0不重试 retry: 0, // 是否优先处理失败重试的任务,为true则失败的任务会被放置到队列头 retryPrior: false, // 是否优先处理新任务,为true则新任务会被放置到队列头 newPrior: false, // 最大可排队的任务数, -1为无限制, 超过最大限制时添加任务将返回错误'bobolink_exceeded_maximum_task_number' max: -1, // 指定任务的调度模式,仅在初始化时设置有效 scheduling: { // 默认为'immediately',任务将在队列空闲时当即获得调度。 // 你也能够将它设置为'frequency', 而且指定countPerSecond, Bobolink将严格地按照设定的频率去调度任务。 enable: 'frequency', frequency: { // 每秒须要调度的任务数,仅在任务队列有空闲时才会真正调度。 countPerSecond: 10000 } }, // 任务失败的handler函数,若是设置了重试,同个任务失败屡次会执行catch屡次 catch: (err) => { } });
参数能够在运行期更改, 对后续生效
q.setOptions({ concurrency: 5, timeout: 15000, retry: 0, retryPrior: false, newPrior: false, catch: null });
任务运行状态
使用Bobolink执行的Promise任务全部错误会被catch并包装,因此只存在put.then而不存在put.catch(除非put.then自身出错)。任务执行以后获取到的响应有一些有用的值能够用于服务统计
taskRes = { // 执行是否遇到错误, 判断任务是否执行成功的判断依据是err === undefined, err为任何其它值都表明了运行失败。 // 任务出错时, 若是不重试, 那么catch到的错误会直接放入err, 超时时err为'bobolink_timeout' // 若是重试, 且在最大重试次数以后依然错误的话, 会将最后一次的错误放入err // 若是重试, 且在重试期间成功的话, 被认为是成功的, 因此err为空 err: undefined, // 执行Promise返回的结果 res: Object, // 从任务放入队列到该任务最后一次被调度, 所通过的时间(ms) waittingTime: 20, // 该任务最后一次运行的时间(ms) runTime: 1, // 该任务出错重试的次数 retry: 2 }
插队
除了队列控制参数newPrior和retryPrior以外,也容许在put的时候指定当前任务是否优先处理
Bobolink.ptototype.put(tasks, prior)
默认状况下,任务是放入队尾的,但若是指定了prior为true,则会被放置到队头,put任务组时会维持组任务本来的顺序,并整个放入队头。
更多
q.options:获取当前队列的配置。
q.queueTaskSize:获取队列排队中的任务数。
q.runningTaskCount:获取队列执行中的任务数。