在上一篇文章中咱们先列举了大体的需求,定义了消息协议。此次咱们着手搭建基本的RPC框架,首先实现基础的方法调用功能。java
RPC调用的第一步,就是在服务端定义要对外暴露的方法,在grpc或者是thrift中,这一步咱们须要编写语言无关的idl文件,而后经过idl文件生成对应语言的代码。而在咱们的框架里,出于简单起见,咱们不采用idl的方式,直接在代码里定义接口和方法。这里先规定对外的方法必须遵照如下几个条件:git
为何要有这几个规定呢,具体的缘由是这样的:由于java中的RPC框架场用到的动态代理在go语言中并不支持,因此咱们须要显式地定义方法的统一格式,这样在RPC框架中才能统一地处理不一样的方法。因此咱们规定了方法的格式:github
这里咱们须要注意的是,服务提供者在对外暴露时并不须要以接口的形式暴露,只要服务提供者有符合规则的方法便可;而客户端在调用方法时指定的是服务提供者的具体类型,不能指定接口的名称,即便服务提供者实现了这个接口。golang
contet.Context缓存
context是go语言提供的关于请求上下文的抽象,它携带了请求deadline、cancel信号的信息,还能够传递一些上下文信息,很是适合做为RPC请求的上下文,咱们能够在context中设置超时时间,还能够将一些参数无关的元数据经过context传递到服务端。网络
实际上,方法的固定格式以及用Call和Go来表示同步和异步调用都是go自带的rpc里的规则,只是在参数里增长了context.Context。不得不说go自带的rpc设计确实十分优秀,值得好好学习理解。框架
首先是面向使用者的RPC框架中的客户端和服务端接口:异步
type RPCServer interface {
//注册服务实例,rcvr是receiver的意思,它是咱们对外暴露的方法的实现者,metaData是注册服务时携带的额外的元数据,它描述了rcvr的其余信息
Register(rcvr interface{}, metaData map[string]string) error
//开始对外提供服务
Serve(network string, addr string) error
}
type RPCClient interface {
//Go表示异步调用
Go(ctx context.Context, serviceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call
//Call表示异步调用
Call(ctx context.Context, serviceMethod string, arg interface{}, reply interface{}) error
Close() error
}
type Call struct {
ServiceMethod string // 服务名.方法名
Args interface{} // 参数
Reply interface{} // 返回值(指针类型)
Error error // 错误信息
Done chan *Call // 在调用结束时激活
}
复制代码
此次先实现RPC调用部分,这两层暂时忽略,后续再实现。post
接下来咱们须要选择一个序列化协议,这里就选以前使用过的messagepack。以前设计的通讯协议分为两个部分:head和body,这两个部分都须要进行序列化和反序列化。head部分是元数据,能够直接采用messagepack序列化,而body部分是方法的参数或者响应,其序列化由head中的SerializeType决定,这样的好处就是为了后续扩展方便,目前也使用messagepack序列化,后续也能够采用其余的方式序列化。学习
序列化的逻辑也定义为接口:
type Codec interface {
Encode(value interface{}) ([]byte, error)
Decode(data []byte, value interface{}) error
}
复制代码
肯定好了序列化协议以后,咱们就能够定义消息协议相关的接口了。协议的设计参考上一篇文章:从零开始实现一个RPC框架(零)
接下来就是协议的接口定义:
//Messagge表示一个消息体
type Message struct {
*Header //head部分, Header的定义参考上一篇文章
Data []byte //body部分
}
//Protocol定义了如何构造和序列化一个完整的消息体
type Protocol interface {
NewMessage() *Message
DecodeMessage(r io.Reader) (*Message, error)
EncodeMessage(message *Message) []byte
}
复制代码
根据以前的设计,因此交互都经过接口进行,这样方便扩展和替换。
协议的接口定义好了以后,接下来就是网络传输层的定义:
//传输层的定义,用于读取数据
type Transport interface {
Dial(network, addr string) error
//这里直接内嵌了ReadWriteCloser接口,包含Read、Write和Close方法
io.ReadWriteCloser
RemoteAddr() net.Addr
LocalAddr() net.Addr
}
//服务端监听器定义,用于监听端口和创建链接
type Listener interface {
Listen(network, addr string) error
Accept() (Transport, error)
//这里直接内嵌了Closer接口,包含Close方法
io.Closer
}
复制代码
各个层次的接口定义好了以后,就能够开始搭建基础的框架了,这里不附上具体的代码了,具体代码能够参考github连接 ,这里大体描述一下各个部分的实现思路。
客户端的功能比较简单,就是将参数序列化以后,组装成一个完整的消息体发送出去。请求发送出去的同时,将未完成的请求都缓存起来,每收到一个响应就和未完成的请求进行匹配。
发送请求的核心在Go
和send
方法,Go
的功能是组装参数,send
方法是将参数序列化并经过传输层的接口发送出去,同时将请求缓存到pendingCalls
中。而Call
方法则是直接调用Go
方法并阻塞等待知道返回或者超时。 接收响应的核心在input
方法,input
方法在client初始化完成时经过go input()
执行。input
方法包含一个无限循环,在无限循环中读取传输层的数据并将其反序列化,并将反序列化获得的响应与缓存的请求进行匹配。
注:send
和input
方法的命名也是从go自带的rpc里学来的。
服务端在接受注册时,会过滤服务提供者的各个方法,将合法的方法缓存起来。
服务端的核心逻辑是serveTransport
方法,它接收一个Transport
对象,而后在一个无限循环中从Transport
读取数据并反序列化成请求,根据请求指定的方法查找自身缓存的方法,找到对应 的方法后经过反射执行对应的实现并返。执行完成后再根据返回结果或者是执行发生的异常组装成一个完整的消息,经过Transport
发送出去。
服务端在反射执行方法时,须要将实现者做为执行的第一个参数,因此参数比方法定义中的参数多一个。
这两个部分就比较简单了,codec基本上就是使用messagepack实现了对应的接口;protocol的实现就是根据咱们定义的协议进行解析。
在执行过程当中,除了客户端的用户线程和服务端用来执行方法的服务线程,还分别增长了客户端轮询线程和服务端监听线程,大体的示意图以下:
到此咱们的RPC框架已经具有了雏形,可以支持基础的RPC调用了。实际上整个框架就是参考go自带的rpc的结构,客户端和服务端的线程模型和go自带的rpc同样,只是本身定义了序列化和消息协议,并且实现的过程当中保留了扩展的接口,方便后续进行完善和扩展。下一步的规划是实现过滤器链,以便后续实现服务治理相关的功能。