micro.newService()中newOptionsnode
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt }
初始化了一堆基础设置,来看看Client, client.DefaultClient,
这里不要直接去看client/client.go中的newRpcClient()
,由于在micro/defaults.go中已经初始化了client,默认是grpcclient.DefaultClient = gcli.NewClient()
golang
func newClient(opts ...client.Option) client.Client { options := client.NewOptions() // default content type for grpc options.ContentType = "application/grpc+proto" for _, o := range opts { o(&options) } rc := &grpcClient{ opts: options, } rc.once.Store(false) rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) // wrap in reverse for i := len(options.Wrappers); i > 0; i-- { c = options.Wrappers[i-1](c) } return c } func NewClient(opts ...client.Option) client.Client { return newClient(opts...) }
这里作了如下事情web
type grpcClient struct { opts client.Options pool *pool once atomic.Value } // Client is the interface used to make requests to services. // It supports Request/Response via Transport and Publishing via the Broker. // It also supports bidirectional streaming of requests. type Client interface { Init(...Option) error Options() Options NewMessage(topic string, msg interface{}, opts ...MessageOption) Message NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Publish(ctx context.Context, msg Message, opts ...PublishOption) error String() string }
newClient()中的c := client.Client(rc)
说明一下,segmentfault
grpcClient只有3个属性,client.Client(rc)调用一下,让rc变转Client的实例,在grpc.go中也能够看到grpcClient实现了Client定义的所有方法缓存
结合实例看example/client/main.go中call()
,发起请求app
func call(i int, c client.Client) { // Create new request to service go.micro.srv.example, method Example.Call req := c.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ Name: "John", }) // create context with metadata ctx := metadata.NewContext(context.Background(), map[string]string{ "X-User-Id": "john", "X-From-Id": "script", }) rsp := &example.Response{} // Call service if err := c.Call(ctx, req, rsp); err != nil { fmt.Println("call err: ", err, rsp) return } fmt.Println("Call:", i, "rsp:", rsp.Msg) } func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...) } func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { var opts client.RequestOptions for _, o := range reqOpts { o(&opts) } // set the content-type specified if len(opts.ContentType) > 0 { contentType = opts.ContentType } return &grpcRequest{ service: service, method: method, request: request, contentType: contentType, opts: opts, } }
NewRequest()
最终获得一个grpcRequest{}实例,而后调用grpcClient.Call()
ctx能够附加信息(放在请求的header中),在请求周期中均可获取,trace,auth等组件均可以使用dom
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { if req == nil { return errors.InternalServerError("go.micro.client", "req is nil") } else if rsp == nil { return errors.InternalServerError("go.micro.client", "rsp is nil") } // make a copy of call opts callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) defer cancel() } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // lookup the route to send the reques to route, err := g.lookupRoute(req, callOpts) if err != nil { return err } // pass a node to enable backwards compatability as changing the // call func would be a breaking change. // todo v3: change the call func to accept a route node := ®istry.Node{Address: route.Address} // make the call err = gcall(ctx, node, req, rsp, callOpts) // record the result of the call to inform future routing decisions g.opts.Selector.Record(*route, err) // try and transform the error to a go-micro error if verr, ok := err.(*errors.Error); ok { return verr } return err } ch := make(chan error, callOpts.Retries+1) var gerr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr }
这里作了如下事情函数
声明call()函数,oop
调用g.lookupRoute()
,返回router.Route{}fetch
opts.Router.Lookup(query...) ->query()(router/default.go)
r.table = newTable(r.fetchRoutes)
设置了table.fetchRoutes。定义registry.Node,调用gcall(),即grpcClient.call()
定义重试次数(默认1),调用第5步的call()
这就是一次call请求的流程,这里没有深刻底层grpc是如何发起请求的,感兴趣的同窗能够查阅下代码
若有错漏,请留言告知,Thanks♪(・ω・)ノ
go micro 分析系列文章
go micro server 启动分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 网关鉴权
go micro 链路追踪
go micro 熔断与限流
go micro wrapper 中间件
go micro metrics 接入Prometheus、Grafana