从零开始实现一个RPC框架(二)

前言

上一篇文章里咱们实现了基本的RPC客户端和服务端,此次咱们开始着手实现更上层的功能。篇幅所限,具体的代码实现参见:代码地址git

基础支撑部分

升级版的Client和Server

client实现github

server实现缓存

首先让咱们来从新定义Client和Server:SGClient和SGServer。SGClient封装了上一节定义的RPCClient的操做,提供服务治理的相关特性;SGServer则由上一节定义的RPCServer升级而来,支持服务治理的相关特性。这里的SG(service governance)表示服务治理。 这里直接贴上相关的定义:服务器

type SGClient interface {
	Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
	Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
}
type sgClient struct {
	shutdown  bool
	option    SGOption
	clients   sync.Map //map[string]RPCClient
	serversMu sync.RWMutex
	servers   []registry.Provider
}
type RPCServer interface {
	Register(rcvr interface{}, metaData map[string]string) error
	Serve(network string, addr string) error
	Services() []ServiceInfo
	Close() error
}
type SGServer struct { //原来的RPCServer
	codec      codec.Codec
	serviceMap sync.Map
	tr         transport.ServerTransport
	mutex      sync.Mutex
	shutdown   bool
	Option Option
}
复制代码

拦截器

在以前的文章提到过,咱们须要提供过滤器同样的使用方式,来达到对扩展开放对修改关闭的目标。咱们这里采用高阶函数的方式来定义方切面和法拦截器,首先定义几个切面:网络

//客户端切面
type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call //服务端切面 type ServeFunc func(network string, addr string) error type ServeTransportFunc func(tr transport.Transport) type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) 复制代码

以上几个是RPC调用在客户端和服务端会通过的几个函数,咱们将其定义为切面,而后再定义对应的拦截器:app

//客户端拦截器
packege client
type Wrapper interface {
	WrapCall(option *SGOption, callFunc CallFunc) CallFunc
	WrapGo(option *SGOption, goFunc GoFunc) GoFunc
}
//f服务端拦截器
package server
type Wrapper interface {
	WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
	WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
	WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
}
复制代码

这样一来,用户能够经过实现Wapper接口来对客户端或者服务端的行为进行加强,好比将请求参数和结果记录到日志里,动态的修改参数或者响应等等。咱们的框架自身 的相关功能也能够经过Wrapper实现。目前客户端实现了用于封装元数据的MetaDataWrapper和记录请求和响应的LogWrapper;服务端目前在DefaultWrapper实现了用于服务注册、监听退出信号以及请求计数的逻辑。负载均衡

由于go并不提供抽象类的方式,因此对于某些实现类可能并不须要拦截全部切面(好比只拦截Call不想拦截Go),这种状况直接返回参数里的函数对象就能够了。框架

客户端拦截器实现分布式

服务端拦截器实现ide

服务治理部分

服务注册与发现

在这以前,咱们的RPC服务调用都是经过在客户端指定服务端的ip和端口来调用的,这种方式十分简单但也场景十分有限,估计只能在测试或者demo中使用。因此咱们须要提供服务注册和发现相关的功能,让客户端的配置再也不与实际的IP绑定,而是经过独立的注册中心获取服务端的列表,而且可以在服务端节点变动时得到实时更新。

首先定义相关的接口(代码地址):

//Registry包含两部分功能:服务注册(用于服务端)和服务发现(用于客户端)
type Registry interface {
	Register(option RegisterOption, provider ...Provider) //注册
	Unregister(option RegisterOption, provider ...Provider) //注销
	GetServiceList() []Provider //获取服务列表
	Watch() Watcher //监听服务列表的变化
	Unwatch(watcher Watcher) //取消监听
}
type RegisterOption struct {
	AppKey string //AppKey用于惟一标识某个应用
}
type Watcher interface {
	Next() (*Event, error) //获取下一次服务列表的更新
	Close()
}
type EventAction byte
const (
	Create EventAction = iota
	Update
	Delete
)
type Event struct { //Event表示一次更新
	Action    EventAction
	AppKey    string
	Providers []Provider //具体变化的服务提供者(增量而不是全量)
}
type Provider struct { //某个具体的服务提供者
	ProviderKey string // Network+"@"+Addr
	Network     string
	Addr        string
	Meta        map[string]string
}
复制代码

AppKey

