欢迎关注个人知乎专栏: https://zhuanlan.zhihu.com/starkwangnode
starkwang/Maus: A Simple JSON-RPC Framework running in NodeJS or Browser, based on websocket.webpack
这几天写了个小型的RPC框架,最初只是想用 TCP-JSON 写个纯 NodeJS 平台的东西,后来无心中开了个脑洞,若是基于 Websocket 把浏览器当作 RPC Server ,那岂不是只要是能运行浏览器(或者nodejs)的设备,均可以做为分布式计算中的一个 Worker 了吗?git
打开一张网页,就能成为分布式计算的一个节点,看起来仍是挺酷炫的。github
能够参考:谁能用通俗的语言解释一下什么是RPC框架? - 知乎web
简单地说就是你能够这样注册一个任意数量的worker
(姑且叫这个名字好了),它里面声明了具体的方法实现:算法
var rpcWorker = require('maus').worker; rpcWorker.create({ add: (x, y) => x + y }, 'http://192.168.1.100:8124');
而后你能够在另外一个node进程里这样调用:编程
var rpcManager = require('maus').manager; rpcManager.create(workers => { workers.add(1, 2, result => console.log(result)); }, 8124)
这里咱们封装了底层的通讯细节(能够是tcp、http、websocket等等)和任务分配,只须要用异步的方式去调用worker
提供的方法便可,经过这个咱们能够垂手可得地作到分布式计算的map
和reduce
:数组
rpcManager.create(workers => { //首先定义一个promise化的add var add = function(x, y){ return new Promise((resolve, reject)=>{ workers.add(x, y, result => resolve(result)); }) } //map&reduce Promise.all([add(1,2), add(3,4), add(4,5)]) .then(result => result.reduce((x, y) => x + y)) .then(sum => console.log(sum)) //19 }, 8124)
若是咱们有三个已经注册的Worker
(多是本地的另外一个nodejs进程、某个设备上的浏览器、另外一个机器上的nodejs),那么咱们这里会分别在这三个机器上分别计算三个add
,而且将三个结果在本地相加,获得最后的值,这就是分布式计算的基础。promise
要实现双向的通讯,咱们首先要定义这样一个“远程调用”的通讯标准,在个人实现中比较简单:浏览器
{ [id]: uuid //在某些通讯中须要惟一标识码 message: '......' //消息类别 body: ...... //携带的数据 }
首先咱们要解决的问题是,如何让Manager
知道Worker
提供了哪些方法可供调用?
这个问题其实很简单,只要在 websocket 创建的时刻发送一个init
消息就能够了,init
消息大概长这样:
{ message: 'init', body: ['add', 'multiply'] //body是方法名组成的数组 }
同时,咱们要将Manager
传入的回调函数,记录到Manager.__workersStaticCallback
中,以便延迟调用:
manager.create(callback, port) //记录下这个callback //一段时间后。。。。。。 manager.start() //任务开始
如今咱们的Manager
收到了一个远程可调用的方法名组成的数组,咱们接下来须要在Manager
中生成一个workers
实例,它应该包含全部这些方法名,但底层依然是调用一个webpack通讯。这里咱们能够用相似元编程的奇技淫巧,下面的是部分代码:
//收到worker发来的init消息以后 var workers = { __send: this.__send.bind(this), //这个this指向Manager,而不是本身 __functionCall: this.__functionCall.bind(this) //同上 }; var funcNames = data.body; //好比['add', 'multiply'] funcNames.forEach(funcName => { //使用new Function的奇技淫巧 rpc[funcName] = new Function(` //截取参数 var params = Array.prototype.slice.call(arguments,0,arguments.length-1); var callback = arguments[arguments.length-1]; //这个__functionCall调用了Manager底层的通讯,具体在后面解释 this.__functionCall('${funcName}',params,callback); `) }) //将workers注册到Manager内部 this.__workers = workers; //若是此时Manager已经在等待开始了,那么开始任务 if (this.__waitingForInit) { this.start(); }
还记得上面咱们有个start
方法么?它是这样写的:
start: function() { if (this.__workers != undefined) { //若是初始化完毕,workers实例存在 this.__workersStaticCallback(this.__workers); this.__waitingForInit = false; } else { //不然将等待初始化完毕 this.__waitingForInit = true; } },
若是只是单个Worker
和单个Manager
,而且远程方法都是同步而非异步的,那么咱们显然不须要考虑返回值顺序的问题:
好比咱们的Manager
调用了下面一堆方法:
workers.add(1, 1, callback); workers.add(2, 2, callback); workers.add(3, 3, callback);
因为Worker
中add
的是同步的方法,那么显然咱们收到返回值的顺序是:
2 4 6
但若是Worker
中存在一个异步调用,那么这个顺序就会被打乱:
workers.readFile('xxx', callback); workers.add(1, 1, callback); workers.add(2, 2, callback);
显然咱们收到的返回值顺序是:
2 4 content of xxx
因此这里就须要对发出的函数调用作一个序列化,具体的方法就是对于每个调用都给一个uuid(惟一标识码)。
好比咱们调用了:
workers.add(1, 1, stupid_callback);
那么首先Manager
会对这个调用生成一个 uuid :
9557881b-25d7-4c94-84c8-2463c53b67f4
而后在__callbackStore
中将这个 uuid 和stupid_callback
绑定,而后向选中的某个Worker
发送函数调用信息(具体怎么选Worker
咱们后面再说):
{ id: '9557881b-25d7-4c94-84c8-2463c53b67f4', message: 'function call', body: { funcName: 'add', params: [1, 1] } }
Worker
执行这个函数以后,发送回来一个函数返回值的信息体,大概是这样:
{ id: '9557881b-25d7-4c94-84c8-2463c53b67f4', message: 'function call', body: { result: 2 } }
而后咱们就能够在__callbackStore
中找到这个 uuid 对应的 callback ,而且执行它:
this.__callbackStore[id](result);
这就是workers.add(1, 1, stupid_callback)
这行代码背后的原理。
若是存在多个Worker
,显然咱们不能把全部的调用都傻傻地发送到第一个Worker
身上,因此这里就须要有一个任务分配机制,个人机制比较简单,大概说就是在一张表里对每一个Worker
记录下它是否繁忙的状态,每次当有调用需求的时候,先遍历这张表,
若是找到有空闲的Worker
,那么就将对它发送调用;
若是全部Worker
都繁忙,那么先把这个调用暂存在一个队列之中;
当收到某个Worker
的返回值后,会检查队列中是否有任务,有的话,那么就对这个Worker
发送最前的函数调用,若没有,就把这个Worker
设为空闲状态。
具体任务分配的代码比较冗余,分散在各个方法内,因此只介绍方法,就不贴上来了/w\
所有的Manager代码在这里(抱歉还没时间补注释):
Maus/manager.js at master · starkwang/Maus
这里要再说一遍,咱们的RPC框架是基于websocket的,因此Worker
能够是一个PC浏览器!!!能够是一个手机浏览器!!!能够是一个平板浏览器!!!
Worker
的实现远比Manager
简单,由于它只须要对惟一一个Manager
通讯,它的逻辑只有:
接收Manager
发来的数据;
根据数据作出相应的反应(函数调用、初始化等等);
发送返回值
因此咱们也不放代码了,有兴趣的能够看这里:
Maus/worker.js at master · starkwang/Maus
假设咱们的加法是经过这个框架异步调用的,那么咱们该怎么写算法呢?
在单机状况下,写个斐波拉契数列简直跟喝水同样简单(事实上这种暴力递归的写法很是很是傻逼且性能低下,只是做为范例演示用):
var fib = x => x>1 ? fib(x-1)+fib(x-2) : x
可是在分布式环境下,咱们要将workers.add
方法封装成一个Promise化的add
:
//这里的x, y多是数字,也多是个Promise,因此要先调用Promise.all var add = function(x, y){ return Promise.all([x, y]) .then(arr => new Promise((resolve, reject) => { workers.add(arr[0], arr[1], result => resolve(result)); })) }
而后咱们就能够用相似同步的递归方法这样写一个分布式的fib
算法:
var fib = x => x>1 ? add(fib(x-1), fib(x-2)) : x;
而后你能够尝试用你的电脑里、树莓派里、服务器里的nodejs、手机平板上的浏览器做为一个Worker
,总之集合全部的计算能力,一块儿来计算这个傻傻的算法(事实上相比于单机算法会慢不少不少,由于通讯上的延迟远大于单机的加法计算,但只是为了演示啦):
//分布式计算fib(40) fib(40).then(result => console.log(result));