贴一张挂在官网的图片:https://grpc.io/docs/what-is-... javascript
能够理解 gRPC 是 RPC(远程过程调用)框架的一种实现,关于 RPC 的介绍由于并非本次的主题,因此放个连接来帮助你们理解:https://www.zhihu.com/questio... java
我所理解 RPC 整个执行的过程就是 Client
调用方法 -> 序列化请求参数 -> 传输数据 -> 反序列化请求参数 -> Server 处理请求 -> 序列化返回数据 -> 传输数据 -> Client
接收到方法返回值:node
其主要逻辑会集中在 数据的序列化/反序列化 以及 数据的传输上,而这两项 gRPC 分别选用了 Protocol Buffers 和 HTTP2 来做为默认选项。c++
gRPC 在 Node.js 的实现上一共有两个官方版本,一个是基于 c++ addon 的版本,另外一个是纯 JS 实现的版本。git
除了上边提到的两个 gRPC 的实现,在 Node.js 中还存在一些其余的模块用来辅助使用 gRPC。github
此次笔记主要是针对 grpc-node 方式的实现,在 c++ addon 模块的实现下,并非一个 gRPC 的完整实现,作的事情更多的是一个衔接的工做,经过 JS、c++ 两层封装将 c++ 版本的 gRPC 能力暴露出来供用户使用。 web
之因此选择它是由于以为逻辑会较 grpc-js 清晰一些,更适合理解 gRPC 总体的运行逻辑。 c#
在项目仓库中,两个目录下是咱们须要关注的:服务器
ext 中的代码主要用于调用 c++ 版本 gRPC 的接口,并经过 NAN 提供 c++ addon 模块。
src 中的代码则是调用了 ext 编译后的模块,并进行一层应用上的封装。
而做为使用 gRPC 的用户就是引用的 src 下的文件了。 并发
咱们先经过官方的 hello world 示例来讲明咱们是如何使用 gRPC 的,由于 gRPC 默认的数据序列化方式采用的 protobuf,因此首先咱们须要有一个 proto 文件,而后经过 gRPC 提供的文件来生成对应的代码,生成出来的文件包含了 proto 中所定义的 service、method、message 等各类结构的定义,并可以让咱们用比较熟悉的方式去使用。
示例中的 proto 文件:
package helloworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
grpc_tools 是用来生成 proto 对应代码的,这个命令行工具提供了多种语言的生成版本。
在 Node 中,会生成两个文件,通常命名规则为 xxx_pb.js
、xxx_grpc_pb.js
,xxx_pb.js
是 proto 中各类 service、method 以及 message 的结构描述及如何使用的接口定义,而 xxx_grpc_pb.js
主要则是针对 xxx_pb.js
的一个整合,按照 proto 文件中定义的结构生成对应的代码,在用户使用的时候,使用前者多半用于构造消息结构,使用后者则是方法的调用。
生成后的关键代码(XXX_grpc_pb.js):
const grpc = require('@grpc/grpc'); const helloworld_pb = require('./helloworld_pb.js'); function serialize_helloworld_HelloReply(arg) { if (!(arg instanceof helloworld_pb.HelloReply)) { throw new Error('Expected argument of type helloworld.HelloReply'); } return Buffer.from(arg.serializeBinary()); } function deserialize_helloworld_HelloReply(buffer_arg) { return helloworld_pb.HelloReply.deserializeBinary(new Uint8Array(buffer_arg)); } function serialize_helloworld_HelloRequest(arg) { if (!(arg instanceof helloworld_pb.HelloRequest)) { throw new Error('Expected argument of type helloworld.HelloRequest'); } return Buffer.from(arg.serializeBinary()); } function deserialize_helloworld_HelloRequest(buffer_arg) { return helloworld_pb.HelloRequest.deserializeBinary(new Uint8Array(buffer_arg)); } // The greeting service definition. const GreeterService = exports.GreeterService = { // Sends a greeting sayHello: { path: '/helloworld.Greeter/SayHello', requestStream: false, responseStream: false, requestType: helloworld_pb.HelloRequest, responseType: helloworld_pb.HelloReply, requestSerialize: serialize_helloworld_HelloRequest, requestDeserialize: deserialize_helloworld_HelloRequest, responseSerialize: serialize_helloworld_HelloReply, responseDeserialize: deserialize_helloworld_HelloReply, }, }; exports.GreeterClient = grpc.makeGenericClientConstructor(GreeterService);
最终导出的 sayHello
就是咱们在 proto 文件中定义的 SayHello
方法,因此咱们在做为 Client
的时候使用,就是很简单的调用 sayHello
就好了:
const messages = require('./helloworld_pb'); const services = require('./helloworld_grpc_pb'); const grpc = require('grpc'); const client = new services.GreeterClient( target, grpc.credentials.createInsecure() ); const request = new messages.HelloRequest(); request.setName('Niko'); client.sayHello(request, function(err, response) { console.log('Greeting:', response.getMessage()); });
其实真实写的代码也就上边的几行,实例化了一个 Client
,实例化一个 Message
并构建数据,而后经过 client
调用对应的 method
传入 message
,就完成了一个 gRPC 请求的发送。
在这个过程当中,咱们直接可见的用到了 grpc-node
的 credentials
以及 makeGenericClientConstructor
,咱们就拿这两个做为入口,首先从 makeGenericClientConstructor
来讲。
在翻看 index.js 文件中能够发现, makeGenericClientConstructor
实际上是 client.makeClientConstructor
的一个别名,因此咱们须要去查看 src/client.js 中对应函数的定义,就像函数名同样,它是用来生成一个 Client 的构造函数的,这个构造函数就是咱们在上边示例中的 GreeterClient
。
源码所在位置: https://github.com/grpc/grpc-...
当对照着 xxx_grpc_pb.js
与源码来看时,会发现调用函数只传入了一个参数,而函数定义却存在三个参数,这个实际上是历史缘由致使的,咱们能够直接忽略后边的两个参数。
精简后的源码:
exports.makeClientConstructor = function(methods) { function ServiceClient(address, credentials, options) { Client.call(this, address, credentials, options); } util.inherits(ServiceClient, Client); ServiceClient.prototype.$method_definitions = methods; ServiceClient.prototype.$method_names = {}; Object.keys(methods).forEach(name => { const attrs = methods[name]; if (name.indexOf('$') === 0) { throw new Error('Method names cannot start with $'); } var method_type = common.getMethodType(attrs); var method_func = function() { return requester_funcs[method_type].apply(this, [ attrs.path, attrs.requestSerialize, attrs.responseDeserialize ] .concat([].slice.call(arguments)) ); }; ServiceClient.prototype[name] = method_func; ServiceClient.prototype.$method_names[attrs.path] = name; // Associate all provided attributes with the method Object.assign(ServiceClient.prototype[name], attrs); if (attrs.originalName) { ServiceClient.prototype[attrs.originalName] = ServiceClient.prototype[name]; } }); ServiceClient.service = methods; return ServiceClient; };
methods
参数就是咱们上边文件中生成的对象,包括服务地址、是否使用 stream、以及 请求/返回值 的类型及对应的序列化/反序列化 方式。
大体的逻辑就是建立一个继承自 Client
的子类,而后遍历咱们整个 service
来看里边有多少个 method
,并根据 method
不一样的传输类型来区分使用不一样的函数进行数据的传输,最后以 method
为 key 放到 Client
子类的原型链上。
common.getMethodType
就是用来区分 method
到底是什么类型的请求的,目前 gRPC
一共分了四种类型,双向 Stream
、两个单向 Stream
,以及 Unary
模式:
exports.getMethodType = function(method_definition) { if (method_definition.requestStream) { if (method_definition.responseStream) { return constants.methodTypes.BIDI_STREAMING; } else { return constants.methodTypes.CLIENT_STREAMING; } } else { if (method_definition.responseStream) { return constants.methodTypes.SERVER_STREAMING; } else { return constants.methodTypes.UNARY; } } };
在最后几行有一处判断 originalName
是否存在的操做,这个是在 proto-loader 中存在的一个逻辑,将 methodName 转换成纯小写放了进去,单纯看注释的话,这并非一个长期的解决方案: https://github.com/grpc/grpc-...
P.S. proto-loader 是 JS 里边一种动态加载 proto 文件的方式,性能比经过 grpc_tools 预生成代码的方式要低一些。
全部的请求方式,都被放在了一个叫作 requester_funcs
的对象中,源码中的定义是这样的:
var requester_funcs = { [methodTypes.UNARY]: Client.prototype.makeUnaryRequest, [methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest, [methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest, [methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest };
从这里就能够看出,实际上是和咱们 getMethodType
所对应的四种处理方式。
最终,将继承自 Client
的子类返回,完成了整个函数的执行。
首先咱们须要看看继承的 Client
构造函数究竟作了什么事情。
抛开参数类型的检查,首先是针对拦截器的处理,咱们能够经过两种方式来实现拦截器,一个是提供拦截器的具体函数,这个在全部 method
触发时都会执行,还有一个能够经过传入 interceptor_provider
来实现动态的生成拦截器,函数会在初始化 Client
的时候触发,并要求返回一个新的 interceptor
对象用于执行拦截器的逻辑。
// interceptors 用法 const interceptor = function(options, nextCall) { console.log('trigger') return new InterceptingCall(nextCall(options)); } const client = new services.GreeterClient( target, grpc.credentials.createInsecure(), { interceptors: [interceptor] } ); // interceptor_providers 用法 const interceptor = function(options, nextCall) { console.log('trigger') return new InterceptingCall(nextCall(options)); } const interceptorProvider = (methodDefinition) => { console.log('call interceptorProvider', methodDefinition) return interceptor } const client = new services.GreeterClient( target, grpc.credentials.createInsecure(), { interceptor_providers: [interceptorProvider] } );
P.S. 须要注意的是,若是传入 interceptor_providers,则会在两个地方触发调用,一个是实例化 Client 的时候,还有一个是在 method 真实调用的时候,每次调用都会触发,因此若是要复用 interceptor,最好在函数以外构建出函数体
可是这样的拦截器实际上是没有太多意义的,咱们不可以针对 metadata
、message
来作本身的修改,若是咱们观察 InterceptingCall
的具体函数签名,会发现它支持两个参数的传入。
function InterceptingCall(next_call, requester) { this.next_call = next_call; this.requester = requester; }
上边示例只介绍了第一个参数,这个参数预期接受一个对象,对象会提供多个方法,咱们能够经过console.log(nextCall(options).constructor.prototype)
来查看都有哪些,例如 sendMessage
、start
之类的。
而观察这些函数的实现,会发现他们都调用了一个 _callNext
。
InterceptingCall.prototype.sendMessage = function(message) { this._callNext('sendMessage', [message]); }; InterceptingCall.prototype.halfClose = function() { this._callNext('halfClose'); }; InterceptingCall.prototype.cancel = function() { this._callNext('cancel'); }; InterceptingCall.prototype._callNext = function(method_name, args, next) { var args_array = args || []; var next_call = next ? next : this._getNextCall(method_name); if (this.requester && this.requester[method_name]) { // Avoid using expensive `apply` calls var num_args = args_array.length; switch (num_args) { case 0: return this.requester[method_name](next_call); case 1: return this.requester[method_name](args_array[0], next_call); case 2: return this.requester[method_name](args_array[0], args_array[1], next_call); } } else { if (next_call === emptyNext) { throw new Error('Interceptor call chain terminated unexpectedly'); } return next_call(args_array[0], args_array[1]); } };
在 _callNext
方法中,咱们就能够找到 requester
参数到底是有什么用了,若是 requester
也有实现对应的 method_name
,那么就会先执行 requester
的方法,随后将 next_call
对应的方法做为调用 requester
方法的最后一个参数传入。
在 grpc-node 中,拦截器的执行顺序与传入顺序有关,是一个队列,先传入的拦截器先执行,若是传入了第二个参数,则先执行第二个参数对应的方法,后执行第一个参数对应的方法。
因此若是咱们想作一些额外的事情,好比说针对 metadata
添加一个咱们想要的字段,那么就能够这么来写拦截器:
var interceptor = function(options, nextCall) { return new InterceptingCall(nextCall(options), { start: function(metadata, listener, next) { next(metadata, { onReceiveMetadata: function (metadata, next) { metadata.set('xxx', 'xxx') next(metadata); }, }); }, }); };
稍微特殊的地方是,start
函数的next
参数被调用时传入的第二个参数并非一个InterceptingCall
的实例,而是一个InterceptingListener
的实例,二者都有_callNext
的实现,只不过所提供的方法不彻底同样罢了。
接下来的代码逻辑主要是用于建立 Channel
,能够经过传递不一样的参数来覆盖 Channel
,也能够用默认的 Channel
,这个 Channel
对应的 gRPC 中其实就是作数据传输的那一个模块,能够理解为 HTTP2 最终是在这里使用的。
通常不多会去覆盖默认的 Channel
,因此咱们直接去看 grpc-node 里边的 Channel
是如何实现的。
Channel
是 c++ 代码实现的,代码的位置: https://github.com/grpc/grpc-...
若是有同窗尝试过混用 grpc-node
和 grpc-js
,那么你必定有看到过这个报错:Channel's second argument (credentials) must be a ChannelCredentials
缘由就在于 Channel
实例化过程当中会进行检查咱们建立 Channel
传入的 credential
是不是继承自 grpc 中的 ChannelCredentials
类。
而 grpc-node
和 grpc-js
用的是两个不一样的类,因此混用的话可能会出现这个问题。
而后就是根据传入的 credential
的不一样来判断是否要使用加密,而通常经常使用的 grpc.credentials.createInsecure()
其实就是不走加密的意思了,咱们能够在 https://github.com/grpc/grpc-... 和 https://github.com/grpc/grpc-... 来看到对应的逻辑。
后边就是调用 c++ 版本的 grpc 来构建对应的 Channel
了,若是有老铁看过 c++ 版本是如何建立 grpc Client 的,那么这些代码就比较熟悉了: https://github.com/grpc/grpc/...
在 grpc-node
中也是调用的一样的 API 来建立的。
当 Client
被建立出来后,咱们会调用 Client
上的方法(也就是发请求了),这时候就会触发到上边提到的 requester_funcs
其中的一个,咱们先从最简单的 Unary
来讲,这种 Client/Server 都是 Unary
请求方式时会触发的函数。
咱们经过上边 method_func
中调用方式能够肯定传递了什么参数进去,有几个固定的参数 path、request 序列化方式,以及 response 的反序列化方式。
后边的参数就是由调用时传入的动态参数了,这些能够在 makeUnaryRequest
函数定义中看到,分别是 argument
(也就是 request body)、metadata
(能够理解为 header,一些元数据)、options
是一个可选的参数(自定义的拦截器是放在这里的),能够用于覆盖 method 的一些描述信息,以及最后的 callback
就是咱们接收到 response 后应该作的操做了。
整个函数的实现,按长度来讲,有一半都是在处理参数,而剩下的部分则作了两件事,一个是实例化了 ClientUnaryCall
对象,另外一个则是处理拦截器相关的逻辑,并启动拦截器来发送整个请求。
在 makeUnaryRequest
函数中涉及到拦截器的部分有这么几块 resolveInterceptorProviders
、getLastListener
与getInterceptingCall
。
先来看 ClientUnaryCall
作了什么事情,在源码中有这样的一个代码块,是使用该对象的场景:
function ClientUnaryCall(call) { EventEmitter.call(this); this.call = call; } var callProperties = { argument: argument, metadata: metadata, call: new ClientUnaryCall(), channel: this.$channel, methodDefinition: method_definition, callOptions: options, callback: callback }; // 以及后续与拦截器产生了一些关联 var emitter = callProperties.call; // 这行代码很诡异,看起来是能够在实例化的时候传入的,却选择了在这里覆盖属性值 emitter.call = intercepting_call; var last_listener = client_interceptors.getLastListener( methodDefinition, emitter, callProperties.callback );
关于 ClientUnaryCall
的定义也很是简单,实际上是一个继承自 EventEmitter
的子类,增长了一个 call
属性的定义,以及两个方法封装调用了 call
属性对应的一些方法。
强烈怀疑 这部分代码是后期有过调整,由于 ClientUnaryCall
构造函数的实现中是能够接受一个参数做为 call
属性的赋值的,然而在代码应用中选择了后续覆盖 call
属性,而非直接在实例化的时候传入进去
resolveInterceptorProviders
是用来处理用户传入的拦截器的,这个函数在 Client
的整个生命周期会有两处调用,一个是在上边 Client
实例化的过程当中会触发一次,再有就是每次 method
被调用以前,会从新触发该函数。 resolveInterceptorProviders
的逻辑很简单,就是遍历咱们传入的 interceptor_provider
并将对应 method 的信息描述传入并执行,获得 provider
返回的 interceptor
用做拦截器。
在 Client
实例化过程当中是会遍历全部的 method
来执行,而在具体的 method
触发时则只触发当前 method
相关的 provider
逻辑。
getLastListener
按照注释中的描述,是为了得到一个最后会触发的监听者,源码大体是这样的:
https://github.com/grpc/grpc-...
var listenerGenerators = { [methodTypes.UNARY]: _getUnaryListener, [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener, [methodTypes.SERVER_STREAMING]: _getServerStreamingListener, [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener }; function getLastListener(method_definition, emitter, callback) { if (emitter instanceof Function) { callback = emitter; callback = function() {}; } if (!(callback instanceof Function)) { callback = function() {}; } if (!((emitter instanceof EventEmitter) && (callback instanceof Function))) { throw new Error('Argument mismatch in getLastListener'); } var method_type = common.getMethodType(method_definition); var generator = listenerGenerators[method_type]; return generator(method_definition, emitter, callback); }
一样也使用了一个枚举来区分不一样的方法类型来调用不一样的函数来生成对应的 listener。
好比这里用到的 getUnaryListener
,是这样的一个逻辑:
function _getUnaryListener(method_definition, emitter, callback) { var resultMessage; return { onReceiveMetadata: function (metadata) { emitter.emit('metadata', metadata); }, onReceiveMessage: function (message) { resultMessage = message; }, onReceiveStatus: function (status) { if (status.code !== constants.status.OK) { var error = common.createStatusError(status); callback(error); } else { callback(null, resultMessage); } emitter.emit('status', status); } }; }
代码也算比较清晰,在不一样的阶段会触发不一样的事件,而后再真正返回结果之后,触发 callback
来告知用户请求响应。
也就是咱们在示例中调用 sayHello
时传入的 callback
被调用的地方了。
getInterceptingCall
函数的调用会返回一个实例,经过操做该实例咱们能够控制请求的开始、数据的发送以及请求的结束。
咱们上边 getLastListener
返回的对象触发的时机也是会在这里能够找到的。
从源码上来看会涉及到这么几个函数:
var interceptorGenerators = { [methodTypes.UNARY]: _getUnaryInterceptor, [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor, [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor, [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor }; function getInterceptingCall(method_definition, options, interceptors, channel, responder) { var last_interceptor = _getLastInterceptor(method_definition, channel, responder); var all_interceptors = interceptors.concat(last_interceptor); return _buildChain(all_interceptors, options); } function _getLastInterceptor(method_definition, channel, responder) { var callback = (responder instanceof Function) ? responder : function() {}; var emitter = (responder instanceof EventEmitter) ? responder : new EventEmitter(); var method_type = common.getMethodType(method_definition); var generator = interceptorGenerators[method_type]; return generator(method_definition, channel, emitter, callback); } function _buildChain(interceptors, options) { var next = function(interceptors) { if (interceptors.length === 0) { return function (options) {}; } var head_interceptor = interceptors[0]; var rest_interceptors = interceptors.slice(1); return function (options) { return head_interceptor(options, next(rest_interceptors)); }; }; var chain = next(interceptors)(options); return new InterceptingCall(chain); }
_getUnaryInterceptor 因为篇幅较长,直接贴 GitHub 连接了: https://github.com/grpc/grpc-...
大体的逻辑就是咱们经过 method_definition
、channel
等参数来获取到一个 interceptor
,并将其拼接到原有的 interceptor
后边,做为最后执行的拦截器, _buildChain
函数比较简单,就是实现了一个链式调用的函数,用来按顺序执行拦截器。
关于 interceptor 如何使用能够看咱们介绍 interceptor 用法时写的 demo
主要的逻辑实际上在 _getUnaryInterceptor
中,咱们会建立一个功能全面的 interceptor
,函数会返回一个匿名函数,就是咱们在上边代码中看到的调用 generator
的地方了,而在匿名函数的开头部门,咱们就调用了 getCall
来获取一个 call
对象,这个 call
对象就是咱们与 gRPC 服务器之间的通道了,请求最终是由 call
对象负责发送的。
getCall
中实际上调用了 channel
对象的 createCall
方法,这部分的逻辑也是在 c++ 中作的了,包含数据的发送之类的逻辑。
这是咱们回到 makeUnaryRequest
函数,再看函数结束的地方调用的那三个方法,第一个 start,将咱们的 metadata(能够理解为 header) 发送了过去,而后将真实的信息发送了过去,最后调用关闭方法。
咱们能够在 _getUnaryInterceptor
中的 start
、sendMessage
以及 halfClose
函数中都有调用 _startBatchIfReady
函数,而这个方法实际上就是调用的 channel
上的 startBatch
方法,再根据调用链查找,最终会看处处理逻辑在这里:https://github.com/grpc/grpc/...
opType 与 代码中 switch-case 中的对应关系在这里: https://github.com/grpc/grpc-...
首先在 start
里边主要是发送了 metadata
,而且尝试接受服务端返回过来的 metadata
,并在回调中触发咱们传入的 listener
的 onReceiveMetadata
方法。
而后检查 response 的状态是否正确,并触发 listener
的 onReceiveStatus
方法。
接下来是调用 sendMessage
方法,在这里咱们将消息体进行序列化,并发送,在回调中就会去调用咱们传入的 callback。
最后在 halfClose
方法中其实就是发送一个指令来设置请求的结束。
整个的流程细化之后大概是这个样子的:
上边总体的记录就是关于 Client 这一侧是如何实现的了。
主要涉及到 Client 的构建、发送请求时作的事情、拦截器的做用。
而更深刻的一些逻辑实际上是在 c++ 版本的 gRPC 库里所实现,因此本次笔记并无过多的涉及。
文章涉及到的部分示例代码仓库地址: https://github.com/Jiasm/grpc...