RPC(Remote Procedure Call)是远程方法调用的缩写,它能够经过网络调用远程对象的方法。Go 标准库net/rpc
提供了一个简单、强大且高性能的 RPC 实现。仅需编写不多的代码就能实现 RPC 服务。本文就来介绍一下这个库。git
标准库无需安装。github
因为是网络程序,咱们须要编写服务端和客户端两个程序。首先是服务端程序:golang
package main import ( "errors" "log" "net" "net/http" "net/rpc" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func (t *Arith) Divide(args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by 0") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { arith := new(Arith) rpc.Register(arith) rpc.HandleHTTP() if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
咱们定义了一个Arith
类型,为它编写了两个方法Multiply
和Divide
。建立Arith
类型的对象arith
,调用rpc.Register(arith)
会注册这两个方法。rpc
库对注册的方法有必定的限制,方法必须知足签名func (t *T) MethodName(argType T1, replyType *T2) error
:web
error
类型的值。返回非nil
的值,表示调用出错。rpc.HandleHTTP()
注册 HTTP 路由。http.ListenAndServe(":1234", nil)
在端口1234
上启动一个 HTTP 服务,请求 rpc 方法会交给rpc
内部路由处理。这样咱们就能够经过客户端调用这两个方法了:编程
package main import ( "fmt" "log" "net/rpc" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) args = &Args{15, 6} var quo Quotient err = client.Call("Arith.Divide", args, &quo) if err != nil { log.Fatal("Divide error:", err) } fmt.Printf("Divide: %d/%d=%d...%d\n", args.A, args.B, quo.Quo, quo.Rem) }
客户端比服务端稍微简单一点,咱们使用rpc.DialHTTP("tcp", ":1234")
链接到服务端的监听地址,返回一个 rpc 的客户端对象。后续就能够调用该对象的Call()
方法调用服务端对象的对应方法,依次传入方法名(须要加上类型限定)、参数、一个指针(用于接收返回值)。首先运行服务端程序:json
$ go run main.go
而后在一个新的控制台中运行客户端程序,输出:浏览器
$ go run client.go Multiply: 7*8=56 Divide: 15/6=2...3
对net/http
包不熟悉的童鞋可能会以为奇怪,rpc.HandleHTTP()
与http.ListenAndServer(":1234", nil)
是怎么联系起来的?咱们简单看一下源码:安全
// src/net/rpc/server.go const ( // Defaults used by HandleHTTP DefaultRPCPath = "/_goRPC_" DefaultDebugPath = "/debug/rpc" ) func (server *Server) HandleHTTP(rpcPath, debugPath string) { http.Handle(rpcPath, server) http.Handle(debugPath, debugHTTP{server}) } func HandleHTTP() { DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) }
实际上,rpc.HandleHTTP()
会调用http.Handle()
在预约义的路径上(/_goRPC_
)注册处理器。这个处理器最终被添加到net/http
包中的默认多路复用器上:服务器
// src/net/http/server.go func Handle(pattern string, handler Handler) { DefaultServeMux.Handle(pattern, handler) }
而http.ListenAndServer()
第二个参数传入nil
时也是使用默认的多路复用器。具体能够看看我以前的文章Go Web 编程之 程序结构。微信
细心的朋友可能发现了,除了默认的路径/_goRPC_
用来处理 RPC 请求,rpc.HandleHTTP()
方法还注册了一个调试路径/debug/rpc
。咱们能够直接在浏览器中访问这个网址(须要服务端程序开启。若是服务端在远程,须要相应地修改地址)localhost:1234,直观的查看各个方法的调用状况:
上面的例子中,咱们在客户端使用了同步的调用方式,即一直等待服务端的响应或出错。在等待的过程当中,客户端就不能处理其它的任务了。固然,咱们也能够采用异步的调用方式:
func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args1 := &Args{7, 8} var reply int multiplyReply := client.Go("Arith.Multiply", args1, &reply, nil) args2 := &Args{15, 6} var quo Quotient divideReply := client.Go("Arith.Divide", args2, &quo, nil) ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() var multiplyReplied, divideReplied bool for !multiplyReplied || !divideReplied { select { case replyCall := <-multiplyReply.Done: if err := replyCall.Error; err != nil { fmt.Println("Multiply error:", err) } else { fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply) } multiplyReplied = true case replyCall := <-divideReply.Done: if err := replyCall.Error; err != nil { fmt.Println("Divide error:", err) } else { fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem) } divideReplied = true case <-ticker.C: fmt.Println("tick") } } }
异步调用使用client.Go()
方法,参数与同步调用基本同样。它返回一个rpc.Call
对象:
// src/net/rpc/client.go type Call struct { ServiceMethod string Args interface{} Reply interface{} Error error Done chan *Call }
咱们能够经过该对象获取这次调用的信息,如方法名、参数、返回值和错误。咱们经过监听通道Done
是否有值判断调用是否完成。上面代码中使用一个select
语句轮询两次调用的状态。注意一点,若是多个通道都有值,select
执行哪一个case
是随机的。因此可能先输出divide
的信息:
$ go run client.go Divide: 15/6=2...3 Multiply: 7*8=56
服务端能够继续使用一开始的。
默认状况下,rpc.Register()
将方法接收者(receiver
)的类型名做为方法名前缀。咱们也能够本身设置。这时须要调用RegisterName(name string, rcvr interface{}) error
方法:
func main() { arith := new(Arith) rpc.RegisterName("math", arith) rpc.HandleHTTP() if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
上面咱们将注册的方法名前缀改成math
了,客户端调用时传入的方法名也须要相应的修改:
func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("math.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
上面咱们都是使用 HTTP 协议来实现 rpc 服务的,rpc
库也支持直接使用 TCP 协议。首先,服务端先调用net.Listen("tcp", ":1234")
建立一个监听某个 TCP 端口的监听器(Accepter),而后使用rpc.Accept(l)
在此监听器上接受链接并处理:
func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) rpc.Accept(l) }
而后,客户端调用rpc.Dial()
以 TCP 协议链接到服务端:
func main() { client, err := rpc.Dial("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
咱们能够本身接受链接,而后在此链接上应用 rpc 协议:
func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeConn(conn) } }
这个客户端与上面 TCP 的客户端同样,不用修改。
默认客户端与服务端之间的数据使用gob
编码,咱们可使用其它的格式来编码。在服务端,咱们要实现rpc.ServerCodec
接口:
// src/net/rpc/server.go type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface{}) error WriteResponse(*Response, interface{}) error Close() error }
实际上不用这么麻烦,咱们查看源码看看gobServerCodec
是怎么实现的,而后仿造实现一个就好了。下面我实现了一个 JSON 格式的编解码器:
type JsonServerCodec struct { rwc io.ReadWriteCloser dec *json.Decoder enc *json.Encoder encBuf *bufio.Writer closed bool } func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec { buf := bufio.NewWriter(conn) return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false} } func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error { return c.dec.Decode(r) } func (c *JsonServerCodec) ReadRequestBody(body interface{}) error { return c.dec.Decode(body) } func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: json error encoding response:", err) c.Close() } return } if err = c.enc.Encode(body); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: json error encoding body:", err) c.Close() } return } return c.encBuf.Flush() } func (c *JsonServerCodec) Close() error { if c.closed { return nil } c.closed = true return c.rwc.Close() } func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeCodec(NewJsonServerCodec(conn)) } }
在for
循环中须要建立编解码器JsonServerCodec
传给ServeCodec
方法。一样的,客户端要实现rpc.ClientCodec
接口,也是仿造gobClientCodec
的实现:
type JsonClientCodec struct { rwc io.ReadWriteCloser dec *json.Decoder enc *json.Encoder encBuf *bufio.Writer } func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec { encBuf := bufio.NewWriter(conn) return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf} } func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { return } if err = c.enc.Encode(body); err != nil { return } return c.encBuf.Flush() } func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error { return c.dec.Decode(r) } func (c *JsonClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) } func (c *JsonClientCodec) Close() error { return c.rwc.Close() } func main() { conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } client := rpc.NewClientWithCodec(NewJsonClientCodec(conn)) args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
要使用NewClientWithCodec
以指定的编解码器建立客户端。
实际上,上面咱们调用的方法rpc.Register
,rpc.RegisterName
,rpc.ServeConn
,rpc.ServeCodec
都是转而去调用默认DefaultServer
的相关方法:
// src/net/rpc/server.go var DefaultServer = NewServer() func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } func RegisterName(name string, rcvr interface{}) error { return DefaultServer.RegisterName(name, rcvr) } func ServeConn(conn io.ReadWriteCloser) { DefaultServer.ServeConn(conn) } func ServeCodec(codec ServerCodec) { DefaultServer.ServeCodec(codec) }
可是由于DefaultServer
是全局共享的,若是有第三方库使用了相关方法,而且注册了一些对象的方法,咱们引用这个第三方库以后,就出现两个问题。第一,可能与咱们注册的方法冲突;第二,带来额外的安全隐患(库中方法直接panic
?)。故而推荐作法是本身NewServer
:
func main() { arith := new(Arith) server := rpc.NewServer() server.RegisterName("math", arith) server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath) if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
这实际上是一个套路,不少库会提供一个默认的实现直接使用,如log
、net/http
这些库。可是也提供了建立和自定义的方法。通常测试时为了方即可以使用默认实现,实践中最好本身建立相应的对象,避免干扰和安全问题。
本文介绍了 Go 标准库中的rpc
,它使用很是简单,性能异常强大。不少rpc
的第三方库都是对rpc
的封装,早期版本的rpcx
就是基于rpc
作的封装。
你们若是发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
欢迎关注个人微信公众号【GoUpUp】,共同窗习,一块儿进步~