base in https://github.com/grpc/grpc-...
摘要了一部分文件目录,用来描述在grpc中不一样目录层级的主要做用。
grpc ├── 顶层目录(package grpc, 主要包含一些grpc提供的接口文件和涉及到具体实现的一些包装器文件 ├── clientconn.go // grpc接口文件,主要提供 Dial 接口。 ├── balancer_conn_wrappers.go // 各类包装器 *_wrappers ├── resolver_conn_wrapper.go ├── balancer │ ├── balancer.go ├── resolver // 次级目录(主要用于描述接口 │ └── resolver.go //resolver的接口文件 ├── internal // 内部目录(主要提供各类具体实现 │ ├── backoff │ │ └── backoff.go //退避策略的具体实现 │ ├── buffer │ │ ├── unbounded.go //内部提供的一些组件 │ ├── resolver │ │ ├── dns │ │ │ ├── dns_resolver.go //dns_resolver的实现 *_resolver.go
文件层级 | ||
---|---|---|
顶层目录 | 主要提供grpc接口以及各类包装器文件 | grpc.Dial() *_wrapper.go |
次级目录 | 这里主要是提供grpc的一些功能组件定义,一般是接口文件 | type Resolver interface {} |
内部目录 | 这里主要提供功能组件的具体实现 | dns_resolver.go |
在grpc中咱们会看到不少相似以下这种代码, 通常后面会须要接收参数 opts ...Option, 这种接口方式被称为选项模式(options-pattern ,主要是为了构建接口提供灵活的可选项下面咱们用本身的伪代码模拟一次这种逻辑(摘自 https://github.com/pojol/brai...git
// 配置项 type config struct { Tracing bool } // 配置Option的包装函数 type Option func(*Server) // 添加开启tracing的可选项 func WithTracing() Option { return func(r *Server) { r.cfg.Tracing = true } } // 使用可选项进行构建 func New(name string, opts ...Option) IServer { const ( defaultTracing = false ) server = &Server{ cfg: config{ Tracing: defaultTracing, // 进行默认的初始化赋值 }, } // 查看是否有可选项,若是有则使用可选项将默认值覆盖。 for _, opt := range opts { opt(server) } }
总结 经过这种options模式,能够没必要每次定义全部的选项,只需选择本身想要的改动便可。
grpc中使用Wrapper把接口的实现和其依赖的对象聚合到一块儿,经过水平组合的方式完成一些接口的实现。
type ccResolverWrapper struct { cc *ClientConn // 包含了 ClientConn resolverMu sync.Mutex resolver resolver.Resolver // 包含了 Resolver interface done *grpcsync.Event // 完成事件(这个下面有详细解释 curState resolver.State // 状态 pollingMu sync.Mutex // 轮询锁 polling chan struct{} // 一个channel主要用于判断是否处于轮询中 }
上面是一个Wrapper的结构,它主要包含了ClientConn的指针,以及Resolver接口,另外还包含了一些自身逻辑须要的状态和锁它主要实现了resolver.ClientConn interface, 使用这个包装器主要是为了
聚合
前面的那些组件,完成一些须要相互依赖调度的逻辑。不过这未必是值得借鉴的,这里先简单路过一下。github
如上图所示,咱们使用了Resolver来展现grpc是如何使用插件式编程方式组织代码的。编程
- 接口定义文件 resolver.go
// Resolver 构建器的定义 type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string } // 名字解析 Resolver 提供的接口定义 type Resolver interface {} // 注册不一样的resolver实现 func Register(b Builder) {} // 经过scheme获取相关的resolver实现 func Get(scheme string) Builder {}
- 实现文件 internal/resolver/dns/dns_resolver.go
// 经过init函数,将实现注册到resolver func init() { resolver.Register(NewBuilder()) } // 实现resolver.Builder接口的 Build 函数(在这里进行真正的构建操做 func Build() {} // 返回当前resolver解决的解析样式 func Scheme() string { return "dns" }
- 应用 resolver clientconn.go
// 经过解析用户传入的target 得到scheme cc.parsedTarget = grpcutil.ParseTarget(cc.target) // 经过target的scheme获取对应的resolver.Builder func (cc *ClientConn) getResolver(scheme string) resolver.Builder { for _, rb := range cc.dopts.resolvers { if scheme == rb.Scheme() { return rb } } return resolver.Get(scheme) }
总结 经过以上的关键代码,咱们知道了组件是如何完成接口定义
以及实现
和使用
的
在grpc中有很多的代码是使用这种插件式的方式进行编程,这种编码方式能够方便的隔离实现
,使用户专一在本身的实现上。另外也支持用户编写本身的实现
注册到grpc中。能够阅读 策略模式 & 开闭原则 加深对这种编码形式的理解。安全
主要用于在异步逻辑中判断一次性事件(开关)线程安全,在grpc中不少模块的退出逻辑都依赖于这个Event实现来自 /internal/grpcsync/event.go架构
type Event struct { fired int32 // 用于标记是否被触发 c chan struct{} // 用于发送触发信号 o sync.Once // 保证只被执行一次 } func (e *Event) Fire() bool {} // 触发事件 func (e *Event) Done() <-chan struct{} {} // 被触发信号 func (e *Event) HasFired() bool {} // 是否被触发 // 构建Event func NewEvent() *Event { return &Event{c: make(chan struct{})} } // 模拟使用,建立一个服务,而后这个服务会开启一个goroutine从管道中接收消息来处理业务 // 以下的话能够是一些新节点信息,而后经过done来处理退出的逻辑,当外部关闭这个balancer,会当即通知到这个goroutine而后退出。 func newBalancer() { b := Balancer{ done : NewEvent(), // 构建 } // watcher go func() { for { select { case <- otherCh: //todo case <- b.done.Done(): // 监听到终止信号,退出goroutine。 return } } }() } func (b *Balancer)close() { b.done.Fire() // 触发信号 }
前面有说到grpcsync.Event是用来控制退出逻辑,这里的unbounded则用于多个goroutine之间的消息传递。
这是一个很是不错的channel实践,它不用考虑channel的各类阻塞状况(这里主要是channel溢出的状况。方便了channel的应用。实现来自
/internal/buffer/unbounded.goUnbounded
/internal/transport/transport.gorecvBuffer
这二者的实现逻辑是同样的,只是Unbounded包装的interface{} ,而recvBuffer会被高频调用因此使用了具体的类型recvMsgapp
type Unbounded struct { c chan interface{} backlog []interface{} sync.Mutex } func NewUnbounded() *Unbounded { return &Unbounded{c: make(chan interface{}, 1)} } // 往管道中写入消息(生产端 func (b *Unbounded) Put(t interface{}) { b.Lock() // 判断是否有积压消息,若是没有则直接写入管道后退出 // 若是有,则写入到积压队列中(先进先出队列 if len(b.backlog) == 0 { select { case b.c <- t: b.Unlock() return default: } } b.backlog = append(b.backlog, t) b.Unlock() } func (b *Unbounded) Load() { b.Lock() // 这里主要是判断积压队列是否有消息,若是有则左移一位 // 并将移出的消息,写入channel中。 if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: b.backlog[0] = nil b.backlog = b.backlog[1:] default: } } b.Unlock() } // 管道的读信号(消费端 func (b *Unbounded) Get() <-chan interface{} { return b.c }
最后宣传一下个人开源框架 https://github.com/pojol/braid 一个轻量的微服务框架目标是帮助用户能够更容易的使用和理解微服务架构。框架