Go Rpc

简介

go 提供了自带的序列化协议gob(go binary),能够进行原生go类型的序列化和反序列化,其中一个应用就是go语言自带的rpc功能,主要在net/rpc包下。java

go 自带的rpc提供了简单的rpc相关的api,用户只须要依照约定实现function而后进行服务注册,就能够在客户端进行调用了。git

首先先列举一下go rpc中的对于服务端提供的方法的相关约束:github

  • the method's type is exported. 方法所属的类型必须是外部可见的golang

  • the method is exported. 方法必须是外部可见的json

  • the method has two arguments, both exported (or builtin) types. 方法参数只能有两个,并且必须是外部可见的类型或者是基本类型。api

  • the method's second argument is a pointer. 方法的第二个参数类型必须是指针缓存

  • the method has return type error.方法的返回值必须是error类型bash

这里结合我的的理解简单解释一下,方法和方法所属的类型必须是exported,也就是必须是外部可见的,相似于java中接口的方法只能是public同样,否则外部也不能调用。参数只能有两个,并且第二个必须是指针,这是由于go rpc约定第一个参数是方法所需的入参,而第二个参数表明方法的实际返回值,因此第二个参数必须是指针类型,由于须要在方法内部修改它的值。返回值必须是error类型,表示方法执行中出现的异常或者rpc过程当中的异常。session

从这几点约束能够看出来,go自带的rpc对相关的条件约束的很紧,这也符合go的“一件问题只有一个解决方式”的理念,经过明确的规定,让开发过程变得更简单。异步

快速上手

根据上面的约束,咱们先实际实现一个echo方法,它将客户端传递来的参数原样的返回:

1.首先服务端提供的方法必须归属于一个类型,并且是外部可见的类型,这里咱们就定义一个EchoService的空结构好了:

type EchoService struct {}
复制代码

2.服务端提供的方法必须也是外部可见的,因此定义一个方法叫作Echo:

func (service EchoService) Echo(arg string, result *string) error {
	*result = arg //在这里直接将第二个参数(也就是实际的返回值)赋值为arg
	return nil //error返回nil,也就是没有发生异常
}
复制代码

3.接下来咱们将Echo方法对外进行暴露:

func RegisterAndServe() {
	err := rpc.Register(&EchoService{})//注册并非注册方法,而是注册EchoService的一个实例
	if err != nil {
		log.Fatal("error registering", err)
		return
	}
	rpc.HandleHTTP() //rpc通讯协议设置为http协议
	err = http.ListenAndServe(":1234", nil) //端口设置为9999
	if err != nil {
		log.Fatal("error listening", err)
	}
}
复制代码

4.而后咱们定义一个客户端:

func CallEcho(arg string) (result string, err error) {
	var client *rpc.Client
	client, err = rpc.DialHTTP("tcp", ":9999") //经过rpc.DialHTTP建立一个client
	if err != nil {
		return "", err
	}
	err = client.Call("EchoService.Echo", arg, &result) //经过类型加方法名指定要调用的方法
	if err != nil {
		return "", err
	}
	return result, err
}
复制代码

5.最后分别启动服务端和客户端进行调用:

func main() {
	done := make(chan int)
	go server.RegisterAndServe() //先启动服务端
	time.Sleep(1e9) //sleep 1s,由于服务端启动是异步的,因此等一等
	go func() { //启动客户端
		result, err := client.CallEcho("hello world")
		if err != nil {
			log.Fatal("error calling", err)
		} else {
			fmt.Println("call echo:", result)
		}
		done <- 1
	}()
	<- done //阻塞等待客户端结束
}
复制代码

此外go自带的rpc还提供rpc over tcp的选项,只须要在listen和dial时使用tcp链接就能够了,rpc over tcp和这里的例子惟一的区别就是创建链接时的区别,实际的rpc over http也并无使用http协议,只是用http server创建链接而已。

go还提供基于json的rpc,只须要在服务端和客户端把rpc.ServeConn和rpc.Dial替换成jsonrpc.ServeConn和jsonrpc.Dial便可。

源码解析

Server端

PRC over HTTP

