DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。 git
先来看看这个的结构
连接github
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor
cp Compressor
dc Decompressor
bs backoff.Strategy
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
authority string
copts transport.ConnectOptions
callOptions []CallOption
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
minConnectTimeout func() time.Duration defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string } 复制代码
因为命名很是规范,加上注释很容易看懂每个 field 配置的哪一条属性。若是掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。安全
这里是 grpc 设计较好的地方,经过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分app
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
···
for _, opt := range opts {
opt.apply(&cc.dopts)
}
···
}
复制代码
这里的 options 能够说是 client 发起 rpc 请求的核心中转站。
另外一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是 callOptions []CallOption
负载均衡
CallOption 是一个接口,定义在 rpc_util 包内 ide
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error
// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}
复制代码
操做的是 callInfo 结构里的数据,其被包含在 dialOptions
结构体中,
即每一次 dial 的时候进行调用。svg
同时它自身定义颇有意思,操做的是 callInfo
结构体函数
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
compressorType string
failFast bool
stream ClientStream
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
}
复制代码
能够看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。ui
简单看一个 CallOption 接口的实现
// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: md}
}
// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
if c.stream != nil {
*o.HeaderAddr, _ = c.stream.Header()
}
}
复制代码
重点看到,实际操做是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。
这里能够看出,这里也是经过函数返回一个拥有这两个方法的结构体,注意这一个设计,能够做为你本身的 Option 设计的时候的参考。
有两种方法让 Client 接受你的 CallOption 设置
// WithDefaultCallOptions returns a DialOption which sets the default
// CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.callOptions = append(o.callOptions, cos...)
})
}
复制代码
有没有感受有点很差理解?给大家一个实例
这里假设 咱们设置了一个 mycodec 的译码器。立刻下面解释它的设计。
值得注意的是, 我好像只提到了在 Client 调用时设置,callOption 只在客户端设置的状况是否是让你们感到困惑。
实际上 gRPC server 端会自动检测 callOption 的设置,并检测本身是否支持此项选择,若是不支持则会返回失败。也就是说,在 Server 端注册的全部 Codec 译码器以后,Client 直接使用相应的设置就行了。
在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另外一个是拥有名字的 Codec 定义在 encoding 包内,这是因为在注册 registry 的时候会使用到这个方法。
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error
// String returns the name of the Codec implementation. This is unused by
// gRPC.
String() string
}
复制代码
就是这个方法
// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
//
// The Codec will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the Codec. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodec will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
if codec == nil {
panic("cannot register a nil Codec")
}
if codec.Name() == "" {
panic("cannot register Codec with empty string result for Name()")
}
contentSubtype := strings.ToLower(codec.Name())
registeredCodecs[contentSubtype] = codec
}
复制代码
同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解便可。
// Compressor is used for compressing and decompressing when sending or
// receiving messages.
type Compressor interface {
// Compress writes the data written to wc to w after compressing it. If an
// error occurs while initializing the compressor, that error is returned
// instead.
Compress(w io.Writer) (io.WriteCloser, error)
// Decompress reads data from r, decompresses it, and provides the
// uncompressed data via the returned io.Reader. If an error occurs while
// initializing the decompressor, that error is returned instead.
Decompress(r io.Reader) (io.Reader, error)
// Name is the name of the compression codec and is used to set the content
// coding header. The result must be static; the result cannot change
// between calls.
Name() string
}
复制代码
这个包对应 context 中的 Value field 也就是 key-value 形式的存储
type MD map[string][]string
复制代码
实现了完善的存储功能,从单一读写到批量(采用 pair 模式,...string 做为参数,len(string)%2==1 时会报错,因为会有孤立的没有配对的元信息。
另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。
type mdIncomingKey struct{}<br />
type mdOutgoingKey struct{}