在以前的两篇文章rpc
和json-rpc
中,咱们介绍了 Go 标准库提供的rpc
实现。在实际开发中,rpc
库的功能仍是有所欠缺。今天咱们介绍一个很是优秀的 Go RPC 库——rpcx
。rpcx
是一位国人大牛开发的,详细开发历程能够在rpcx
官方博客了解。rpcx
拥有媲美,甚至某种程度上超越gRPC
的性能,有完善的中文文档,提供服务发现和治理的插件。git
本文示例使用go modules
。github
首先是安装:golang
$ go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
复制代码
能够看出rpcx
的安装有点特殊。使用go get -v github.com/smallnest/rpcx/...
命令只会安装rpcx
的基础功能。扩展功能都是经过build tags
指定。为了使用方便,通常安装全部的tags
,如上面命令所示。这也是官方推荐的安装方式。json
咱们先编写服务端程序,实际上这个程序与用rpc
标准库编写的程序几乎如出一辙:bash
package main
import (
"context"
"errors"
"github.com/smallnest/rpcx/server"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
s := server.NewServer()
s.RegisterName("Arith", new(Arith), "")
s.Serve("tcp", ":8972")
}
复制代码
首先建立一个Server
对象,调用它的RegisterName()
方法在服务路径Arith
下注册Mul
和Div
方法。与标准库相比,rpcx
要求注册方法的第一个参数必须为context.Context
类型。最后调用s.Serve("tcp", ":8972")
监听 TCP 端口 8972。是否是很简单?启动服务器:服务器
$ go run main.go
复制代码
而后是客户端程序:微信
package main
import (
"context"
"flag"
"log"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", ":8972", "service address")
)
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A:10, B:20}
var reply int
err :=xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
复制代码
rpcx
支持多种服务发现的方式让客户端找到服务器。上面代码中咱们使用的是最简单的点到点的方式,也就是直连。要调用服务端的方法,必须先建立一个Client
对象。使用Client
对象来调用远程方法。运行客户端:网络
$ go run main.go
10 * 20 = 200
50 * 20 = 2...10
复制代码
注意到,建立Client
对象的参数有client.Failtry
和client.RandomSelect
。这两个参数分别为失败模式和如何选择服务器。dom
rpcx
支持多种传输协议:tcp
TCP
:TCP 协议,网络名称为tcp
;HTTP
:HTTP 协议,网络名称为http
;UnixDomain
:unix 域协议,网络名称为unix
;QUIC
:是 Quick UDP Internet Connections 的缩写,意为快速UDP网络链接。HTTP/3 底层就是 QUIC 协议,Google 出品。网络名称为quic
;KCP
:快速而且可靠的 ARQ 协议,网络名称为kcp
。rpcx
对这些协议作了很是好的封装。除了在建立服务器和客户端链接时须要指定协议名称,其它时候的使用基本是透明的。咱们将上面的例子改装成使用http
协议的:
服务端改动:
s.Serve("http", ":8972")
复制代码
客户端改动:
d := client.NewPeer2PeerDiscovery("http@"+*addr, "")
复制代码
QUIC
和KCP
的使用有点特殊,QUIC
必须与 TLS 一块儿使用,KCP
也须要作传输加密。使用 Go 语言咱们能很方便地生成一个证书和私钥:
package main
import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"os"
"time"
)
func main() {
max := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, _ := rand.Int(rand.Reader, max)
subject := pkix.Name{
Organization: []string{"Go Daily Lib"},
OrganizationalUnit: []string{"TechBlog"},
CommonName: "go daily lib",
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: subject,
NotBefore: time.Now(),
NotAfter: time.Now().Add(365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
}
pk, _ := rsa.GenerateKey(rand.Reader, 2048)
derBytes, _ := x509.CreateCertificate(rand.Reader, &template, &template, &pk.PublicKey, pk)
certOut, _ := os.Create("server.pem")
pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
certOut.Close()
keyOut, _ := os.Create("server.key")
pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)})
keyOut.Close()
}
复制代码
上面代码生成了一个证书和私钥,有效期为 1 年。运行程序,获得两个文件server.pem
和server.key
。而后咱们就能够编写使用QUIC
协议的程序了。服务端:
func main() {
cert, _ := tls.LoadX509KeyPair("server.pem", "server.key")
config := &tls.Config{Certificates: []tls.Certificate{cert}}
s := server.NewServer(server.WithTLSConfig(config))
s.RegisterName("Arith", new(Arith), "")
s.Serve("quic", "localhost:8972")
}
复制代码
实际上就是加载证书和密钥,而后在建立Server
对象时做为选项传入。客户端改动:
conf := &tls.Config{
InsecureSkipVerify: true,
}
option := client.DefaultOption
option.TLSConfig = conf
d := client.NewPeer2PeerDiscovery("quic@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()
复制代码
客户端也须要配置 TLS。
有一点须要注意,rpcx
对quic/kcp
这些协议的支持是经过build tags
实现的。默认不会编译quic/kcp
相关文件。若是要使用,必须本身手动指定tags
。先启动服务端程序:
$ go run -tags quic main.go
复制代码
而后切换到客户端程序目录,执行下面命令:
$ go run -tags quic main.go
复制代码
还有一点须要注意,在使用tcp
和http
(底层也是tcp
)协议的时候,咱们能够将地址简写为:8972
,由于默认就是本地地址。可是quic
不行,必须把地址写完整:
// 服务端
s.Serve("quic", "localhost:8972")
// 客户端
addr = flag.String("addr", "localhost:8972", "service address")
复制代码
上面的例子都是调用对象的方法,咱们也能够调用函数。函数的类型与对象方法相比只是没有接收者。注册函数须要指定一个服务路径。服务端:
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func Mul(cxt context.Context, args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func Div(cxt context.Context, args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
s := server.NewServer()
s.RegisterFunction("function", Mul, "")
s.RegisterFunction("function", Div, "")
s.Serve("tcp", ":8972")
}
复制代码
只是注册方法由RegisterName
变为了RegisterFunction
,参数由一个对象变为一个函数。咱们须要为注册的函数指定一个服务路径,客户端调用时会根据这个路径查找对应方法。客户端:
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A: 10, B: 20}
var reply int
err := xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
复制代码
rpcx
支持多种注册中心:
zookeeper
:经常使用的注册中心;Etcd
:Go 语言编写的注册中心;Consul/mDNS
等。咱们以前演示的都是点对点的链接,接下来咱们介绍如何使用zookeeper
做为注册中心。在rpcx
中,注册中心是经过插件的方式集成的。使用ZooKeeperRegisterPlugin
这个插件来集成Zookeeper
。服务端代码:
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
var (
addr = flag.String("addr", ":8972", "service address")
zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
basePath = flag.String("basePath", "/services/math", "service base path")
)
type Arith int
func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
fmt.Println("Mul on", *addr)
*reply = args.A * args.B
return nil
}
func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
fmt.Println("Div on", *addr)
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
flag.Parse()
p := &serverplugin.ZooKeeperRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
ZooKeeperServers: []string{*zkAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
if err := p.Start(); err != nil {
log.Fatal(err)
}
s := server.NewServer()
s.Plugins.Add(p)
s.RegisterName("Arith", new(Arith), "")
s.Serve("tcp", *addr)
}
复制代码
在ZooKeeperRegisterPlugin
中,咱们指定了本服务地址,zookeeper 集群地址(能够是多个),起始路径等。服务器启动时自动向 zookeeper 注册本服务的信息,客户端可直接从 zookeeper 拉取可用的服务列表。
首先启动 zookeeper 服务器,zookeeper 的安装与启动能够参考个人上一篇文章。分别在 3 个控制台中启动 3 个服务器,指定不一样的端口(注意须要指定-tags zookeeper
):
// 控制台1
$ go run -tags zookeeper main.go -addr 127.0.0.1:8971
// 控制台2
$ go run -tags zookeeper main.go -addr 127.0.0.1:8972
// 控制台3
$ go run -tags zookeeper main.go -addr 127.0.0.1:8973
复制代码
启动以后,咱们观察 zookeeper 路径/services/math
中的内容:
很是棒,可用的服务地址不用咱们手动维护了!
接下来是客户端:
var (
zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
basePath = flag.String("basePath", "/services/math", "service base path")
)
func main() {
flag.Parse()
d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A: 10, B: 20}
var reply int
err := xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
复制代码
咱们经过 zookeeper 读取可用的Arith
服务列表,而后随机选择一个服务发送请求:
$ go run -tags zookeeper main.go
2020/05/26 23:03:40 Connected to 127.0.0.1:2181
2020/05/26 23:03:40 authenticated: id=72057658440744975, timeout=10000
2020/05/26 23:03:40 re-submitting `0` credentials after reconnect
10 * 20 = 200
50 * 20 = 2...10
复制代码
咱们的客户端发送了两条请求。因为使用了client.RandomSelect
策略,因此这两个请求随机发送到某个服务端。我在Mul
和Div
方法中增长了一个打印,能够观察一下各个控制台的输出!
若是咱们关闭了某个服务器,对应的服务地址会从 zookeeper 中移除。我关闭了服务器 1,zookeeper 服务列表变为:
相比上一篇文章中须要手动维护 zookeeper 的内容,rpcx
的自动注册和维护明显要方便太多了!
rpcx
是 Go 语言中数一数二的 rpc 库,功能丰富,性能出众,文档丰富,已经被很多公司和我的采用。本文介绍的只是最基础的功能,rpcx
支持各类路由选择策略、分组、限流、身份认证等高级功能,推荐深刻学习!
你们若是发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
个人博客:darjun.github.io
欢迎关注个人微信公众号【GoUpUp】,共同窗习,一块儿进步~