第一个示例中,咱们调用了rpc.HandleHTTP(),它的做用是将rpc server绑定到http端口上,执行了这个方法以后咱们仍然须要主动调用http.ListenAndServe。rpc.HandleHTTP的具体实现以下:

const (
	// Defaults used by HandleHTTP
	DefaultRPCPath   = "/_goRPC_"
	DefaultDebugPath = "/debug/rpc"
)
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
复制代码

能够看见,HandleHTTP方法调用了DefaultServer的HandleHTTP方法,DefaultServer是rpc包内定义的一个Server类型变量,Server定义了不少方法:

  • func (server *Server) Accept(lis net.Listener)
  • func (server *Server) HandleHTTP(rpcPath, debugPath string)
  • func (server *Server) ServeCodec(codec ServerCodec)
  • func (server *Server) ServeConn(conn io.ReadWriteCloser)
  • func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
  • func (server *Server) ServeRequest(codec ServerCodec) error

DefaultServer就是一个Server实例:

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}
复制代码

其中HandleHTTP的具体实现以下:

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}
复制代码

实际上HandleHTTP就是使用http包的功能,将server自身注册到http的url映射上了;从上面列举的Server类型的部分方法能够看出,Server自身实现了ServeHTTP方法,因此能够处理http请求:

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		io.WriteString(w, "405 must CONNECT\n")
		return
	}
	conn, _, err := w.(http.Hijacker).Hijack()
	if err != nil {
		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
		return
	}
	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
	server.ServeConn(conn)
}
复制代码

能够看到,rpc server收到http链接以后就会调用hijack方法接管这个链接,而后调用ServeConn方法进行处理,而ServeConn方法就和rpc over tcp没区别了。

总的来讲,rpc over http就是利用http包接收来自客户端的链接,后续的流程和rpc over TCP同样。

RPC over TCP

根据上面的第二个例子咱们能够看到,在使用rpc over tcp时,用户须要本身建立一个Listener并调用Accpet,而后调用Server的ServeConn方法。而咱们以前使用的rpc.ServeConn实际上调用了DefaultServer.ServeConn。而ServeConn的具体实现以下:

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	buf := bufio.NewWriter(conn)
	srv := &gobServerCodec{
		rwc:    conn,
		dec:    gob.NewDecoder(conn),
		enc:    gob.NewEncoder(buf),
		encBuf: buf,
	}
	server.ServeCodec(srv) //构造了一个私有的gobServerCodec而后调用servCodec方法,表示默认使用gob序列化协议
}
复制代码

能够看到,ServeConn其实是构造了一个codec而后调用serveCodec方法,默认的逻辑是采用gobServerCodec,由此能够看出,若是咱们想使用自定义的序列化协议,只须要实现一个本身的ServerCodec就能够了,serverCodec接口定义以下:

// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	// Close can be called multiple times and must be idempotent.
	Close() error
}
复制代码

ServerCodec的方法定义里只出现了Request和Response,并无链接相关的定义,说明在链接相关的变量须要设置成ServerCodec的成员变量,每次调用都须要构造新的ServerCodec对象。

回到serveCodec方法,能够看到serveCodec的流程基本上就是:read request - invoke - close

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			if debugLog && err != io.EOF {
				log.Println("rpc:", err)
			}
			if !keepReading {
				break
			}
			// send a response if we actually managed to read a header.
			if req != nil {
				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
				server.freeRequest(req)
			}
			continue
		}
		wg.Add(1)
    //每一个请求的处理都在新的goroutine里执行
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	// We've seen that there are no more requests.
	// Wait for responses to be sent before closing codec.
	wg.Wait()
	codec.Close()
}
复制代码

这里看到,serveCodec会一直调用ReadRequestHeader和ReadRequestBody方法读取请求,直到客户端链接再也不发送请求,在serveConn方法的注释里也提到了,对于serveConn方法一般建议使用goroutine来执行。

