一个简单的HTML5 Web Worker 多线程与线程池应用

笔者最近对项目进行优化,顺带就改了些东西,先把请求方式优化了,使用到了web worker。发现目前尚未太多对web worker实际使用进行介绍使用的文章,大可能是一些API类的讲解,除了涉及到一些WebGL的文章,因此总结了这个文章,给你们参考参考。如下内容以默认你们对web worker已经有了初步了解,不会讲解基础知识。前端

 

1、为何要开子线程node

笔者这个项目是一个存储系统的中后台管理GUI,一些数据须要经过CronJob定时地去获取,而且业务对数据的即时性要求高,大量的和持久的HTTP请求是不可避免的,而且该项目部署了HTTP/2,带宽和并发数能够极大。并且不须要兼容IE系列,哈哈哈,针对这些点因而决定优(瞎)化(弄)。笔者一开始想到的就是使用HTML5的新特性web worker,而后将HTTP的请求工做从主线程放到子线程里面去作,工做完成后,返回子线程数据便可。这样能够下降主线程中的负荷,使主线程能够不劳而获。一旦子线程中发起的请求成功或错误后,子线程返回给主线程请求的response对象或者直接返回请求获得的数据或错误信息。最终的方案里,选择的是直接返回请求获得的数据,而不是response对象,这个在后面会详细说明为何这样作。子线程对于处于复杂运算,特别是搭配wasm,对于处理WebGL帧等有极大的性能优点。以往的纯JS视频解码,笔者只看到过可以解码MPEG1(大概240P画面)的canvas库,由于要达到60帧的画面流畅度,就必须保证1帧的计算时间要小于16ms,若是要解码1080P的画面甚至4K,JS可能跑不过来了,并且长时间的计算会严重阻塞主线程,影响页面性能,若是能开启子线程把计算任务交给子线程作,并经过wasm加快计算速度,这将在前端领域创造极大的可能性。webpack

 

2、为何要设计线程池ios

若是只开一个线程,工做都在这一个子线程里作,不能保证它不阻塞。若是无止尽的开启而不进行控制,可能致使运行管理平台应用时,浏览器的内存消耗极高:一个web worker子线程的开销大概在5MB左右。git

不管这5MB内存是否已被这个子线程彻底使用,仍是说仅仅是给这个子线程预规划的内存空间,但这个空间确实是被占用了。而且频繁地建立和终止线程,对性能的消耗也是极大的。因此咱们须要经过线程池来根据浏览器所在计算机的硬件资源对子线程的工做进行规划和调度,以及对僵尸线程的清理、新线程的开辟等等。根据测试,在页面关闭之后,主线程结束,子线程的内存占用会被一并释放,这点不须要作额外的处理。web

 

3、设计线程池json

对于线程池,咱们须要实现的功能有以下这些,代码中的英文注释为笔者项目上的需求,由于以前有外国同事在一块儿开发项目,为了他们阅读代码方便,因此统一使用英文注释,能够直接忽略,笔者会对重要地方直接给出说明。canvas

1. 初始化线程api

经过 Navagitor 对象的 HardWareConcurrecy 属性能够获取浏览器所属计算机的CPU核心数量,若是CPU有超线程技术,这个值就是实际核心数量的两倍。固然这个属性存在兼容性问题,若是取不到,则默认为4个。咱们默认有多少个CPU线程数就开多少个子线程。线程池最大线程数量就这么肯定了,简单而粗暴:promise

class FetchThreadPool {
    constructor (option = {}){
        const {
            inspectIntervalTime = 10 * 1000,
            maximumWorkTime = 30 * 1000
        } = option;
        this.maximumThreadsNumber = window.navigator.hardwareConcurrency || 4;
        this.threads = [];
        this.inspectIntervalTime = inspectIntervalTime;
        this.maximumWorkTime = maximumWorkTime;
        this.init();
    }
   ......
}

 

获取到最大线程数量后,咱们就能够根据这个数量来初始化全部的子线程了,并给它们额外加上一个咱们须要的属性:

  init (){
        for (let i = 0; i < this.maximumThreadsNumber; i ++){
            this.createThread(i);
        }
        setInterval(() => this.inspectThreads(), this.inspectIntervalTime);
    }

    createThread (i){
        // Initialize a webWorker and get its reference.
        const thread = work(require.resolve('./fetch.worker.js'));
        // Bind message event.
        thread.addEventListener('message', event => {
            this.messageHandler(event, thread);
        });
        // Stick the id tag into thread.
        thread['id'] = i;
        // To flag the thread working status, busy or idle.
        thread['busy'] = false;
        // Record all fetch tasks of this thread, currently it is aimed to record reqPromise.
        thread['taskMap'] = {};
        // The id tag mentioned above is the same with the index of this thread in threads array.
        this.threads[i] = thread;
    }

