本文以roundRobin为例介绍gRPC负载均衡实现。git
https://github.com/messixukej...
在liangzhiyang/annotate-grpc-go基础上补充了部分注释github
负载均衡器: type Balancer interface{ //启动负载均衡器,dialing的时候调用。 Start(target string, config BalancerConfig) error //通知负载均衡器由新地址链接ok Up(addr Address) (down func(error)) //获取下一个有效的负载均衡地址 Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) //地址更新通知channel,这里返回的是全量地址 Notify() <-chan[]Address Close() error } //地址解析器接口 type Resolver interface{ //解析器Resolver 用于建立 服务地址watcher。 Resolve(target string) (Watcher, error) } //服务地址发现器接口 //watcher用于观察服务地址的变化 type Watcher interface{ //阻塞接口,知道有服务地址变化或者错误发生 Next() ([]*Update, error) Close() }
type testWatcher struct{ //用于接收地址更新 update chan*naming.Update //用于表示多少个更新发生 side chan int //用于通知地址注入者更新读已结束 readDone chanint }
一、注入负载均衡规则数据结构
使用Dial时,使用WithBalance注入负载均衡规则 func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { o.balancer = b } } 例如,这里注入了RoundRobin 轮寻规则。 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
二、 启动负载均衡器负载均衡
roundRobin.Start -> testNameResolver.Resolve 生成服务地址发现器 -> 创建服务地址发现任务 watchAddrUpdates(新的goroutine循环监控地址)
三、服务地址发现器(开启负载均衡后,无论是初始化仍是过程当中的变化,都是经过服务地址发现器来获取服务端地址)ide
roundRobin.watchAddrUpdates ->阻塞在watcher.Next获取地址变动动做 -> 更新roundRobin.addrs -> 写入roundRobin.addrCh
3.一、开工过程,根据balancer.Notify获取服务端地址 -> 根据地址列表建立链接resetAddrConn
地址注入方式:Resolve中将服务端初始地址注入code
3.二、动态变动地址 ClientConn.lbWatcher,根据balancer.Notify获取服务端地址 -> 新增地址resetAddrConn,删除地址tearDown。
地址注入方式:testWatcher.inject接口
3.3 建立链接细化resetAddrConn->resetTransport->roundRobin.Uprpc
3.4 删除链接细化tearDown->roundRobin.downget
四、提交记录string
Invoke->getTransport->balancer.Get获取有效地址(策略:根据connected状态,循环找到有效地址。若是找不到则阻塞在waitCh) waitCh阻塞解除由3.3触发