咱们使用AppKey这样一个概念来标识某个服务,好比com.meituan.demo.rpc.server。服务端在启动时将自身的相关信息(包括AppKey、ip、port、方法列表等)注册到注册中心;客户端在须要调用时只须要根据服务端的AppKey到注册中心查找便可。

目前暂时只实现了直连(peer2peer)和基于内存(InMemory)的服务注册,后续再接入其余独立的组件如etcd或者zookeeper等等。

InMemory代码实现地址

负载均衡

有了服务注册与发现以后,一个客户端所面对的可能就不仅有一个服务端了,客户端在发起调用前须要从多个服务端中选择一个出来进行实际的通讯,具体的选择策略有不少,好比随机选择、轮询、基于权重选择、基于服务端负载或者自定义规则等等。

这里先给出接口定义:

//Filter用于自定义规则过滤某个节点
type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool type SelectOption struct {
	Filters []Filter
}
type Selector interface {
	Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
}
复制代码

目前暂时只实现了随机负载均衡,后续再实现其余策略好比轮询或者一致性哈希等等,用户也能够选择实现本身的负载均衡策略。

容错处理

长链接以及网络重连

为了减小频繁建立和断开网络链接的开销,咱们维持了客户端到服务端的长链接,并把建立好的链接(RPCClient对象)用map缓存起来,key就是对应的服务端的标识。客户端在调用前根据负载均衡的结果检索到缓存好的RPCClient而后发起调用。当咱们检索不到对应的客户端或者发现缓存的客户端已经失效时,须要从新创建链接(从新建立RPCClient对象)。

func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
        //根据负载均衡决定要调用的服务端
	provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
	if err != nil {
		return
	}
	client, err = c.getClient(provider)
	return
}

func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
	key := provider.ProviderKey
	rc, ok := c.clients.Load(key)
	if ok {
		client := rc.(RPCClient)
		if client.IsShutDown() {
		    //若是已经失效则清除掉
			c.clients.Delete(key)
		}
	}
        //再次检索
	rc, ok = c.clients.Load(key)
	if ok {
	        //已经有缓存了,返回缓存的RPCClient
		client = rc.(RPCClient)
	} else {
	        //没有缓存,新建一个而后更新到缓存后返回
		client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
		if err != nil {
			return
		}
		c.clients.Store(key, client)
	}
	return
}
复制代码

目前的实现当中,每一个服务提供者只有一个对应的RPCClient,后续能够考虑相似链接池的实现,即每一个服务提供者对应多个RPCClient,每次调用前从链接池中取出一个RPCClient。

集群容错

在分布式系统中,异常是不可避免的,当发生调用失败时,咱们能够选择要采起的处理方式,这里列举了常见的几种:

type FailMode byte
const (
	FailFast FailMode = iota //快速失败
	FailOver //重试其余服务器
	FailRetry //重试同一个服务器
	FailSafe //忽略失败,直接返回
)
复制代码

具体实现比较简单,就是根据配置的容错选项和重试次数决定是否重试;其余包括FailBack(延时一段时间后重发)、Fork以及Broadcast等等暂时没有实现。

优雅退出

在收到程序退出信号时,server端会尝试优先处理完当前还未结束的请求,等请求处理完毕以后再退出,当超出了指定的时间(默认12s)仍未处理完毕时,server端会直接退出。

func (s *SGServer) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.shutdown = true
	//等待当前请求处理完或者直到指定的时间
	ticker := time.NewTicker(s.Option.ShutDownWait)
	defer ticker.Stop()
	for {
		if s.requestInProcess <= 0 { //requestInProcess表示当前正在处理的请求数,在wrapper里计数
			break
		}
		select {
		case <-ticker.C:
			break
		}
	}
	return s.tr.Close()
}
复制代码

结语

到这里就是此次的所有内容了,总的来讲是在以前的基础上作了封装,预留了后续的扩展点,而后实现了简单的服务治理相关的功能。总结一下,此次咱们在上一篇文章的基础上作了如下改动:

  1. 从新定义了Client和Server的接口
  2. 提供了拦截器(Wrapper接口)
  3. 提供了服务注册与发现以及负载均衡的接口和简单实现
  4. 实现了简单的容错处理
  5. 实现了简单的优雅退出
  6. 增长了gob序列化方式支持(比较简单,文章里并无提到)

历史连接

从零开始实现一个RPC框架(零)

从零开始实现一个RPC框架(一)

相关文章
相关标签/搜索