其中:

id为数字类型,表示这个线程的惟一标识,

busy为布尔类型,表示这个线程当前是否处于工做繁忙状态,

taskMap为对象类型,存有这个线程当前的全部工做任务的key/value对,key为任务的ID taskId,value为这个任务的promise的resolve和reject回调对象。

由上图还能够看出,在初始化每一个子线程时咱们还给这个子线程在主线程里绑定了接收它消息的事件回调。在这个回调里面咱们能够针对子线程返回的消息,在主线程里作对应的处理:

  messageHandler (event, thread){
        let {channel, threadCode, threadData, threadMsg} = event.data;
        // Thread message ok.
        if (threadCode === 0){
            switch (channel){
                case 'fetch':
                    let {taskId, code, data, msg} = threadData;
                    let reqPromise = thread.taskMap[taskId];
                    if (reqPromise){
                        // Handle the upper fetch promise call;
                        if (code === 0){
                            reqPromise.resolve(data);
                        } else {
                            reqPromise.reject({code, msg});
                        }
                        // Remove this fetch task from taskMap of this thread.
                        thread.taskMap[taskId] = null;
                    }
                    // Set the thread status to idle.
                    thread.busy = false;
                    this.redirectRouter();
                    break;

                case 'inspection':
                    // console.info(`Inspection info from thread, details: ${JSON.stringify(threadData)}`);
                    // Give some tips about abnormal worker thread.
                    let {isWorking, workTimeElapse} = threadData;
                    if (isWorking && (workTimeElapse > this.maximumWorkTime)){
                        console.warn(`Fetch worker thread ID: ${thread.id} is hanging up, details: ${JSON.stringify(threadData)}, it will be terminated.`);
                        fetchThreadPool.terminateZombieThread(thread);
                    }
                    break;

                default:
                    break;
            }
        } else {
            // Thread message come with error.
            if (threadData){
                let {taskId} = threadData;
                // Set the thread status to idle.
                thread.busy = false;
                let reqPromise = thread.taskMap[taskId];
                if (reqPromise){
                    reqPromise.reject({code: threadCode, msg: threadMsg});
                }
            }
        }
    }

这里处理的逻辑其实挺简单的:

1). 首先规定了子线程和主线程之间通讯的数据格式:

{
     threadCode: 0,
     threadData: {taskId, data, code, msg}, 
     threadMsg:  'xxxxx',
     channel: 'fetch',
}

其中:

threadCode: 表示这个消息是否正确,也就是子线程在post此次message的时候,是不是由于报错而发过来,由于咱们在子线程中会有这个设计机制,用来区分任务完成后的正常的消息和执行过程当中因报错而发送的消息。若是为正常消息,咱们约定为0,错误消息为1,暂定只有1。

threadData: 表示消息真正的数据载体对象,若是threadCode为1,只返回taskId,以帮助主线程销毁找到调用上层promise的reject回调函数。Fecth取到的数据放在data内部。

threadMsg: 表示消息错误的报错信息。非必须的。

channel: 表示数据频道,由于咱们可能经过子线程作其余工做,在咱们这个设计里至少有2个工做,一个是发起fetch请求,另一个是响应主线程的检查(inspection)请求。因此须要一个额外的频道字段来确认不一样工做。

这个数据格式在第4步的子线程的设计中,也会有对应的体现。

2). 若是是子线程回复的检查消息,那么根据子线程返回的状态决定这个子线程是否已经挂起了,若是是就把它当作一个僵尸线程杀掉。并从新建立一个子线程,替换它原来的位置。

3). 在任务结束后,这个子线程的busy被设置成了false,表示它从新处于闲置状态。

4). 在给子线程派发任务的时候,咱们post了taskId,在子线程的回复信息中,咱们能够拿到这个taskId,并经过它找到对应的promise的resolve或者reject回调函数,就能够响应上层业务中Fetch调用,返回从服务端获取的数据了。

 

二、执行主线程中Fetch调用的工做

首先,咱们在主线程中封装了统一调用Fetch的收口,页面全部请求均走这个惟一入口,对外暴露Get和Post方法,里面的业务有关的部分代码能够忽略:

const initRequest = (url, options) => {
    if (checkRequestUnInterception(url)){
        return new Promise(async (resolve, reject) => {
            options.credentials = 'same-origin';
            options.withCredentials = true;
            options.headers = {'Content-Type': 'application/json; charset=utf-8'};
            fetchThreadPool.dispatchThread({url, options}, {resolve, reject});
        });
    }
};

