原文地址javascript
想不到我在平常开发中,居然遇到“优先异步队列”的需求。github上有相似功能,而且集成redis的库有Kue、Bull等。可是,对于追求“简、易、轻”的我来讲,其实并不满意。根据“二八原则”,我决定,本身来实现一个只有上述二者“两成”功能的轻量级开源库:priority-async-queue。java
开源库的命名为何这么长呢?缘由是,我认为开发者只需看一眼库名就知道其功能,一点都不含糊。可是,为了行文流畅,priority-async-queue
下面统一简称为“paq”。git
paq
按照 redux 做者的套路,首先,咱们须要明确的是,你可能不须要 paq。github
只有遇到 N 个异步任务不能并行执行,而且只能顺序执行时,你才须要使用 paq。redis
简单来讲,若是你须要执行的 N 个异步任务,并不存在资源争夺和占用、数据共享、严格的逻辑顺序、优先权对比等,paq 多是不必的,用了反而下降执行效率、影响程序性能。redux
paq 的设计思路很是简单,一共3个类:异步
Task
是描述每一个待执行(异步/同步)任务的执行逻辑以及配置参数。PriorityQueue
是控制每一个待执行(异步/同步)任务的优先级队列、具备队列的基本属性和操做。AsyncQueue
是控制每一个待执行(异步/同步)任务能严格顺序地执行的队列。下面是 paq 的程序流程图:async
1. addTask函数
addTask 是核心方法,它能够建立一个任务,而且添加到 paq 队列中。性能
paq.addTask([options, ]callback);
复制代码
options
是一个可选对象,包含如下属性:
{
id: undefined, // 任务id
priority: 'normal', // 任务权重,例如: low, normal, mid, high, urgent
context: null, // 执行任务的上下文
start: (ctx, options) => {}, // 任务将要被执行的回调
completed: (ctx, res) => {}, // 任务执行完成后的回调
failed: (ctx, err) => {}, // 任务执行失败后的回调
remove: (ctx, options) => {} // 任务被删除后的回调
}
复制代码
callback
是一个描述执行任务的逻辑的函数,它包含两个参数:ctx
和 options
:
ctx
是任务所属的paq实例。options
是此任务的options参数的最终值。paq.addTask((ctx, options) => {
console.log(ctx === paq); // true
});
复制代码
2. removeTask
removeTask 方法是根据任务 id 来删除等待对列中的任务。
paq.removeTask(taskId);
复制代码
若是成功删除任务,它将返回true。不然,它将返回false。
3. pause
pause 方法是暂停 paq 继续执行任务。
paq.pause();
复制代码
注意: 可是,您没法暂停当前正在执行的任务,由于没法暂时检测到异步任务的进度。
4. isPause
isPause 属性,返回当前队列是否处于暂停状态。
paq.isPause; // return true or false.
复制代码
5. resume
resume 方法,用来从新启动 paq 队列执行任务。
paq.resume();
复制代码
6. clearTask
cleartTask 方法,用来清除 paq 等待队列中全部的任务。
paq.clearTask();
复制代码
1. 基本用法
只要向 paq 添加任务,该任务就会自动被执行。
const PAQ = require('priority-async-queue');
const paq = new PAQ();
paq.addTask((ctx, options) => {
console.log('This is a simple task!');
});
// This is a simple task!
复制代码
2. 同步任务
你能够使用 paq 执行一系列同步任务,例如:
const syncTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask(() => {
console.log('Step', i, 'sync');
return i;
});
}
};
syncTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync
复制代码
3. 异步任务
你还能够使用 paq 执行一系列的异步任务,例如:
const asyncTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask(() => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
};
asyncTask(3);
// Step 0 async
// Step 1 async
// Step 2 async
复制代码
4. 混合任务
你甚至能够使用 paq 执行一系列同步和异步交错的任务,例如:
const mixTask = (n) => {
asyncTask(n);
syncTask(n);
asyncTask(n);
};
mixTask(2);
// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async
复制代码
5. 绑定执行上下文
有时若是你须要指定上下文来执行任务,例如:
const contextTask = (n) => {
var testObj = {
name: 'foo',
sayName: (name) => {
console.log(name);
}
};
for (let i = 0; i < n; i++) {
paq.addTask({ context: testObj }, function () {
this.sayName(this.name + i);
});
}
};
contextTask(3);
// foo0
// foo1
// foo2
复制代码
注意: this
在箭头函数中并不存在,或者说它是指向其定义处的上下文。
6. 延迟执行
paq 还支持延迟执行任务,例如:
const delayTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({ delay: 1000 * i }, () => {
console.log('Step', i, 'sync');
return i;
});
}
};
delayTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync
复制代码
7. 优先权
若是须要执行的任务具备权重,例如:
const priorityTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
};
priorityTask(5);
// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async
复制代码
默认优先级映射以下:
{
"low": 1,
"normal": 0, // default
"mid": -1,
"high": -2,
"urgent": -3
}
复制代码
8. 回调函数
有时,你但愿可以在任务的开始、完成、失败、被删除时作点事情,例如
const callbackTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({
id: i,
start: (ctx, options) => {
console.log('start running task id is', options.id);
},
completed: (ctx, res) => {
console.log('complete, result is', res);
},
failed: (ctx, err) => {
console.log(err);
}
}, () => {
if (i < n / 2) {
throw new Error(i + ' is too small!');
}
return i;
});
}
};
callbackTask(5);
// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4
复制代码
9. 删除任务
有时,你须要删除一些任务,例如:
const removeTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({
id: i,
remove: (ctx, options) => {
console.log('remove task id is', options.id);
}
}, () => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
console.log(paq.removeTask(3));
console.log(paq.removeTask(5));
};
removeTask(5);
// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async
复制代码
注意: 你必须在建立任务时分配id,并根据id删除任务。
若是须要监视 paq 队列的状态,paq 提供如下事件侦听器:
1. addTask
paq.on('addTask', (options) => {
// Triggered when the queue adds a task.
});
复制代码
2. startTask
paq.on('startTask', (options) => {
// Triggered when a task in the queue is about to execute.
});
复制代码
3. changeTask
paq.on('changeTask', (options) => {
// Triggered when a task in the queue changes.
});
复制代码
4. removeTask
paq.on('removeTask', (options) => {
// Triggered when the queue remove a task.
});
复制代码
5. completed
paq.on('completed', (options, result) => {
// Triggered when the task execution in the queue is complete.
});
复制代码
6. failed
paq.on('failed', (options, err) => {
// Triggered when a task in the queue fails to execute.
});
复制代码
最后,想补充的是:若遇到 Promise.all
和 Promise.race
解决不了的需求,能够考虑一下 paq。