接下来仔细看一下readRequest的实现:

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	if err != nil {
		if !keepReading {
			return
		}
		// discard body
		codec.ReadRequestBody(nil)
		return
	}

	// Decode the argument value.
	argIsValue := false // if true, need to indirect before calling.
	if mtype.ArgType.Kind() == reflect.Ptr {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
		argIsValue = true
	}
	// argv guaranteed to be a pointer now.
	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
		return
	}
	if argIsValue {
		argv = argv.Elem()
	}

	replyv = reflect.New(mtype.ReplyType.Elem())

	switch mtype.ReplyType.Elem().Kind() {
	case reflect.Map:
		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
	case reflect.Slice:
		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
	}
	return
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	err = codec.ReadRequestHeader(req)
	if err != nil {
		req = nil
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return
		}
		err = errors.New("rpc: server cannot decode request: " + err.Error())
		return
	}

	// We read the header successfully. If we see an error now,
	// we can still recover and move on to the next request.
	keepReading = true

	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the request.
	svci, ok := server.serviceMap.Load(serviceName)
	if !ok {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	svc = svci.(*service)
	mtype = svc.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}
复制代码

基本就是依次调用codec的readRequestHeader和readRequestBody,过程当中会使用go自带的gob序列化协议,这里先不深刻看,省得层次太深乱了。

接下来看invoke部分:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
  //wg由ServeConn方法持有,用于阻塞等待调用方断开,这里每次处理一个请求就count down一次
	if wg != nil {
		defer wg.Done()
	}
  //对方法加锁,就为了把调用次数加一
	mtype.Lock()
  //调用次数加一,暂时没看到是干啥的
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// Invoke the method, providing a new value for the reply.
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}
复制代码

invoke部分就是经过反射调用对应实例的方法,而后将结果经过sendResponse返回给客户端,sendResponse实际上也是调用了codec的WriteResponse方法:

func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
	resp := server.getResponse()
	// Encode the response header
	resp.ServiceMethod = req.ServiceMethod
	if errmsg != "" {
		resp.Error = errmsg
		reply = invalidRequest
	}
	resp.Seq = req.Seq
	sending.Lock()
	err := codec.WriteResponse(resp, reply)
	if debugLog && err != nil {
		log.Println("rpc: writing response:", err)
	}
	sending.Unlock()
	server.freeResponse(resp)
}
复制代码

这里能够看到,服务端在发送数据过程当中是加了锁的,也就是WriteResponse部分是串行的。

服务端流程大体就到此为止了,总体思路仍是基本的RPC流程:经过Listener创建链接,调用codec进行编解码,经过反射执行真正的方法。server会读取到的request对象缓存在内存中,具体是一个链表的格式,直到server端逻辑执行完

这里再简单看一下rpc server相关的其余部分:

rpc.Register:register方法经过反射会把参数对应的类型下全部符合规范的方法加载并缓存起来

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
	return server.register(rcvr, "", false)
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
	return server.register(rcvr, name, true)
}

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	if useName {
		sname = name
	}
	if sname == "" {
		s := "rpc.Register: no service name for type " + s.typ.String()
		log.Print(s)
		return errors.New(s)
	}
	if !isExported(sname) && !useName {
		s := "rpc.Register: type " + sname + " is not exported"
		log.Print(s)
		return errors.New(s)
	}
	s.name = sname

	// Install the methods
	s.method = suitableMethods(s.typ, true)

	if len(s.method) == 0 {
		str := ""

		// To help the user, see if a pointer receiver would work.
		method := suitableMethods(reflect.PtrTo(s.typ), false)
		if len(method) != 0 {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
		} else {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
		}
		log.Print(str)
		return errors.New(str)
	}

	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}
复制代码

rpc包内定义的各个struct:service、methodType、Server、Request、Response

type service struct { //保存了服务提供者的各个信息,包括名称、类型、方法等等
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}

type methodType struct {//保存了反射获取到的方法的相关信息,此外还有一个计数器,用来统计调用次数,还有一个继承的Mutext接口,用来作计数器的同步
	sync.Mutex // protects counters
	method     reflect.Method
	ArgType    reflect.Type
	ReplyType  reflect.Type
	numCalls   uint
}

// Server represents an RPC Server.
type Server struct { //server对象
	serviceMap sync.Map   // map[string]*service 保存服务提供者信息的map
	reqLock    sync.Mutex // protects freeReq 用于作freeReq的同步
	freeReq    *Request //rpc 请求
	respLock   sync.Mutex // protects freeResp 用于作freeResp的同步
	freeResp   *Response //rpc响应
}

// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct { //Request仅标识请求头,只包含一些元数据
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {//Response仅标识请求头,只包含一些元数据
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
复制代码

咱们能够留意到,Request和Response被定义成了链表同样的结构,并且Server还在request和response上加了同步,这是由于在server中req和resp是复用的,而不是每次处理请求都建立新的对象,具体能够从getRequest/getResponse/freeReqeust/freeResponse看出来:

func (server *Server) getRequest() *Request {
	server.reqLock.Lock()
	req := server.freeReq
	if req == nil {
		req = new(Request)
	} else {
		server.freeReq = req.next
		*req = Request{}
	}
	server.reqLock.Unlock()
	return req
}

func (server *Server) freeRequest(req *Request) {
	server.reqLock.Lock()
	req.next = server.freeReq
	server.freeReq = req
	server.reqLock.Unlock()
}

func (server *Server) getResponse() *Response {
	server.respLock.Lock()
	resp := server.freeResp
	if resp == nil {
		resp = new(Response)
	} else {
		server.freeResp = resp.next
		*resp = Response{}
	}
	server.respLock.Unlock()
	return resp
}

func (server *Server) freeResponse(resp *Response) {
	server.respLock.Lock()
	resp.next = server.freeResp
	server.freeResp = resp
	server.respLock.Unlock()
}
复制代码

Client端

客户端能够经过如下几种方式和服务端创建链接:

  • func Dial(network, address string) (*Client, error) //直接创建tcp链接

  • func DialHTTP(network, address string) (*Client, error) //经过http发送connect请求创建链接,使用默认的PATH

  • func DialHTTPPath(network, address, path string) (*Client, - error)//经过http发送connect请求创建链接,使用自定义的PATH

  • func NewClient(conn io.ReadWriteCloser) *Client //根据给定的链接创建rpc客户端

  • func NewClientWithCodec(codec ClientCodec) *Client //根据给定的ClientCodec创建rpc客户端

客户端的调用方式有两种:Call和Go,其中Call是同步调用,而Go则是异步调用。其中Call的返回值是error类型,而Go的返回值是Call类型。实际上Call也是调用Go方法实现的,只是在Call会阻塞等待Go方法返回结果而已。这里还有一个问题,就是调用时只能经过channel控制超时,底层的逻辑不会有超时,若是server端一直不返回,客户端缓存的请求就会一直不释放,致使泄漏。

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}
复制代码

这里须要注意的是,Go方法接收一个channel类型的done做为结束的标记,并且这个channel必须是有缓冲的,至于为何,我猜想是防止往done里写入发生阻塞,具体还须要再确认下。

与server的实现相似,客户端提供了一个ClientCodec的接口,用来作请求和响应的解析,其中的方法这里就不列举了。

下面看一下客户端构造时的逻辑:

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	codec ClientCodec
	reqMutex sync.Mutex // protects following
	request  Request
	mutex    sync.Mutex // protects following
	seq      uint64
	pending  map[uint64]*Call
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}
复制代码

Client对象中包含一个map类型的pending,用来缓存全部未完成的请求,同时对request和seq作了同步。

能够看到,若是使用默认的NewClient方法,则会构造一个gobClientCodec,它使用gob做为序列化协议;也能够本身指定一个codec。

在构造时,会执行go client.input(),这个input方法就是client接收响应的逻辑了,这个方法会在循环中读取响应,根据响应的seq找到对应的请求,而后经过请求的done发送信号。

func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			// We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to.
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
			// We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // Terminate pending calls. client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } } 复制代码

codec

除了go自带的gob序列化,用户还可使用其余的序列化方式,包括前面提到的json。go 提供了json格式的rpc,能够支持跨语言的调用。

其余

值得注意的是,net/rpc下的内容目前已经再也不更新(freeze)了,具体参考:github.com/golang/go/i…

网上有博客说go 自带的rpc性能远优于grpc,再也不更新的缘由可能只是开发团队再也不愿意花费过多精力而已。

相关文章
相关标签/搜索