go rpc 源码分析

1.概述

go 源码中带了rpc框架,以相对精简的当时方式实现了rpc功能,目前源码中的rpc官方已经宣布再也不添加新功能,并推荐使用grpc. 做为go标准库中rpc框架,仍是有不少地方值得借鉴及学习,这里将从源码角度分析go原生rpc框架。git

2.server端

server端主要分为两个步骤,首先进行方法注册,经过反射处理将方法取出,并存到map中.而后是网络调用,主要是监听端口,读取数据包,解码请求 调用反射处理后的方法,将返回值编码,返回给客户端.github

2.1 方法注册

图

2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer. func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } // RegisterName is like Register but uses the provided name for the type // instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
	return DefaultServer.RegisterName(name, rcvr)
}
复制代码

如上,方法注册的入口函数有两个,分别为Register以及RegisterName,这里interface{}一般是带方法的对象.若是想要自定义方法的接收对象,则可使用RegisterName.golang

2.1.2 反射处理过程
type methodType struct {
	sync.Mutex // protects counters
	method     reflect.Method    //反射后的函数
	ArgType    reflect.Type      //请求参数的反射值
	ReplyType  reflect.Type      //返回参数的反射值
	numCalls   uint              //调用次数
}


type service struct {
	name   string                 // 服务名,这里一般为register时的对象名或自定义对象名
	rcvr   reflect.Value          // 服务的接收者的反射值
	typ    reflect.Type           // 接收者的类型
	method map[string]*methodType // 对象的全部方法的反射结果.
}
复制代码

反射处理过程,其实就是将对象以及对象的方法,经过反射生成上面的结构,如注册Arith.Multiply(xx,xx) error 这样的对象时,生成的结构为 map["Arith"]*service, service 中ethod为 map["Multiply"]*methodType.json

几个关键代码以下:bash

生成service对象网络

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	//生成service
    s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
 
    ....
	s.name = sname

	// 经过suitableMethods将对象的方法转换成map[string]*methodType结构
	s.method = suitableMethods(s.typ, true)
    
    ....

    //service存储为键值对
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}
复制代码

生成 map[string] *methodType并发

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)

    //经过反射,遍历全部的方法
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		// Method must be exported.
		if method.PkgPath != "" {
			continue
		}
		// Method needs three ins: receiver, *args, *reply.
		if mtype.NumIn() != 3 {
			if reportErr {
				log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
			}
			continue
		}
        //取出请求参数类型
		argType := mtype.In(1)
        ...

		// 取出响应参数类型,响应参数必须为指针
		replyType := mtype.In(2)
		if replyType.Kind() != reflect.Ptr {
			if reportErr {
				log.Println("method", mname, "reply type not a pointer:", replyType)
			}
			continue
		}
		...


		// 去除函数的返回值,函数的返回值必须为error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			if reportErr {
				log.Println("method", mname, "returns", returnType.String(), "not error")
			}
			continue
		}
        
        //将方法存储成key-value
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}
复制代码

2.2 网络调用

// Request 每次rpc调用的请求的头部分
type Request struct {
	ServiceMethod string   // 格式为: "Service.Method"
	Seq           uint64   // 客户端生成的序列号
	next          *Request // server端保持的链表
}

// Response 每次rpc调用的响应的头部分
type Response struct {
	ServiceMethod string    // 对应请求部分的 ServiceMethod
	Seq           uint64    // 对应请求部分的 Seq
	Error         string    // 错误
	next          *Response // server端保持的链表
}

复制代码

如上,网络调用主要用到上面的两个结构体,分别是请求参数以及返回参数,经过编解码器(gob/json)实现二进制到结构体的相互转换.主要涉及到下面几个步骤: app

图

关键代码以下: 取出请求,并获得相应函数的调用参数框架

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
    //编码器读取生成请求
	err = codec.ReadRequestHeader(req)
	if err != nil {
        //错误处理
        ...
		return
	}

	keepReading = true

    //取出服务名以及方法名
	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	//从注册时生成的map中查询出相应的方法的结构
	svci, ok := server.serviceMap.Load(serviceName)
	if !ok {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	svc = svci.(*service)

    //获取出方法的类型
	mtype = svc.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}

复制代码

//循环处理,不断读取连接上的字节流,解密出请求,调用方法,编码响应,回写到客户端.异步

func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	for {
		//读取请求
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
            ...
		}

        //调用
		go service.call(server, sending, mtype, req, argv, replyv, codec)
	}
	codec.Close()
}
复制代码

经过参数进行函数调用

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	mtype.Lock()
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// 经过反射进行函数调用
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// 返回值是不为空时,则取出错误的string
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
    
    //发送相应,并释放请求结构
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}


复制代码

3.client端

// 异步调用
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
}

// 同步调用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
}

复制代码
// Call represents an active RPC.
type Call struct {
	ServiceMethod string      // 服务名及方法名 格式:服务.方法
	Args          interface{} // 函数的请求参数 (*struct).
	Reply         interface{} // 函数的响应参数 (*struct).
	Error         error       // 方法完成后 error的状态.
	Done          chan *Call  // 方法调用结束后的channel.
}

复制代码

client端部分则相对要简单不少,主要提供Call以及Go两个方法,分别表示同步调用以及异步调用,但其实同步调用底层实现其实也是异步调用,调用时主要用到了Call结构,相关解释如上.

3.1 主要流程

图

3.2 关键代码

发送请求部分代码,每次send一次请求,均生成一个call对象,并使用seq做为key保存在map中,服务端返回时从map取出call,进行相应处理.

func (client *Client) send(call *Call) {
    //请求级别的锁
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		call.Error = ErrShutdown
		client.mutex.Unlock()
		call.done()
		return
	}

    //生成seq,每次调用均生成惟一的seq,在服务端相应后会经过该值进行匹配
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// 请求并发送请求
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
        //发送请求错误时,将map中call对象删除.
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

复制代码

接收响应部分的代码,这里是一个for循环,不断读取tcp上的流,并解码成Response对象以及方法的Reply对象.

func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}

        //经过response中的 Seq获取call对象
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
            //服务端返回错误,直接将错误返回
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
			call.done()
		default:
            //经过编码器,将Resonse的body部分解码成reply对象.
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}

	// 客户端退出处理
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
	if debugLog && err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}

复制代码

4.一些缺点

  • 同步调用没法超时
    因为原生rpc只提供两个方法,同步的Call以及异步的Go,同步的Call服务端不返回则会一直阻塞,这里若是存在大量的不返回,会致使协程一直没法释放.

  • 异步调用超时后会内存泄漏
    基于异步调用加channel实现超时功能也会存在泄漏问题,缘由是client的请求会存在map结构中,Go函数退出并不会清理map的内容,所以若是server端不返回的话,map中的请求会一直存储,从而致使内存泄漏.

  • 底层连接状态没法维持
    因为没有keepalive机制,当对底层连接进行复用时会出现连接实际已经不可用,但上层没法感知到的状况,从而致使发出请求,一直没法收到回应.

5. 总结

总的来讲,go原生rpc算是个基础版本的rpc,代码精简,可扩展性高,可是只是实现了rpc最基本的网络通信,像超时熔断,连接管理(保活与重连),服务注册发现,仍是欠缺的,所以仍是达不到生产环境开箱即用,相对来讲grpc则要成熟不少,最近准备基于grpc集成一套微服务通信框架,大部分组件都是开源的,项目见grpc-wrapper.

6. 参考

rpc

相关文章
相关标签/搜索