Client 主要用来执行服务请求,以及发布、订阅事件。它是对 Broker、Transport 的一层封装,方便使用。
Init
初始化客户端的函数:
func (r *rpcClient) Init(opts ...Option) error { size := r.opts.PoolSize ttl := r.opts.PoolTTL for _, o := range opts { o(&r.opts) } // update pool configuration if the options changed if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { r.pool.Lock() r.pool.size = r.opts.PoolSize r.pool.ttl = int64(r.opts.PoolTTL.Seconds()) r.pool.Unlock() } return nil }
Call
Call 是 Client 接口中最主要的方法,在之前的《Go Micro Selector 源码分析》一文中已有涉及。
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // 复制出options callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // 调用next函数 获取selector next, err := g.next(req, callOpts) if err != nil { return err } // 检查context Deadline d, ok := ctx.Deadline() if !ok { // 没有deadline 建立一个新的 ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // 获取到deadline设置context opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // 复制call函数 在下面的goroutine中使用 gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // 调用call 正式调用服务端接口 err = gcall(ctx, node, req, rsp, callOpts) g.opts.Selector.Mark(req.Service(), node, err) return err } ch := make(chan error, callOpts.Retries+1) var gerr error // 重试 for i := 0; i <= callOpts.Retries; i++ { go func(i int) { // 调动call 返回channel ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = 
err } } return gerr }
Stream
Stream 与 Call 的逻辑几乎相同,只是 Stream 调用的是 rpc_client 中的 stream 函数(r.stream),这里就不再过多分析了。
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { // make a copy of call opts callOpts := r.opts.CallOptions for _, opt := range opts { opt(&callOpts) } next, err := r.next(request, callOpts) if err != nil { return nil, err } // should we noop right here? select { case <-ctx.Done(): return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) default: } call := func(i int) (Stream, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) if err != nil { return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } node, err := next() if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } stream, err := r.stream(ctx, node, request, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return stream, err } type response struct { stream Stream err error } ch := make(chan response, callOpts.Retries+1) var grr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { s, err := call(i) ch <- response{s, err} }(i) select { case <-ctx.Done(): return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err())) case rsp := <-ch: // if the call succeeded lets bail early if rsp.err == nil { return rsp.stream, nil } retry, rerr := callOpts.Retry(ctx, request, i, rsp.err) if rerr != nil { return nil, rerr } if !retry { return nil, rsp.err } grr = rsp.err } } return nil, grr }
Publish
Client中的Publish主要是调用broker中的publish:r.opts.Broker.Publish
在 client 的 Publish 函数中,先获取 topic、准备好 body,最后调用 broker 的 Publish。
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { options := PublishOptions{ Context: context.Background(), } for _, o := range opts { o(&options) } md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } id := uuid.New().String() md["Content-Type"] = msg.ContentType() md["Micro-Topic"] = msg.Topic() md["Micro-Id"] = id // set the topic topic := msg.Topic() // get proxy if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { options.Exchange = prx } // get the exchange if len(options.Exchange) > 0 { topic = options.Exchange } // encode message body cf, err := r.newCodec(msg.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{ Target: topic, Type: codec.Event, Header: map[string]string{ "Micro-Id": id, "Micro-Topic": msg.Topic(), }, }, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() { r.opts.Broker.Connect() }) return r.opts.Broker.Publish(topic, &broker.Message{ Header: md, Body: b.Bytes(), }) }