在上一篇文章中咱们介绍了 Go 标准库net/rpc
的用法。在默认状况下,rpc
库内部使用gob
格式传输数据。咱们仿造gob
的编解码器实现了一个json
格式的。实际上标准库net/rpc/jsonrcp
中已有实现。本文是对上一篇文章的补充。git
标准库无需安装。github
首先是服务端,使用net/rpc/jsonrpc
以后,咱们就不用本身去编写json
的编解码器了:golang
package main import ( "log" "net" "net/rpc" "net/rpc/jsonrpc" ) type Args struct { A, B int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } // 注意这一行 go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
直接调用jsonrpc.NewServerCodec(conn)
建立一个服务端的codec
。客户端也是相似的:json
func main() { conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } // 这里,这里😁 client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
先运行服务端程序:数组
$ go run main.go
而后在一个新的控制台中运行客户端程序:服务器
$ go run client.go Multiply: 7*8=56
下面这段代码基本上每一个使用jsonrpc
的程序都要编写:微信
conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
所以jsonrpc
为了方便直接提供了一个Dial
方法。使用Dial
简化上面的客户端程序:负载均衡
func main() { client, err := jsonrpc.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
效果是同样的。tcp
JSON-RPC 1.0 标准在 2005 年发布,通过数年演化,于 2010 年发布了 2.0 版本。JSON-RPC 标准的内容可在https://www.jsonrpc.org/specification查看。Go 标准库net/rpc/jsonrpc
实现了 1.0 版本。关于 2.0 版本的实现能够在pkg.go.dev
上搜索json-rpc+2.0
。本文以 1.0 版本为基础进行介绍。编辑器
JSON-RPC 传输的是单一的对象,序列化为 JSON 格式。请求对象包含如下 3 个属性:
method
:请求调用的方法;params
:一个数组表示传给方法的各个参数;id
:请求 ID。ID 能够是任何类型,在收到响应时根据这个属性判断对应哪一个请求。响应对象包含如下 3 个属性:
result
:方法返回的对象,若是error
非空时,该属性必须为null
;error
:表示调用是否出错;id
:对应请求的 ID。另外标准还定义了一种通知类型,除了id
属性为null
以外,通知对象的属性与请求对象彻底同样。
调用client.Call("echo", "Hello JSON-RPC", &reply)
时:
请求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1} 响应:{ "result": "Hello JSON-RPC", "error": null, "id": 1}
下面咱们使用zookeeper
实现一个简单的客户端侧的负载均衡。zookeeper
中记录全部的可提供服务的服务器,客户端每次请求时都随机挑选一个。咱们的示例中,请求必须是无状态的。首先,咱们改造一下服务端程序,将监听地址提取出来,经过flag
指定:
package main import ( "flag" "log" "net" "net/rpc" "net/rpc/jsonrpc" ) var ( addr *string ) type Args struct { A, B int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func init() { addr = flag.String("addr", ":1111", "addr to listen") } func main() { flag.Parse() l, err := net.Listen("tcp", *addr) if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
关于有哪些服务器可用,咱们存储在zookeeper
中。
首先要启动一个zookeeper
的程序。在 Apache Zookeeper 官网能够下载能直接运行的 Windows 程序。下载以后解压,将conf
文件夹中的样板配置zoo_sample.cfg
复制一份,文件名改成zoo.cfg
。在编辑器中打开zoo.cfg
,将dataDir
改成一个已存在的目录,或建立一个新目录。我在bin
同级目录中建立了一个data
目录,而后设置dataDir=../data
。切换到bin
目录下执行zkServer.bat
,zookeeper
程序就运行起来了。使用zkClient.bat
链接上这个zookeeper
,增长一个节点,设置数据:
$ create /rpcserver $ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113
咱们用,
分隔多个服务器地址。
准备工做完成后,接下来就开始编写客户端代码了。咱们实现一个代理类,负责监听zookeeper
的数据变化,根据zookeeper
中新的地址建立到服务器的链接,删除老的链接,将调用请求随机转发到一个服务器处理:
type Proxy struct { zookeeper string clients map[string]*rpc.Client events <-chan zk.Event zookeeperConn *zk.Conn mutex sync.Mutex } func NewProxy(addr string) *Proxy { return &Proxy{ zookeeper: addr, clients: make(map[string]*rpc.Client), } }
这里咱们使用了go-zookeeper
这个库,须要额外安装:
$ go get github.com/samuel/go-zookeeper/zk
程序启动时,代理对象从zookeeper
中获取服务端地址,建立链接:
func (p *Proxy) Connect() { c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10) if err != nil { panic(err) } data, _, event, err := c.GetW("/rpcserver") if err != nil { panic(err) } p.events = event p.zookeeperConn = c p.CreateClients(string(data)) } func (p *Proxy) CreateClients(server string) { p.mutex.Lock() defer p.mutex.Unlock() addrs := strings.Split(server, ",") allAddr := make(map[string]struct{}) for _, addr := range addrs { allAddr[addr] = struct{}{} if _, exist := p.clients[addr]; exist { continue } client, err := jsonrpc.Dial("tcp", addr) if err != nil { log.Println("jsonrpc Dial error:", err) continue } p.clients[addr] = client log.Println("new addr:", addr) } for addr := range p.clients { if _, exist := allAddr[addr]; !exist { // 不在 zookeeper 中的地址,删除对应链接 oldClient.Close() delete(p.clients, addr) log.Println("delete addr", addr) } } }
同时,须要监听zookeeper
中的数据变化,当新增或删除某个服务端地址时,Proxy
要及时更新链接:
func (p *Proxy) Run() { for { select { case event := <-p.events: if event.Type == zk.EventNodeDataChanged { data, _, err := p.zookeeperConn.Get("/rpcserver") if err != nil { log.Println("get zookeeper data failed:", err) continue } p.CreateClients(string(data)) } } } }
客户端主体程序使用Proxy
结构很是方便:
package main import ( "flag" "fmt" "math/rand" ) var ( zookeeperAddr *string ) func init() { zookeeperAddr = flag.String("addr", ":2181", "zookeeper address") } type Args struct { A, B int } func main() { flag.Parse() fmt.Println(*zookeeperAddr) p := NewProxy(*zookeeperAddr) p.Connect() go p.Run() for i := 0; i < 10; i++ { var reply int args := &Args{rand.Intn(1000), rand.Intn(1000)} p.Call("Arith.Multiply", args, &reply) fmt.Printf("%d*%d=%d\n", args.A, args.B, reply) } // sleep 过程当中能够修改 zookeeper 中的数据 time.Sleep(1 * time.Minute) // 使用新的地址作随机 for i := 0; i < 100; i++ { var reply int args := &Args{rand.Intn(1000), rand.Intn(1000)} p.Call("Arith.Multiply", args, &reply) fmt.Printf("%d*%d=%d\n", args.A, args.B, reply) } }
建立一个代理对象,在一个新的 goroutine 中监听zookeeper
事件。而后经过Proxy
的Call
调用远程服务端的方法:
func (p *Proxy) Call(method string, args interface{}, reply interface{}) error { var client *rpc.Client var addr string idx := rand.Int31n(int32(len(p.clients))) var i int32 p.mutex.Lock() for a, c := range p.clients { if i == idx { client = c addr = a break } i++ } p.mutex.Unlock() fmt.Println("use", addr) return client.Call(method, args, reply) }
首先咱们要启动 3 个服务端程序,分别监听端口 11十一、11十二、1113,须要 3 个控制台:
控制台 1:
$ go run main.go -addr :1111
控制台 2:
$ go run main.go -addr :1112
控制台 3:
$ go run main.go -addr :1113
客户端在一个新的控制台启动,指定zookeeper
地址:
$ go run . -addr=127.0.0.1:2181
在输出中,咱们能够看到是怎么随机挑选服务器的。
咱们能够尝试在客户端程序运行的过程当中,将某个服务器地址从zookeeper
中删除。我特地在程序中加了一个 1 分钟的延迟。在sleep
过程当中,经过zkClient.cmd
将127.0.0.1:1113
这个地址从zookeeper
中删除:
$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112
控制台输出:
$ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113
而且后续的请求不会再发到127.0.0.1:1113
这个服务器了。
其实,在实际的项目中,Proxy
通常是一个独立的服务器,而不是放在客户端侧。上面示例这样处理只是为了方便。
RPC 底层可使用各类协议传输数据,JSON/XML/Protobuf 均可以。对 rpc 感兴趣的建议看看rpcx
这个库,https://github.com/smallnest/rpcx。很是强大!
你们若是发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
欢迎关注个人微信公众号【GoUpUp】,共同窗习,一块儿进步~