DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。git
先来看看这个的结构
连接github
// dialOptions configure a Dial call. dialOptions are set by the DialOption // values passed to Dial. type dialOptions struct { unaryInt UnaryClientInterceptor streamInt StreamClientInterceptor chainUnaryInts []UnaryClientInterceptor chainStreamInts []StreamClientInterceptor cp Compressor dc Decompressor bs backoff.Strategy block bool insecure bool timeout time.Duration scChan <-chan ServiceConfig authority string copts transport.ConnectOptions callOptions []CallOption // This is used by v1 balancer dial option WithBalancer to support v1 // balancer, and also by WithBalancerName dial option. balancerBuilder balancer.Builder // This is to support grpclb. resolverBuilder resolver.Builder channelzParentID int64 disableServiceConfig bool disableRetry bool disableHealthCheck bool healthCheckFunc internal.HealthChecker minConnectTimeout func() time.Duration defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string }
因为命名很是规范,加上注释很容易看懂每个 field 配置的哪一条属性。若是掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。安全
其一应函数方法都是设置 其中字段的。闭包
这里是 grpc 设计较好的地方,经过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分 app
这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。
而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。
能够在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext
起做用的就是 opt.apply()负载均衡
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } ··· for _, opt := range opts { opt.apply(&cc.dopts) } ··· }
这里的 options 能够说是 client 发起 rpc 请求的核心中转站。
另外一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是 callOptions []CallOption
ide
CallOption 是一个接口,定义在 rpc_util 包内svg
// CallOption configures a Call before it starts or extracts information from // a Call after it completes. type CallOption interface { // before is called before the call is sent to any server. If before // returns a non-nil error, the RPC fails with that error. before(*callInfo) error // after is called after the call has completed. after cannot return an // error, so any failures should be reported via output parameters. after(*callInfo) }
操做的是 callInfo 结构里的数据,其被包含在 dialOptions
结构体中,
即每一次 dial 的时候进行调用。函数
同时它自身定义颇有意思,操做的是 callInfo
结构体ui
// callInfo contains all related configuration and information about an RPC. type callInfo struct { compressorType string failFast bool stream ClientStream maxReceiveMessageSize *int maxSendMessageSize *int creds credentials.PerRPCCredentials contentSubtype string codec baseCodec maxRetryRPCBufferSize int }
能够看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。
简单看一个 CallOption 接口的实现
// Header returns a CallOptions that retrieves the header metadata // for a unary RPC. func Header(md *metadata.MD) CallOption { return HeaderCallOption{HeaderAddr: md} } // HeaderCallOption is a CallOption for collecting response header metadata. // The metadata field will be populated *after* the RPC completes. // This is an EXPERIMENTAL API. type HeaderCallOption struct { HeaderAddr *metadata.MD } func (o HeaderCallOption) before(c *callInfo) error { return nil } func (o HeaderCallOption) after(c *callInfo) { if c.stream != nil { *o.HeaderAddr, _ = c.stream.Header() } }
重点看到,实际操做是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。
这里能够看出,这里也是经过函数返回一个拥有这两个方法的结构体,注意这一个设计,能够做为你本身的 Option 设计的时候的参考。
有两种方法让 Client 接受你的 CallOption 设置
在 生成 Client 的时候就传递设置。具体以下
// WithDefaultCallOptions returns a DialOption which sets the default // CallOptions for calls over the connection. func WithDefaultCallOptions(cos ...CallOption) DialOption { return newFuncDialOption(func(o *dialOptions) { o.callOptions = append(o.callOptions, cos...) }) }
有没有感受有点很差理解?给大家一个实例
response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))
这里假设 咱们设置了一个 mycodec 的译码器。立刻下面解释它的设计。
值得注意的是, 我好像只提到了在 Client 调用时设置,callOption 只在客户端设置的状况是否是让你们感到困惑。
实际上 gRPC server 端会自动检测 callOption 的设置,并检测本身是否支持此项选择,若是不支持则会返回失败。也就是说,在 Server 端注册的全部 Codec 译码器以后,Client 直接使用相应的设置就行了。
在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另外一个是拥有名字的 Codec 定义在 encoding 包内,这是因为在注册 registry 的时候会使用到这个方法。
type Codec interface { // Marshal returns the wire format of v. Marshal(v interface{}) ([]byte, error) // Unmarshal parses the wire format into v. Unmarshal(data []byte, v interface{}) error // String returns the name of the Codec implementation. This is unused by // gRPC. String() string }
就是这个方法
// RegisterCodec registers the provided Codec for use with all gRPC clients and // servers. // // The Codec will be stored and looked up by result of its Name() method, which // should match the content-subtype of the encoding handled by the Codec. This // is case-insensitive, and is stored and looked up as lowercase. If the // result of calling Name() is an empty string, RegisterCodec will panic. See // Content-Type on // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for // more details. // // NOTE: this function must only be called during initialization time (i.e. in // an init() function), and is not thread-safe. If multiple Compressors are // registered with the same name, the one registered last will take effect. func RegisterCodec(codec Codec) { if codec == nil { panic("cannot register a nil Codec") } if codec.Name() == "" { panic("cannot register Codec with empty string result for Name()") } contentSubtype := strings.ToLower(codec.Name()) registeredCodecs[contentSubtype] = codec }
同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解便可。
// Compressor is used for compressing and decompressing when sending or // receiving messages. type Compressor interface { // Compress writes the data written to wc to w after compressing it. If an // error occurs while initializing the compressor, that error is returned // instead. Compress(w io.Writer) (io.WriteCloser, error) // Decompress reads data from r, decompresses it, and provides the // uncompressed data via the returned io.Reader. If an error occurs while // initializing the decompressor, that error is returned instead. Decompress(r io.Reader) (io.Reader, error) // Name is the name of the compression codec and is used to set the content // coding header. The result must be static; the result cannot change // between calls. Name() string }
这个包对应 context 中的 Value field 也就是 key-value 形式的存储
在其余包中简写是 MD
type MD map[string][]string
实现了完善的存储功能,从单一读写到批量(采用 pair 模式,...string 做为参数,len(string)%2==1 时会报错,因为会有孤立的没有配对的元信息。
另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。
更为重要的是 在 context 中的读取方式,实际上是 MetaData 结构对应的是 context Value 中的 value 值,而 key 值设为 一个空结构体同时区分输入输入
type mdIncomingKey struct{}<br />
type mdOutgoingKey struct{}