const initSearchUrl = (url, param) => (param ? url + '?' + stringify(param) : url);

export const fetchGet = (url, param) => (initRequest(initSearchUrl(url, param), {method: 'GET'}));

export const fetchPost = (url, param) => (initRequest(url, {method: 'POST', body: JSON.stringify(param)}));

在线程池中,咱们实现了对应的方法来执行Fetch请求:

    dispatchThread ({url, options}, reqPromise){
        // Firstly get the idle thread in pools.
        let thread = this.threads.filter(thread => !thread.busy)[0];
        // If there is no idle thread, fetch in main thread.
        if (!thread){
            thread = fetchInMainThread({url, options});
        }
        // Stick the reqPromise into taskMap of thread.
        let taskId = Date.now();
        thread.taskMap[taskId] = reqPromise;
        // Dispatch fetch work to thread.
        thread.postMessage({
            channel: 'fetch',
            data: {url, options, taskId}
        });
        thread.busy = true;
    }

这里调度的逻辑是:

1). 首先遍历当前全部的子线程,过滤出闲置中的子线程,取第一个来下发任务。

2). 若是没有闲置的子线程,就直接在主线程发起请求。后面能够优化的地方:能够在当前子线程中随机找一个,来下发任务。这也是为何每一个子线程不直接使用task属性,而给它一个taskMap,就是由于一个子线程可能同时拥有两个及以上的任务。

 

三、定时轮训检查线程与终结僵尸线程

   inspectThreads (){
        if (this.threads.length > 0){
            this.threads.forEach(thread => {
                // console.info(`Inspection thread ${thread.id} starts.`);
                thread.postMessage({
                    channel: 'inspection',
                    data: {id: thread.id}
                });
            });
        }
    }

    terminateZombieThread (thread){
        let id = thread.id;
        this.threads.splice(id, 1, null);
        thread.terminate();
        thread = null;
        this.createThread(id);
    }

从第1步的代码中咱们能够得知初始化定时检查 inspectThreads 是在整个线程池init的时候执行的。对于检查僵尸线程和执行 terminateZombieThread 也是在第1步中的处理子线程信息的回调函数中进行的。

 

4. 子线程的设计

子线程的设计,相对于线程池来讲就比较简单了:

export default self => {
    let isWorking = false;
    let startWorkingTime = 0;
    let tasks = [];
    self.addEventListener('message', async event => {
        const {channel, data} = event.data;
        switch (channel){
            case 'fetch':
                isWorking = true;
                startWorkingTime = Date.now();
                let {url, options, taskId} = data;
                tasks.push({url, options, taskId});
                try {
                    // Consider to web worker thread post data to main thread uses data cloning
                    // not change the reference. So, here we don't post the response object directly,
                    // because it is un-cloneable. If we persist to post id, we should use Transferable
                    // Objects, such as ArrayBuffer, ImageBitMap, etc. And this way is just like to
                    // change the reference(the control power) of the object in memory.
                    let response = await fetch(self.origin + url, options);
                    if (response.ok){
                        let {code, data, msg} = await response.json();
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code, data, msg},
                        });
                    } else {
                        const {status, statusText} = response;
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code: status, msg: statusText || `http error, code: ${status}`},
                        });
                        console.info(`%c HTTP error, code: ${status}`, 'color: #CC0033');
                    }
                } catch (e){
                   self.postMessage({
                       threadCode: 1,
                       threadData: {taskId},
                       threadMsg: `Fetch Web Worker Error: ${e}`
                   });
                }
                isWorking = false;
                startWorkingTime = 0;
                tasks = tasks.filter(task => task.taskId !== taskId);
                break;

            case 'inspection':
                // console.info(`Receive inspection thread ${data.id}.`);
                self.postMessage({
                    threadCode: 0,
                    channel: 'inspection',
                    threadData: {
                        isWorking,
                        startWorkingTime,
                        workTimeElapse: isWorking ? (Date.now() - startWorkingTime) : 0,
                        tasks
                    },
                });
                break;

            default:
                self.postMessage({
                    threadCode: 1,
                    threadMsg: `Fetch Web Worker Error: unknown message channel: ${channel}}.`
                });
                break;
        }
    });
};

首先,在每一个子线程声明了 taksk 用来保存收到的任务,是为后期一个子线程同时作多个任务作准备的,当前并不须要,子线程一旦收到请求任务,在请求完后以前, isWorking 状态一直都为 true 。全部子线程有任务之后,会直接在主线程发起请求,不会随机派发给某个子线程。

而后,咱们在正常的Fecth成功后的数据通讯中,post的是对response处理之后的结构化数据,而不是直接post这个response对象,这个在第一章节中有提到,这里详细说一下:

