代码地址以下:
http://www.demodashi.com/demo/12932.htmljavascript
koa是由Express原班人马打造的,致力于成为一个更小、更富有表现力、更健壮的Web框架,Koa不定制路由,无冗余的中间件,开发设计方案趋向定制化,因此很适合对业务和技术有灵活要求的web场景。css
因为restful、加解密、跨域、参数解析、中间件等比较基础,且文档丰富,本小节将直接跳过,侧重于分享如下几点:html
一、路由转发时,如何利用钩子函数机制作到controller层业务解耦 二、在socket通讯中如何动态加载protobuf进行数据格式交换 三、如何基于websocket绑定相同的端口 四、如何利用c++编写node扩展库
中间件及钩子函数机制皆为业务解耦的有效实现方式,其中中间件模式因其实现方便而应用普遍, 如koa、express、sails中都曾大量用到,
而钩子函数机制在node生态中被大量用到ORM对数据库的操做,如mongoose、waterline,鲜有在controller层的普遍应用,本小节则尝试分享
一个简易的Hooks实现方式,并应用在koa框架中。java
编写koa-hooks, 并提交到npmnode
const hooks = require('hooks') class ApiHooks { constructor(ctx, next, cb) { this._ctx = ctx this._next = next this._cb = cb this._listenerTree = {} this.addListenerTree() } addListenerTree() { for (let fn in hooks) { this[fn] = hooks[fn] } } addHooks(listeners) { const self = this try { listeners.map(listener => { const [method, hooksFn] = listener.split('.') if(hooksFn.match('before')) self.addFn(method, hooksFn, 'pre') if(hooksFn.match('after')) self.addFn(method, hooksFn, 'post') }) } catch (err) { console.log('err:', err) } } addFn(method, hooksFn, hook) { const self = this self[hook](method, async (next) => { await self[hooksFn](self._ctx, next, self._cb) }) } } module.exports = ApiHooks
编写一个restful风格接口/v1/verb/get
,继承ApiHooks, 添加对应的钩子函数beforeVerbCheckLogin实现登陆检查c++
/** * Created by Joseph on 18/09/2017. */ const Api = require('koa-hooks').Api const VerbService = require('../../services/verb.js') class VerbApi extends Api { constructor(ctx, next, cb) { super(ctx, next, cb) this.addHooks([ 'verbGetOnThisRequest.beforeVerbCheckLogin', 'verbPostOnThisRequest.beforeVerbCheckLogin', 'verbPutOnThisRequest.beforeVerbCheckLogin', 'verbDeleteOnThisRequest.beforeVerbCheckLogin', ]) } async beforeVerbCheckLogin(ctx, next, cb) { const data = await VerbService.beforeVerbCheckLogin(ctx, next) data ? cb(ctx, data) : await next() } async verbGetOnThisRequest(ctx, next, cb) { const data = await VerbService.verbGetOnThisTest(ctx, next) data ? cb(ctx, data) : await next() } async verbPostOnThisRequest(ctx, next, cb) { const data = await VerbService.verbPostOnThisTest(ctx, next) data ? cb(ctx, data) : await next() } async verbPutOnThisRequest(ctx, next, cb) { const data = await VerbService.verbPutOnThisTest(ctx, next) data ? cb(ctx, data) : await next() } async verbDeleteOnThisRequest(ctx, next, cb) { const data = await VerbService.verbDeleteOnThisTest(ctx, next) data ? cb(ctx, data) : await next() } } module.exports = (ctx, next, cb) => new VerbApi(ctx, next, cb)
启动服务,请求接口http://127.0.0.1:3000/v1/verb/get
,能够发现此钩子函数已经生效web
注释掉//'verbGetOnThisRequest.beforeVerbCheckLogin', 再次请求接口,能够发如今需求变更状况对源码修改极少,代码可维护性提高数据库
protobuf是谷歌开源的是一种轻便高效的结构化数据存储格式, 且平台无关、语言无关、可扩展,一般用在tcp编程对数据传输要求较高的场
景,protobuf兼有json的可读性,且传输效率远大于json、xml等,很是适合流式数据交换。express
A) 根据文件名及message动态加载protobufnpm
const protobuf = require('protobufjs') const protoPath = '/Users/dreamboad/Projects/koa-service/message/' class Proto { async loadByName(protoName, messageName, obj, type) { return new Promise((resolve, reject) => { protobuf.load(`${protoPath}${protoName}.proto`, (err, root) => { if (err) { return console.log(err) || resolve() } const data = root.lookupType(`${protoName}.${messageName}`) if (type === 'encode' && data.verify(obj)) { return console.log('encode err') || resolve() } switch (type) { case 'decode': return resolve(data.toObject(data.decode(obj), { objects: true })) case 'encode': return resolve(data.encode(data.create(obj) || '').finish()) } }) }) } async deserialize(protoName, messageName, obj) { return await this.loadByName(protoName, messageName, obj, 'decode') } async serialize(protoName, messageName, obj) { return await this.loadByName(protoName, messageName, obj, 'encode') } } module.exports = new Proto()
B) 编写soket client
/** * 一、动态加载protobuf * 二、socket数据流断包、粘包处理(TODO) * 三、心跳机制、及断线重连 */ const net = require('net') const [HOST, PORT] = ['127.0.0.1', 9999] const client = new net.Socket() const connection = () => { client.connect(PORT, HOST, () => { console.log('CONNECTED TO: ' + HOST + ':' + PORT)}) } client.on('data', (data) => { console.log(`${HOST}:${PORT} CONNECT DATA: `, data) }) client.on('error', (e) => { console.log(`${HOST}:${PORT} CONNECT ERROR: ` + e) }) client.on('timeout', (e) => { console.log(`${HOST}:${PORT} CONNECT TIMEOUT: ` + e) }) client.on('end', (e) => { console.log(`${HOST}:${PORT} CONNECT END: ` + e) }) client.on('close', (e) => { console.log(`${HOST}:${PORT} CONNECT CLOSE: ` + e) if (client.destroyed) { client.destroy() } setTimeout(connection, 3000) }) process.on('exit', () => { client.destroy() client.on('close', () => { console.log('Connection closed') }) }) // 链接 客户端 module.exports = { connection, client }
C) 在soket通讯中序列化/反序列化json数据
/** * 序列化、反序列化 */ const crypto = require('crypto') const Proto = require('./protobuf') class SocketProto { async doTranslation(obj, protoName, messageName, operation) { try { switch (operation) { case 'decode': return await Proto.deserialize(obj, protoName, messageName) case 'encode': return await Proto.serialize(obj, protoName, messageName) } } catch (error) { console.log(error) } } async decode(obj, protoName, messageName) { return await this.doTranslation(obj, protoName, messageName, 'decode') } async encode(obj, protoName, messageName) { return await this.doTranslation(obj, protoName, messageName, 'encode') } } module.exports = new SocketProto()
D) 链接服务器,读写流式数据,并用proto解析
const { connection, client } = require('./socket_client') const SocketProto = require('./socket_protobuf') const config = require('../config/').msgIdConfig connection() const writer = module.exports.writer = async (protoName, messageName, obj) => { const w = await SocketProto.encode(protoName, messageName, obj) return client.write(w) } const reader = module.exports.reader = async (protoName, messageName, obj) => { const r = await SocketProto.decode(protoName, messageName, obj) return r } client.on('data', (buf) => { chooseFnByMsg('', 'basemsg', buf) }) const chooseFnByMsg = (msgId, type, obj) => { if (msgId) { if (!config[msgId] || !config[msgId].req || !config[msgId].res) { return console.log('noting to do: ', msgId) } } switch (type) { case 'basemsg': return reader(config.head.res.pName, config.head.res.mName, obj) case 'write': return writer(config[msgId].req.pName, config[msgId].req.mName, obj) case 'read': return reader(config[msgId].res.pName, config[msgId].res.mName, obj) default: console.log('noting to do default: ', msgId) break } } chooseFnByMsg(1, 'write', { Field: "String" }) module.exports = chooseFnByMsg
E) server及client分别在终端打印结果
A) koa server
const app = new Koa() // web socket const server = require('http').Server(app.callback()) const io = require('socket.io')(server) io.on('connection', client => { console.log('new connection:') client.on('news', (data, cb) => { console.log('news:', data) }) client.on('disconnect', () => { console.log('disconnect:') }) })
B) websocket client
const client = require('socket.io-client').connect('http://localhost:3000') client.emit('news', "hello world")
IO异步及高并发是Node的优点,但若在须要密集计算、集成基于C++的第三方SDK等场景时,Node的劣势则显现出来,此时能够基于node-gyp来嵌入集成C++解决以上等问题。
A) 安装node-gyp
cnpm install -g node-gyp
A) 编辑binding.gyp、C++、Node调用模块
{ "targets": [ { "target_name": "demo", "sources": ["src/demo.cc"] }, { "target_name": "test_params_nocb", "sources": ["src/test_params_nocb.cc"] }, { "target_name": "test_function_nocb", "sources": ["src/test_function_nocb.cc"] }, { "target_name": "test_params_function_nocb", "sources": ["src/test_params_function_nocb.cc"] } ] }
// test_function_nocb.cc #include <node.h> namespace demo { using v8::Function; using v8::FunctionCallbackInfo; using v8::Isolate; using v8::Local; using v8::Null; using v8::Object; using v8::String; using v8::Value; void RunCallback(const FunctionCallbackInfo<Value>& args) { Isolate* isolate = args.GetIsolate(); Local<Function> cb = Local<Function>::Cast(args[0]); Local<Value> argv[1] = { String::NewFromUtf8(isolate, "hello world") }; cb->Call(Null(isolate), 1, argv); } void Init(Local<Object> exports, Local<Object> module) { NODE_SET_METHOD(module, "exports", RunCallback); } NODE_MODULE(test_function_nocb, Init) } // namespace demo
module.exports.embeddedProxy = (cb, params) => { return new Promise((resolve, reject) => { try { return cb((data) => { resolve(data) }, params) } catch (err) { return resolve({ data: "调用失败", code: -1 }) } }) }
C) 编译C++
node-gyp configure node-gyp build
D) 定义路由并调用接口
注:本文著做权归做者,由demo大师代发,拒绝转载,转载须要做者受权