原文以下:html
rpcx是一个相似阿里巴巴 Dubbo 和微博 Motan 的分布式的RPC服务框架,基于Golang net/rpc实现。git
谈起分布式的RPC框架,比较出名的是阿里巴巴的dubbo,包括由当当网维护的dubbox。
不知道dubbo在阿里的内部竞争中败给了HSF,仍是阿里有意将其闭源了,官方的代码使用的spring还停留在2.5.6.SEC03的版本,dubbox的spring也只升级到3.2.9.RELEASE。
无论怎样,dubbo仍是在电商企业获得普遍的应用,京东也有部分在使用dubbo开发。程序员
DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治理方案的核心框架,天天为2,000+个服务提供3,000,000,000+次访问量支持,并被普遍应用于阿里巴巴集团的各成员站点。微博的RPC框架 Motan 也正式开源了,如张雷所说:github
2013 年微博 RPC 框架 Motan 在前辈大师们(福林、fishermen、小麦、王喆等)的精心设计和辛勤工做中诞生,向各位大师们致敬,也获得了微博各个技术团队的鼎力支持及不断完善,现在 Motan 在微博平台中已经普遍应用,天天为数百个服务完成近千亿次的调用。golang
这两个个优秀的框架都是使用Java开发的,国外的互联网企业也有很是出名的的RPC框架如 thrift 、 finagle 。算法
本项目 rpcx 的目标就是实现一个Go生态圈的Dubbo,为Go生态圈提供一个分布式的、多插件的、带有服务治理功能的产品级的RPC框架。spring
Go生态圈已经有一些RPC库,如官方的 net/rpc 、 grpc-go 、 gorilla-rpc 等,为何还要开发 rpcx 呢?apache
缘由在于尽管这些框架都是为Go实现的RPC库,可是它们的功能比较单一,只是实现了点对点(End-to-End)的通信框架。缺少服务治理的功能,好比服务注册和发现、负载均衡、容灾、服务监控等功能。所以我基于Go net/rpc框架实现了一个相似Dubbo的分布式框架。编程
和rpcx比较相似的Go RPC框架是 go-micro ,可是rpcx提供了更丰富的功能,基于TCP的通信协议性能更好。json
远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通讯协议。该协议容许运行于一台计算机的程序调用另外一台计算机的子程序,而程序员无需额外地为这个交互做用编程。若是涉及的软件采用面向对象编程,那么远程过程调用亦可称做远程调用或远程方法调用,例:Java RMI。简单地说就是能使应用像调用本地方法同样的调用远程的过程或服务。很显然,这是一种client-server的交互形式,调用者(caller)是client,执行者(executor)是server。典型的实现方式就是request–response通信机制。
RPC 是进程之间的通信方式(inter-process communication, IPC), 不一样的进程有不一样的地址空间。
若是client和server在同一台机器上,尽管物理地址空间是相同的,可是虚拟地址空间不一样。
若是它们在不一样的主机上,物理地址空间也不一样。
RPC的实现的技术各不相同,也不必定兼容。
一个正常的RPC过程能够分红下面几步:
RPC只是描绘了 Client 与 Server 之间的点对点调用流程,包括 stub、通讯、RPC 消息解析等部分,在实际应用中,还须要考虑服务的高可用、负载均衡等问题,因此产品级的 RPC 框架除了点对点的 RPC 协议的具体实现外,还应包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。目前的 RPC 框架大体有两种不一样的侧重方向,一种偏重于服务治理,另外一种偏重于跨语言调用。
服务治理型的 RPC 框架有 Dubbo、DubboX、Motan 等,这类的 RPC 框架的特色是功能丰富,提供高性能的远程调用以及服务发现及治理功能,适用于大型服务的微服务化拆分以及管理,对于特定语言(Java)的项目能够十分友好的透明化接入。但缺点是语言耦合度较高,跨语言支持难度较大。
跨语言调用型的 RPC 框架有 Thrift、gRPC、Hessian、Hprose 等,这一类的 RPC 框架重点关注于服务的跨语言调用,可以支持大部分的语言进行语言无关的调用,很是适合于为不一样语言提供通用远程服务的场景。但这类框架没有服务发现相关机制,实际使用时通常须要代理层进行请求转发和负载均衡策略控制。
本项目 rpcx 属于服务治理类型,是一个基于 Go 开发的高性能的轻量级 RPC 框架,Motan 提供了实用的服务治理功能和基于插件的扩展能力。
rpcx使用Go实现,适合使用Go语言实现RPC的功能。
rpcx目标是轻量级的,小而简单,可是指望全部的功能均可以经过插件的方式搭积木的方式完成。
rpcx中有服务提供者 RPC Server,服务调用者 RPC Client 和服务注册中心 Registry 三个角色。
当前rpcx支持zookeeper, etcd等注册中心,Consul注册中心正在开发中。
rpcx基于Go net/rpc的底层实现, Client和Server之间通信是经过TCP进行通信的,它们之间经过Client发送Request,Server返回Response实现。
Request和Response消息的格式都是 Header+Body
的格式。Header和Body具体的格式根据编码方式的不一样而不一样,能够是二进制,也能够是结构化数据如JSON。
rpcx拥有众多特性。
rpcx当前支持多种序列化/反序列化的方式,能够根据需求选择合适的编码库。
特性 | 功能描述 |
---|---|
gob | 官方提供的序列化方式,基于一个包含元数据的流 |
jsonrpc | 也是官方提供的编码库,以JSON格式传输 |
msgp | 相似json格式的编码,可是更小更快,能够直接编码struct |
gencode | 一个超级快的序列化库,须要定义schema,可是定义方式和struct相似 |
protobuf | Google推出的广受关注的序列化库,推荐使用 gogo-protobuf ,能够得到更高的性能 |
在数据结构简单的状况下,这几种库均可以知足需求,参照本文中的benchmark测试。可是若是追求性能,建议采用后面三种序列化库。
序列化库的选择对于RPC服务的影响是巨大的,我建立了另一个项目专门比较各序列化库的性能: gosercomp 。
新的序列化库的实现也很是简单,只需实现下面两个方法便可:
funcNewXXXXXServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { …… } funcNewXXXXXClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { …… }
编码库负责marshal/unmarshal Reuqest/Response, 包括消息中的Header和Body。若是你想,你也能够对Header和Body实现不一样的编码。
目前提供了两种注册中心:
注册中心的配置只需在服务器初始化的时候增长如下代码,服务的实现无需作任何的改动,也不须要额外的配置。
plugin := &ZooKeeperRegisterPlugin{ ServiceAddress: "tcp@127.0.0.1:1234", ZooKeeperServers: []string{"127.0.0.1:2181"}, BasePath: "/betterrpc", metrics: metrics.NewRegistry(), Services: make([]string,1), updateInterval: time.Minute, } server.PluginContainer.Add(plugin)
其中ServiceAddress为本机(Server)要暴露给Client地址。由于ZooKeeper的节点名不支持"/",因此此处用"@"代替"://"。
ZooKeeperServers为ZK集群的地址。
BasePath为一个服务组,此组下的服务对于Client都是可见的。
etcd能够经过TTL判断服务器的存活,另外此插件也会定时把调用次数定时更新到etcd。
此插件可使用下面的代码配置:
plugin := &EtcdRegisterPlugin{ ServiceAddress: "tcp@127.0.0.1:1234", EtcdServers: []string{"http://127.0.0.1:2379"}, BasePath: "/betterrpc", metrics: metrics.NewRegistry(), Services: make([]string,1), updateInterval: time.Minute, } server.PluginContainer.Add(plugin)
注意注册中心插件必须在配置服务以前设置,不然注册中心没法获取要注册的服务信息。
当前rpcx为server提供了如下扩展点:
你能够根据这些扩展点编写本身的插件,只需实现相应的接口便可。定义的接口你能够看godoc的IXXXXXXPlugin的定义。
上面介绍的注册中心就是经过插件的方式实现。同时rpcx还实现了其它的插件,以下面的介绍。
负载均衡是经过不一样的ClientSelector来实现的。
负载均衡器 | 功能描述 |
---|---|
DirectClientSelector | 点对点的直连,客户端直接链接一个服务器 |
MultiClientSelector | 多对多的直连,一个客户端能够从一组固定的服务器中选择一个直连,无需注册中心 |
ZooKeeperClientSelector | 从ZK注册中心选择一个服务器链接 |
EtcdClientSelector | 从Etcd注册中心选择一个服务器链接 |
一个Selector须要实现ClientSelector接口:
typeClientSelectorinterface{ Select(clientCodecFunc ClientCodecFunc) (*rpc.Client, error) }
Client的序列化方式必须和服务器的序列化方式保持一致。
Client提供了两种容错方式: Failfast
、 Failover
、 Failtry
:
对于多个服务器,重选发送支持:
Client的扩展点以下:
点对点的实现和Go net/rpc的使用基本一致。
packagemain import"github.com/smallnest/rpcx" typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } typeArithint func(t *Arith) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B returnnil } func(t *Arith) Error(args *Args, reply *Reply) error { panic("ERROR") } funcmain() { server := rpcx.NewServer() server.RegisterName("Arith",new(Arith)) server.Serve("tcp","127.0.0.1:8972") }
同步方式:
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { s := &rpcx.DirectClientSelector{Network: "tcp", Address:"127.0.0.1:8972", Timeout:10* time.Second} client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
异步方式(经过Channel得到执行结果):
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { s := &rpcx.DirectClientSelector{Network: "tcp", Address:"127.0.0.1:8972", Timeout:10* time.Second} client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply divCall := client.Go("Arith.Mul", args, &reply,nil) replyCall := <-divCall.Done // will be equal to divCall ifreplyCall.Error !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, replyCall.Error) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
这里例子启动了两个服务器,其中一个服务器故意将 7 * 8
计算成 560
,以便和另一个服务器进行区分,咱们能够观察计算结果。
packagemain import"github.com/smallnest/rpcx" typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } typeArithint func(t *Arith) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B returnnil } func(t *Arith) Error(args *Args, reply *Reply) error { panic("ERROR") } typeArith2int func(t *Arith2) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B *10 returnnil } func(t *Arith2) Error(args *Args, reply *Reply) error { panic("ERROR") } funcmain() { server1 := rpcx.NewServer() server1.RegisterName("Arith",new(Arith)) server1.Start("tcp","127.0.0.1:8972") server2 := rpcx.NewServer() server2.RegisterName("Arith",new(Arith2)) server2.Serve("tcp","127.0.0.1:8973") }
随机选取服务器的例子:
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} servers := []clientselector.ServerPair{server1, server2} s := clientselector.NewMultiClientSelector(servers, rpcx.RandomSelect,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
RoundRobin选取服务器的例子
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} servers := []clientselector.ServerPair{server1, server2} s := clientselector.NewMultiClientSelector(servers, rpcx.RoundRobin,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
Failover
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} server3 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8974"} servers := []clientselector.ServerPair{server1, server2, server3} s := clientselector.NewMultiClientSelector(servers, rpcx.RoundRobin,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) client.FailMode = rpcx.Failover args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
rpcx基于Go net/rpc框架实现,它的插件机制并不会带来多少性能的损失,以下面的测试,rpcx性能和官方的Go net/rpc持平。
[root@localhostrpcx]# go test -bench . -test.benchmem PASS BenchmarkNetRPC_gob-1610000018742ns/op321B/op9allocs/op BenchmarkNetRPC_jsonrpc-1610000021360ns/op1170B/op31allocs/op BenchmarkNetRPC_msgp-1610000018617ns/op776B/op35allocs/op BenchmarkRPCX_gob-1610000018718ns/op320B/op9allocs/op BenchmarkRPCX_json-1610000021238ns/op1170B/op31allocs/op BenchmarkRPCX_msgp-1610000018635ns/op776B/op35allocs/op BenchmarkRPCX_gencodec-1610000018454ns/op4485B/op17allocs/op BenchmarkRPCX_protobuf-1610000017234ns/op733B/op13allocs/op
参考资料:
RPCX: http://www.tuicool.com/m/articles/vYB3euv
go-micro: https://github.com/micro/go-micro
https://blog.micro.mu/2016/03/28/go-micro.html