Fetch请求的response对象并不是单纯的Object对象。在子线程和主线程之间使用postMessage等方法进行数据传递,数据传递的方式是克隆一个新的对象来传递,而非直接传递引用,但response对象做为一个非普通的特殊对象是不能够被克隆的......。要传递response对象只有就须要用到HTML5里的一些新特性好比  Transferable object 的 ArrayBuffer  、 ImageBitmap  等等,经过它们能够直接传递对象的引用,这样作的话就不须要克隆对象了,进而避免因对response对象进行克隆而报错,以及克隆含有大量数据的对象带来的高额开销。这里咱们选择传递一个普通的结构化Object对象来现实基本的功能。

对于子线程中每次给主线程post的message,也是严格按照第1步中说明的那样定义的。

还有一点须要说明:笔者的项目都是基于webpack的模块化开发,要直接使用一个web worker的js文件,笔者选了"webworkify-webpack"这个库来处理模块化的,这个库还执行在子线程中随意import其余模块,使用比较方便:

import work from 'webworkify-webpack';

因此,在第1步中才出现了这样的建立子线程的方式: const thread = work(require.resolve('./fetch.worker.js')); 

该库把web worker的js文件经过  createObjectURL 方法把js文件内容转成了二进制格式,这里请求的是一个二进制数据的连接(引用),将会到内存中去找到这个数据,因此这里并非一个js文件的连接:

若是你的项目形态和笔者不一样,大可没必要如此,按照常规的web worker教程中的指导方式走就行。

笔者这个项目在主线程和子线程之间只传递了不多量的数据,速度很是快,一旦你的项目须要去传递大量数据,好比说一个异常复杂的大对象,若是直接传递结构化对象,速度会很慢,能够先字符串化了之后再发送,避免了在post的过程当中时间消耗过大。

笔者捕捉到的一个postMessage的消耗,若是数据量小的话,还算正常:

 

5. 经过子线程发起请求

// ...
@catchError
async getNodeList (){
    let data = await fetchGet('/api/getnodelist');
    !!data && store.dispatch(nodeAction.setNodeList(data));
},
// ...

数据回来了:

 

 

从截图中能够看出,和直接在主线程中发起的Fetch请求不一样的是,在子线程中发起的请求,在Name列里会增长一个齿轮在开头以区分。

须要注意的一点是:若是子线程被终结,没法查看返回信息等,由于这些数据的占用内存已经随子线程的终结而被回收了。

咱们在子线程中写一个明显的错误,也会回调reject,并在控制台报错:

从开发者工具里能够检测到这8个子线程:

 

大概的设计就是如此,目前这个线程池只针对Fetch的任务,后续还须要在业务中进行优化和加强,已适配更多的任务。针对其余的任务,在这里架子其实已基本实现,须要增长对不一样channel的处理。

 

4、Web Worker的兼容性

从caniuse给出的数据来看,兼容性异常的好,甚至连IE系列都在好几年前就已经支持:

可是...,这个兼容性只能说明可否使用Web Woker,这里的兼容并不能代表能在其中作其余操做。好比标准规定,能够在子线程作作计算、发起XHR请求等,但不能操做DOM对象。笔者在项目中使用的Fetch,而非Ajax,而后Fecth在IE系列(包括Edge)浏览器中并不支持,会直接报错。在近新版本的Chrome、FireFox、Opera中均无任何问题。后来做者换成了Axios这种基于原生的XHR封装的库,在IE系列中仍是要报错。后来又换成了纯原生的XmlHttpRequest,依旧报错。这就和标准有出入了......。同窗们能够试试,不知到笔者的方法是否百分百正确。但欣慰的是前几天的新闻说微软将来在Edge浏览器开发中将使用Chromium内核。

至于Web Woker衍生出来的其余新特性,好比 Shared Web Woker等,或者在子线程中再开子线程,这些特性的使用在各个浏览器中并不统一,有些支持,有些不支持,或者部分支持,因此对于这些特性暂时就不要去考虑它们了。

 

5、展望

在前端开发这块(没用Web前端了,是笔者认为如今的前端开发已经不只限于Web平台了,也不只限于前端了),迄今为止活跃度是很是之高了。新技术、新标准、新协议、新框(轮)架(子)的出现是很是快速的。技术跌该更新频率极高,好比这个Web Worker,四年前就定稿了,笔者如今针对它写博客......。一个新技术的出现可能不能形成什么影响,可是多种新技术的出现和搭配使用将带来翻天覆地的变化。前端的发展愈来愈多地融入了曾经只在Client、Native端出现的技术。特别是近年来的WebGL、wasm等新技术的推出,都是具备意义的。

相关文章
相关标签/搜索