gRPC源码/负载均衡

本文以roundRobin为例介绍gRPC负载均衡实现。git

代码

https://github.com/messixukej...
在liangzhiyang/annotate-grpc-go基础上补充了部分注释github

关键interface

负载均衡器:
type Balancer interface{
    //启动负载均衡器,dialing的时候调用。
    Start(target string, config BalancerConfig) error
  //通知负载均衡器由新地址链接ok
    Up(addr Address) (down func(error))
    //获取下一个有效的负载均衡地址
    Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
    //地址更新通知channel,这里返回的是全量地址
    Notify() <-chan[]Address
    Close() error
}

//地址解析器接口
type Resolver interface{
    //解析器Resolver 用于建立 服务地址watcher。
    Resolve(target string) (Watcher, error)
}

//服务地址发现器接口
//watcher用于观察服务地址的变化
type Watcher interface{
    //阻塞接口,知道有服务地址变化或者错误发生
    Next() ([]*Update, error)
    
    Close()
}

关键数据结构

type testWatcher struct{
//用于接收地址更新
update chan*naming.Update

//用于表示多少个更新发生
side chan int

//用于通知地址注入者更新读已结束
readDone chanint
}

实现流程(以RoundRobin为例)

一、注入负载均衡规则数据结构

使用Dial时,使用WithBalance注入负载均衡规则
func WithBalancer(b Balancer) DialOption {
    return func(o *dialOptions) {
        o.balancer = b
    }
}

例如,这里注入了RoundRobin 轮寻规则。
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))

二、 启动负载均衡器负载均衡

roundRobin.Start -> testNameResolver.Resolve 生成服务地址发现器 -> 创建服务地址发现任务 watchAddrUpdates(新的goroutine循环监控地址)

三、服务地址发现器(开启负载均衡后,无论是初始化仍是过程当中的变化,都是经过服务地址发现器来获取服务端地址)ide

roundRobin.watchAddrUpdates ->阻塞在watcher.Next获取地址变动动做 -> 更新roundRobin.addrs -> 写入roundRobin.addrCh

3.一、开工过程,根据balancer.Notify获取服务端地址 -> 根据地址列表建立链接resetAddrConn
地址注入方式:Resolve中将服务端初始地址注入code

3.二、动态变动地址 ClientConn.lbWatcher,根据balancer.Notify获取服务端地址 -> 新增地址resetAddrConn,删除地址tearDown。
地址注入方式:testWatcher.inject接口

3.3 建立链接细化resetAddrConn->resetTransport->roundRobin.Uprpc

3.4 删除链接细化tearDown->roundRobin.downget

四、提交记录string

Invoke->getTransport->balancer.Get获取有效地址(策略:根据connected状态,循环找到有效地址。若是找不到则阻塞在waitCh)

waitCh阻塞解除由3.3触发
相关文章
相关标签/搜索