RPC是远程过程调用(Remote Procedure Call)的简称,经过RPC咱们能够像调用本地方法同样调用位于其余位置的函数。你们更常见的多是HTTP API调用,简单来对比的话,RPC比起HTTP调用封装更完善,调用者没必要手动处理序列化和反序列化,使用成本更低一些(虽然学习成本可能会更高)。json
出于学习目的,此次的目标是使用go语言来实现一个本身的RPC。在现实世界里,对于一个RPC工具,除了方法调用之外,人们更看重的是其余功能好比服务发现、负载均衡、熔断降级之类的功能,这里暂时不会涉及,而是仅关注实现一个能够工做的方法调用。缓存
在以前的文章里大体了解了go语言自带的rpc框架,其中就提到go rpc预留了codec接口,可让用户在go rpc使用本身的序列化协议,此次就尝试实现一个本身的codec来实现本身的RPC。bash
要实现一个RPC,基本的元素大概有这几个:序列化协议、网络模型和线程模型。而go rpc里的codec基本上实现的就是序列化协议。网络
原本想着用比较熟悉的thrift协议,可是使用thrift自己实现了RPC流程,因此它并非一个单纯的序列化协议,它的序列化逻辑可能没法和go rpc很好的契合,再加上还须要书写IDL定义,增长复杂度。原本就是为了熟悉go,因此这里先从简单的开始,因而选择messagepack做为序列化协议。并发
messagepack是一个比较轻量级的序列化协议,它的逻辑和json相似,可是使用的是二进制形式,因此比json序列化更快,序列化后产生的数据也更小,基本上能够认为是一个二进制版本的json。负载均衡
要实现本身的codec,须要分别实现go rpc中提供个两个接口:ServerCodec和ClientCodec,很明显他们分别表示服务端和客户端的逻辑,两个接口的定义具体以下:框架
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
Close() error
}
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
复制代码
能够看到,go rpc将一次请求/响应抽象成了header+body的形式,读取数据时分为读取head和读取body,写入数据时只需写入body部分,go rpc会替咱们加上head部分。 接下来咱们定义两个结构,用来表示一次请求/响应的完整数据:tcp
type MsgpackReq struct {
rpc.Request //head
Arg interface{} //body
}
type MsgpackResp struct {
rpc.Response //head
Reply interface{} //body
}
复制代码
这里的msgpackReq和msgpackResp直接内嵌了go rpc里自带的Request和Response,自带的Request和Response定义了序号、方法名等信息。函数
接下来就是自定义Codec的声明:工具
type MessagePackServerCodec struct {
rwc io.ReadWriteCloser //用于读写数据,实际是一个网络链接
req MsgpackReq //用于缓存解析到的请求
closed bool //标识codec是否关闭
}
type MessagePackClientCodec struct {
rwc io.ReadWriteCloser
resp MsgpackResp //用于缓存解析到的请求
closed bool
}
func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}
func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}
复制代码
在以前的文章里提到了,codec须要包含一个数据源用于读写数据,这里直接将网路链接传递进去。
接下来是具体的方法实现,出于简单起见,这里将反序列化部分的两步合并为一步,在读取head部分时就将全部的数据解析好并缓存起来,读取body时直接返回缓存的结果。具体的思路就是:
这里直接上代码:
func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
//先判断codec是否已经关闭,若是是则直接返回
if c.closed {
return nil
}
//将r和arg组装成一个MsgpackReq并序列化
request := &MsgpackReq{*r, arg}
reqData, err := msgpack.Marshal(request)
if err != nil {
panic(err)
return err
}
//先发送数据长度
head := make([]byte, 4)
binary.BigEndian.PutUint32(head, uint32(len(reqData)))
_, err = c.rwc.Write(head)
//再将序列化产生的数据发送出去
_, err = c.rwc.Write(reqData)
return err
}
func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
//先判断codec是否已经关闭,若是是则直接返回
if c.closed {
return nil
}
//读取数据
data, err := readData(c.rwc)
if err != nil {
//client一旦初始化就会开始轮询数据,因此要处理链接close的状况
if strings.Contains(err.Error(), "use of closed network connection") {
return nil
}
panic(err) //简单起见,出现异常直接panic
}
//将读取到的数据反序列化成一个MsgpackResp
var response MsgpackResp
err = msgpack.Unmarshal(data, &response)
if err != nil {
panic(err) //简单起见,出现异常直接panic
}
//根据读取到的数据设置request的各个属性
r.ServiceMethod = response.ServiceMethod
r.Seq = response.Seq
//同时将读取到的数据缓存起来
c.resp = response
return nil
}
func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
//这里直接用缓存的数据返回便可
if "" != c.resp.Error {//若是返回的是异常
return errors.New(c.resp.Error)
}
if reply != nil {
//正常返回,经过反射将结果设置到reply变量,由于reply必定是指针类型,因此没必要检查CanSet
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
}
return nil
}
func (c *MessagePackClientCodec) Close() error {
c.closed = true //关闭时将closed设置为true
if c.rwc != nil {
return c.rwc.Close()
}
return nil
}
复制代码
以上就是client部分的实现,值得注意的有几点:
一样直接上代码:
func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
//先判断codec是否已经关闭,若是是则直接返回
if c.closed {
return nil
}
//将r和reply组装成一个MsgpackResp并序列化
response := &MsgpackResp{*r, reply}
respData, err := msgpack.Marshal(response)
if err != nil {
panic(err)
return err
}
head := make([]byte, 4)
binary.BigEndian.PutUint32(head, uint32(len(respData)))
_, err = c.rwc.Write(head)
//将序列化产生的数据发送出去
_, err = c.rwc.Write(respData)
return err
}
func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
//先判断codec是否已经关闭,若是是则直接返回
if c.closed {
return nil
}
//读取数据
data, err := readData(c.rwc)
if err != nil {
//这里不能直接panic,须要处理EOF和reset的状况
if err == io.EOF {
return err
}
if strings.Contains(err.Error(), "connection reset by peer") {
return err
}
panic(err) //其余异常直接panic
}
//将读取到的数据反序列化成一个MsgpackReq
var request MsgpackReq
err = msgpack.Unmarshal(data, &request)
if err != nil {
panic(err) //简单起见,出现异常直接panic
}
//根据读取到的数据设置request的各个属性
r.ServiceMethod = request.ServiceMethod
r.Seq = request.Seq
//同时将解析到的数据缓存起来
c.req = request
return nil
}
func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
if arg != nil {
//参数不为nil,经过反射将结果设置到arg变量
reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
}
return nil
}
func (c *MessagePackServerCodec) Close() error {
c.closed = true
if c.rwc != nil {
return c.rwc.Close()
}
return nil
}
复制代码
实际上server端的实现几乎和client端的逻辑同样,只是request和response的角色不一样而已。其中有几点须要注意:
具体思路参考go语言处理TCP拆包/粘包 ,这里附上readData的实现:
func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
const HeadSize = 4 //设定长度部分占4个字节
headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
headData := make([]byte, HeadSize)
for {
readSize, err := conn.Read(headData)
if err != nil {
returnError = err
return
}
headBuf.Write(headData[0:readSize])
if headBuf.Len() == HeadSize {
break
} else {
headData = make([]byte, HeadSize-readSize)
}
}
bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
bodyData := make([]byte, bodyLen)
for {
readSize, err := conn.Read(bodyData)
if err != nil {
returnError = err
return
}
bodyBuf.Write(bodyData[0:readSize])
if bodyBuf.Len() == bodyLen {
break
} else {
bodyData = make([]byte, bodyLen-readSize)
}
}
data = bodyBuf.Bytes()
returnError = nil
return
}
复制代码
接下来咱们经过简单的Echo调用测试一下咱们的codec:
//声明接口类
type EchoService struct {}
//定义方法Echo
func (service *EchoService) Echo(arg string, result *string) error {
*result = arg
return nil
}
//服务端启动逻辑
func RegisterAndServeOnTcp() {
err := rpc.Register(&EchoService{})//注册并非注册方法,而是注册EchoService的一个实例
if err != nil {
log.Fatal("error registering", err)
return
}
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
if err != nil {
log.Fatal("error resolving tcp", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("error accepting", err)
} else {
//这里先经过NewServerCodec得到一个实例,而后调用rpc.ServeCodec来启动服务
rpc.ServeCodec(msgpk.NewServerCodec(conn))
}
}
}
//客户端调用逻辑
func Echo(arg string) (result string, err error) {
var client *rpc.Client
conn, err := net.Dial("tcp", ":1234")
client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))
defer client.Close()
if err != nil {
return "", err
}
err = client.Call("EchoService.Echo", arg, &result) //经过类型加方法名指定要调用的方法
if err != nil {
return "", err
}
return result, err
}
//main函数
func main() {
go server.RegisterAndServeOnTcp() //先启动服务端
time.Sleep(1e9)
wg := new(sync.WaitGroup) //waitGroup用于阻塞主线程防止提早退出
callTimes := 10
wg.Add(callTimes)
for i := 0; i < callTimes; i++ {
go func() {
//使用hello world加一个随机数做为参数
argString := "hello world "+strconv.Itoa(rand.Int())
resultString, err := client.Echo(argString)
if err != nil {
log.Fatal("error calling:", err)
}
if resultString != argString {
fmt.Println("error")
} else {
fmt.Printf("echo:%s\n", resultString)
}
wg.Done()
}()
}
wg.Wait()
}
复制代码
上面的例子里首先经过go server.RegisterAndServeOnTcp()启动了服务端,而后同时启动了10个go routine来发起请求,客户端在收到响应以后会打印对应的结果。最后执行main函数,控制台会输出结果(后面的随机数可能会不一样):
echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320
复制代码
到这里,一个简单的自定义的go语言rpc就已经完成了,虽然自定义部分只有序列化协议部分而已,好比线程模型还是go rpc自带的逻辑,除此以外也没有前言里提到的各类高级功能。后续再考虑尝试用go语言从零开始实现一个RPC吧。
有细心的同窗可能已经发现了,这里实现的逻辑当中彻底没有考虑并发的问题,缓存数据也是直接放到codec对象。而这样简单的实现也不会致使并发调用失败,其中具体的缘由就是go rpc在处理每一个codec对象时,读取请求都是顺序的,而后再并发的处理请求并返回结果。