gRPC 从学习到生产

视频信息

grpc: From Tutorial to Production by Alan Shreve at GopherCon 2017html

www.youtube.com/watch?v=7FZ…node

博文:about.sourcegraph.com/go/grpc-in-…git

微服务之间应该如何通信?

答案就是:SOAP……好吧,开个玩笑,固然不多是 SOAP 了。github

如今流行的作法是 HTTP + JSON (REST API)golang

Alan 说“若是这辈子不再写另外一个 REST 客户端库的话,那就能够很幸福的死去了……😂”,由于这是最无聊的事情,一遍一遍的在作一样的事情。swift

为何 REST API 很差用?

  • 实现 Stream 太难了
  • 而双向的流就根本不可能
  • 很难对操做创建模型
  • 效率不好,文本表示对于网络来讲并非最好的选择
  • 并且,其实服务内部根本不是 RESTful 的方式,这只是 HTTP endpoint
  • 很难在一个请求中取得多个资源数据 (反例看 GraphQL)
  • 没有正式的(机器可读的)API约束
    • 所以写客户端须要人类
      • 并且由于👷很贵,并且不喜欢写客户端

什么是 gRPC

gPRC 是高性能、开源、通用的 RPC 框架。数组

与其讲解定义,不如来实际作个东西更清楚。浏览器

建一个缓存服务

使用 gRPC 这类东西,咱们并不是开始于写 Go 代码,咱们是从撰写 gRPC 的 IDL 开始的。缓存

app.proto

syntax = "proto3"
package rpc;
service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
}
message StoreReq {
  string key = 1;
  bytes val = 2;
}
message StoreResp {
}
message GetReq {
  string key = 1;
}
message GetResp {
  bytes val = 1;
}
复制代码

当写了这个文件后,咱们马上拥有了 9 种语言的客户端的库。bash

  • C++
  • Java(and Android)
  • Python
  • Go
  • Ruby
  • C#
  • Javascript(node.js)
  • Objective-C (iOS!)
  • PHP

同时,咱们也拥有了 7 种语言的服务端的 API Stub:

  • C++
  • Java
  • Python
  • Go
  • Ruby
  • C#
  • Javascript(node.js)

server.go

func serverMain() {
  if err := runServer(); err != nil {
    fmt.Fprintf(os.Stderr, "Failed to run cache server: %s\n", err)
    os.Exit(1)
  }
}
func runServer() error {
  srv := grpc.NewServer()
  rpc.RegisterCacheServer(srv, &CacheService{})
  l, err := net.Listen("tcp", "localhost:5051")
  if err != nil {
    return err
  }
  //  block
  return srv.Serve(l)
}
复制代码

暂时先不实现 CacheService,先放个空的,稍后再实现。

type CacheService struct {
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  return nil, fmt.Errorf("unimplemented")
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  return nil, fmt.Errorf("unimplemented")
}
复制代码

client.go

func clientMain() {
  if err != runClient(); err != nil {
    fmt.Fprintf(os.Stderr, "failed: %v\n", err)
    os.Exit(1)
  }
}
func runClient() error {
  //  创建链接
  conn, err := grpc.Dial("localhost:5053", grpc.WithInsecure())
  if err != nil {
    return fmt.Errorf("failed to dial server: %v", err)
  }
  cache := rpc.NewCacheClient(conn)
  //  调用 grpc 的 store() 方法存储键值对 { "gopher": "con" }
  _, err = cache.Store(context.Background(), &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  if err != nil {
    return fmt.Errorf("failed to store: %v", err)
  }
  //  调用 grpc 的 get() 方法取回键为 `gopher` 的值
  resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
  if err != nil {
    return fmt.Errorf("failed to get: %v", err)
  }
  //  输出
  fmt.Printf("Got cached value %s\n", resp.Val)
  return nil
}
复制代码

这不就是 WSDL 么?

或许有些人会认为这和 WSDL 也太像了,这么想没有错,由于 gRPC 在借鉴以前的 SOAP/WSDL 的错误基础上,也吸收了他们优秀的地方。

  • 和 XML 关系没那么紧(grpc 是可插拔式的,能够换成各类底层表述)
  • 写过 XML/XSD 的人都知道这些服务定义太繁重了,gRPC 没有这个问题
  • WSDL这类有彻底没必要要的复杂度、和基本不须要的功能(两步 commit)
  • WSDL 不灵活、并且没法前向兼容(不像 protobuf
  • SOAP/WSDL 性能太差,以及没法使用流
  • 可是WSDL中的机器能够理解的API定义确实是个好东西

实现具体的 CacheService

server.go

type CacheService struct {
  store map[string][]byte
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  val := s.store[req.Key]
  return &rpc.GetResp{Val: val}, nil
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  s.store[req.Key] = req.Val
  return &rpc.StoreResp{}, nil
}
复制代码

注意这里没有锁,你能够想一想他们中有,由于未来他们会被并发的调用的。

错误处理

固然,gRPC 支持错误处理。假设改写上面的 Get(),对不存在的键进行报错:

func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  val, ok := s.store[req.Key]
  if !ok {
    return nil, status.Errorf(code.NotFound, "Key not found %s", req.Key)
  }
  return &rpc.GetResp{Val: val}, nil
}
复制代码

加密传输

若是这样的代码打算去部署的话,必定会被 SRE 拦截下来,由于全部通信必须加密传输。

在 gRPC 中添加 TLS 加密传输很容易。好比咱们修改 runServer() 添加 TLS 加密传输。

func runServer() error {
  tlsCreds, err := credentials.NewServerTLSFromFile("tls.crt", "tls.key")
  if err != nil {
    return err
  }
  srv := grpc.NewServer(grpc.Creds(tlsCreds))
  ...
}
复制代码

一样,咱们也须要修改一下 runClient()。

func runClient() error {
  tlsCreds := credentials.NewTLS(&tls.Config(InsecureSkipVerify: true))
  conn, err := grpc.Dial("localhost:5051", grpc.WithTransportCredentials(tlsCreds))
  ...
}
复制代码

生产环境如何使用 gRPC

  • HTTP/2
  • protobuf serialization (pluggable)
  • 客户端会和 grpc 服务器打开一个长链接
    • 对于每个 RPC 调用都将是一个新的 HTTP/2 stream
    • 容许模拟飞行模式的 RPC 调用
  • 容许客户端 服务端 Streaming

gRPC 的实现

如今有3个高性能的、事件驱动的实现

  • C
    • Ruby, Python, Node.js, PHP, C#, Objective-C, C++ 都是对这个 C core 实现的绑定
    • PHP 则是经过 PECL 和这个实现的绑定
  • Java
    • Netty + BoringSSL 经过 JNI
  • Go
    • 纯 Go 实现,使用了 Go 标准库的 crypto/tls

gRPC 从哪来的

  • 最初是 Google 的一个团队建立的
  • 更早期的是 Google 一个内部项目叫作 stubby
  • 这个 gRPC 是其下一代开源项目,而且如今不只仅是 Google 在使用,不少公司都在贡献代码
    • 固然,Google 仍是主要代码贡献者

生产环境案例:多租户

上线生产后,发现有一部分客户产生了大量的键值,询问得知,有的客户但愿对全部东西都缓存,这显然不是对咱们这个缓存服务很好的事情。

咱们但愿限制这种行为,但对于当前系统而言,没法知足这种需求,所以咱们须要修改实现,对每一个客户发放客户 token,那么咱们就能够约束特定客户最多能够创建多少键值,避免系统滥用。这就成为了多租户的缓存服务。

和以前同样,咱们仍是从 IDL 开始,咱们须要修改接口,增长 account_token 项。

message StoreReq {
  string key = 1;
  bytes val = 2;
  string account_token = 3;
}
复制代码

一样,咱们须要有独立的服务针对帐户服务,来获取帐户所容许的缓存键数:

service Accounts {
  rpc GetByToken(GetByTokenReq) return (GetByTokenResp) {}
}
message GetByTokenReq {
  string token = 1;
}
message GetByTokenResp {
  Account account = 1;
}
message Account {
  int64 max_cache_keys = 1;
}
复制代码

这里创建了一个新的 Accounts 服务,而且有一个 GetByToken() 方法,给入 token,返回一个 Account 类型的结果,而 Account 内有 max_cache_keys 键对应最大可缓存的键值数。

如今咱们进一步修改 client.go

func runClient() error {
  ...
  cache := rpc.NewCacheClient(conn)
  _, err = cache.Store(context.Background(), &rpc.StoreReq{
    AccountToken: "inconshreveable",
    Key:          "gopher",
    Val:          []byte("con"),
  })
  if err != nil {
    return fmt.Errorf("failed to store: %v", err)
  }
  ...
}
复制代码

服务端的改变要稍微大一些,但不过度。

type CacheService struct {
  accounts      rpc.AccountsClient
  store         map[string][]byte
  keysByAccount map[string]int64
}
复制代码

注意这里的 accounts 是一个 grpc 的客户端,由于咱们这个服务,同时也是另外一个 grpc 服务的客户端。因此在接下来的 Store() 实现中,咱们须要先经过 accounts 调用另外一个服务取得帐户信息。

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  //  调用另外一个服务取得帐户信息,包含其键值限制
  resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  if err != nil {
    return nil, err
  }
  //  检查是否超量使用
  if s.keysByAccount[req.AccountToken] >= resp.Account.MaxCacheKeys {
    return nil, status.Errorf(codes.FailedPrecondition, "Account %s exceeds max key limit %d", req.AccountToken, resp.Account.MaxCacheKeys)
  }
  //  若是键不存在,须要新加键值,那么咱们就对计数器加一
  if _, ok := s.store[req.Key]; !ok {
    s.keysByAccount[req.AccountToken] += 1
  }
  //  保存键值
  s.store[req.Key] = req.Val
  return &rpc.StoreResp{}, nil
}
复制代码

生产环境案例:性能

上面的问题解决了,咱们服务又恢复了正常,不会有用户创建过多的键值了。可是很快,咱们就又收到了其余用户发来的新的 issue,不少人反应说新系统变慢了,没有达到 SLA 的要求。

但是咱们根本不知道到底发生了什么,因而意识到了,咱们的程序没有任何可观察性(Observability),换句话说,咱们的程序没有任何计量系统来统计性能相关的数据。

咱们先从最简单的作起,添加日志。

咱们先从 client.go 开始,增长一些测量和计数以及日志输出。

...
//  开始计时
start := time.Now()
_, err = cache.Store(context.Background(), &rpc.StoreReq{
  AccountToken: "inconshreveable",
  Key:          "gopher",
  Val:          []byte("con"),
})
//  计算 cache.Store() 调用时间
log.Printf("cache.Store duration %s", time.Since(start))
if err != nil {
  return fmt.Errorf("failed to store: %v", err)
}
//  再次开始计时
start = time.Now()
//  调用 grpc 的 get() 方法取回键为 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
//  计算 cache.Get() 调用时间
log.Printf("cache.Get duration %s", time.Since(start))
if err != nil {
  return fmt.Errorf("failed to get: %v", err)
}
复制代码

一样,在服务端也这么处理。

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  //  开始计时
  start := time.Now()
  //  调用另外一个服务取得帐户信息,包含其键值限制
  resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  //  输出 account.GetByToken() 的调用时间
  log.Printf("accounts.GetByToken duration %s", time.Since(start))
  ...
}
复制代码

通过这些修改后,咱们发现同样的事情在反反复复的作,那么有什么办法能够改变这种无聊的作法么?查阅 grpc 文档后,看到有一个叫作 Client Interceptor 的东西。

这至关因而一个中间件,可是是在客户端。当客户端进行 rpc 调用的时候,这个中间件先会被调用,所以这个中间件能够对调用进行一层包装,而后再进行调用。

为了实现这个功能,咱们建立一个新的文件,叫作 interceptor.go:

func WithClientInterceptor() grpc.DialOption {
  return grpc.WithUnaryInterceptor(clientInterceptor)
}
func clientInterceptor(
  ctx context.Context,
  method string,
  req interface{},
  reply interface{},
  cc *grpc.ClientConn,
  invoker grpc.UnaryInvoker,
  opts ...grpc.CallOption,
) error {
  start := time.Now()
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Printf("invoke remote method=%s duration=%s error=%v", method, time.Since(start), err)
  return err
}
复制代码

咱们有了这个 WithClientInterceptor() 以后,能够在 grpc.Dial() 的时候注册进去。 client.go

func runClient() error {
  ...
  conn, err := grpc.Dial("localhost:5051",
    grpc.WithTransportCredentials(tlsCreds),
    WithClientInterceptor())
  ...
}
复制代码

注册以后,全部的 grpc 调用都会通过咱们注册的 clientInterceptor(),所以全部的时间就都有统计了,而不用每一个函数内部反反复复的添加时间、计量、输出。

添加了客户端的这个计量后,天然而然就联想到服务端是否是也能够作一样的事情?通过查看文档,能够,有个叫作 Server Interceptor 的东西。

一样的作法,咱们在服务端添加 interceptor.go,而且添加 ServerInterceptor() 函数。

func ServerInterceptor() grpc.ServerOption {
  return grpc.UnaryInterceptor(serverInterceptor)
}
func serverInterceptor(
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  start := time.Now()
  resp, err := handler(ctx, req)
  log.Printf("invoke server method=%s duration=%s error=%v",
    info.FullMethod,
    time.Since(start),
    err)
  return resp, err
}
复制代码

和客户端同样,须要在 runServer() 的时候注册咱们定义的这个中间件。

func runServer() error {
  ...
  srv := grpc.NewServer(grpc.Creds(tlsCreds), ServerInterceptor())
  ...
}
复制代码

生产环境案例:超时

添加了日志后,咱们终于在日志中发现,/rpc.Accounts/GetByToken/ 花了好长的时间。咱们须要对这个操做设置超时。 server.go

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  accountsCtx, _ := context.WithTimeout(context.Background(), 2 * time.Second)
  resp, err := s.accounts.GetByToken(accountsCtx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  ...
}
复制代码

这里操做很简单,直接使用标准库中 context.WithTimeout() 就能够了。

生产环境案例:上下文传递

通过上面修改后,客户依旧抱怨说没有知足 SLA,仔细一想也对。就算这里约束了 2 秒钟,客户端调用还须要时间,别的代码在中间也有时间开销。并且有的客户说,咱们这里须要1秒钟,而不是2秒钟。

好吧,让咱们把这个时间设定推向调用方。

首先咱们要求在客户端进行调用时间约束的设定: client.go

func runClient() error {
  ...
  ctx, _ := context.WithTimeout(context.Background(), time.Second)
  _, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  ...
  ctx, _ = context.WithTimeout(context.Background(), 50*time.Millisecond)
  resp, err := cache.Get(ctx, &rpc.GetReq{Key: "gopher"})
  ...
}
复制代码

而后在服务端,咱们将上下文传递。直接取调用方的 ctx。

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  ...
}
复制代码

生产环境案例:GRPC Metadata

上面的问题都解决了,终于能够松一口气了。但是客户又提新的需求了……😅,说咱们能不能增长一个 Dry Run 的标志,就是说我但愿你作全部须要作的事情,除了真的修改键值库。

GRPC metadata,也称为 GRPC 的 Header。就像 HTTP 头同样,能够有一些 Metadata 信息传递过来。使用 metadata,可让咱们的 Dry Run 的实现变得更简洁,没必要每一个 RPC 方法内都实现一遍检查 Dry Run 标志的逻辑,咱们能够独立出来。

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  if !dryRun(ctx) {
    if _, ok := s.store[req.Key]; !ok {
      s.keysByAccount[req.AccountToke] += 1
    }
    s.store[req.Key] = req.Val
  }
  return &rpc.StoreResp{}, nil
}
func dryRun(ctx context.Context) bool {
  md, ok := metadata.FromContext(ctx)
  if !ok {
    return false
  }
  val, ok := md["dry-run"]
  if !ok {
    return false
  }
  if len(val) < 1 {
    return false
  }
  return val[0] == "1"
}
复制代码

固然,这么作是有妥协的,由于通用化后就失去了类型检查的能力。

在客户端调用的时候,则须要根据状况添加 dry-run 参数给 metadata。

func runClient() error {
  ...
  ctx, _ := context.WithTimeout(context.Background(), time.Second)
  ctx = metadata.NewContext(ctx, metadata.Pairs("dry-run", "1"))
  _, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  ...
}
复制代码

生产环境案例:Retry

实现了 Dry Run 觉得能够休息了,以前抱怨慢的客户又来抱怨了,虽然有超时控制,知足 SLA,可是服务那边仍是慢,总超时不成功。检查了一下,发现是网络上的事情,咱们没有太多能够作的事情。为了解决客户的问题,咱们来添加一个重试的机制。

咱们能够对每个 gRPC 调用添加一个 Retry 机制,咱们也能够像以前计时统计那样,使用 Interceptor 吧?

func clientInterceptor(...) error {
  var (
    start     = time.Now()
    attempts  = 0
    err       error
    backoff   retryBackOff
  )
  for {
    attempts += 1
    select {
    case <-ctx.Done():
      err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
    case <-backoff.Next():
      startAttempt := time.Now()
      err = invoker(ctx, method, req, reply, cc, opts...)
      if err != nil {
        log.Printf(...)
        continue
      }
    }
    break
  }
  log.Printf(...)
  return err
}
复制代码

看起来还不错,而后就打算发布这个代码了。结果提交审核的时候被打回来了,说这个代码不合理,由于若是是非幂等(non-idempotent) 的操做,这样就会致使屡次执行,改变指望结果了。

看来咱们得针对幂等和非幂等操做区别对待了。

silo.FireZeMissiles(NotIdempotent(ctx), req)
复制代码

嗯,固然,没这个东西。因此咱们须要本身来创造一个标记,经过 context,来标明操做是否幂等。

func NotIdempotent(ctx context.Context) context.Context {
  return context.WithValue(ctx, "idempotent", false)
}
func isIdempotent(ctx context.Context) bool {
  val, ok := ctx.Value("idempotent").(bool)
  if !ok {
    return true
  }
  return val
}
复制代码

而后在咱们的 clientInterceptor() 实现中加入 isIdempotent() 判断:

func clientInterceptor(...) error {
  var (
    start     = time.Now()
    attempts  = 0
    err       error
    backoff   retryBackOff
  )
  for {
    attempts += 1
    select {
    case <-ctx.Done():
      err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
    case <-backoff.Next():
      startAttempt := time.Now()
      err = invoker(ctx, method, req, reply, cc, opts...)
      if err != nil && isIdempotent(ctx) {
        log.Printf(...)
        continue
      }
    }
    break
  }
  log.Printf(...)
  return err
}
复制代码

这样当调用失败后,客户端检查发现是幂等的状况,才重试,不然不重试。避免了非幂等操做的反复操做。

生产环境案例:结构化错误

感受没啥问题了,因而部署上线了。但是运行一段时间后,发现有些不对劲。全部成功的RPC调用,也就是说这个操做自己是正确的,都没有问题,超时重试也正常。可是全部失败的 RPC 调用都不对了,全部失败的 RPC 调用,都返回超时,而不是错误自己。这里说的失败,不是说网络问题致使超时啥的,而是说请求自己的失败,好比以前提到的,Get() 不存在的键,应该返回错误;或者 Store() 超过了配额,应该返回错误,这类错误在日志中都没看到,反而都对应了超时。

通过分析发现,服务端该报错都报错,没啥问题,可是客户端不对,本应该返回错误给调用方的地方,客户端代码反而又开始重试这个操做了。看来以前重试的代码还有问题。

err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
  log.Printf(...)
  continue
}
复制代码

若是仔细观察这部分代码,会发现,不管 err 是什么,只要非 nil,咱们就重试。其实这是不对的,咱们只有针对某些错误重试,好比网络问题之类的,而不该该对咱们但愿返回给调用方的错误重试,那没有意义。

那么问题就变成了,咱们到底应该怎么对 err 判断来决定是否重试?

  • 可使用不一样的 Error Code,特定的 Code 须要 Retry,其它的不须要,那就须要自定义 gRPC 错误码;
  • 咱们也能够定义一个 Error 类型的数据,里面包含了某种标志位,来告知是否值得 retry
  • 或者干脆把错误码放到 Response 的消息里,确保每一个消息都有一个咱们定义的错误码,来标明是否须要 retry。

因此,咱们须要的是一个完整的结构化的错误信息,而不是简单的一个 Error Code 和字符串。固然这条路很差走,可是咱们已经作了这么多了,坚持一下仍是能够克服的。

这里咱们仍是从 IDL 开始:

message Error {
  int64 code = 1;
  string messsage = 2;
  bool temporary = 3;
  int64 userErrorCode = 4;
}
复制代码

而后咱们实现这个 Error 类型。 rpc/error.go

func (e *Error) Error() string {
  return e.Message
}
func Errorf(code codes.Code, temporary bool, msg string, args ..interface{}) error {
  return &Error{
    Code:      int64(code),
    Message:   fmt.Sprintf(msg, args...),
    Temporary: temporary,
  }
}
复制代码

有这两个函数,咱们能够显示和构造这个 Error 类型的变量了,可是咱们该怎么把错误消息传回客户端呢?而后问题就开始变的繁琐起来了: rpc/error.go

func MarshalError (err error, ctx context.Context) error {
  rerr, ok := err.(*Error)
  if !ok {
    return err
  }
  pberr, marshalerr := pb.Marshal(rerr)
  if marshalerr == nil {
    md := metadata.Pairs("rpc-error", base64.StdEncoding.EncodeToString(pberr))
    _ = grpc.SetTrailer(ctx, md)
  }
  return status.Errorf(codes.Code(rerr.Code), rerr.Message)
}
func UnmarshalError(err error, md metadata.MD) *Error {
  vals, ok := md["rpc-error"]
  if !ok {
    return nil
  }
  buf, err := base64.StdEncoding.DecodeString(vals[0])
  if err != nil {
    return nil
  }
  var rerr Error
  if err := pb.Unmarshal(buf, &rerr); err != nil {
    return nil
  }
  return &rerr
}
复制代码

interceptor.go

func serverInterceptor (
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  start := time.Now()
  resp, err := handler(ctx, req)
  err = rpc.MarshalError(err, ctx)
  log.Print(...)
  return resp, err
}
复制代码

it’s ugly,but works.

这是在 gRPC 不支持高级 Error 的状况下,怎么去 work around 这个问题,而且凑合用起来。如今这么作,错误就能够跨主机边界传递了。

生产环境案例:Dump

又有客户前来提需求了,有的客户说咱们能够存、也能够取,可是如何才能把里面全部的数据都获取下来?因而有了需求,但愿实现 Dump() 操做,能够取回全部数据。

如今已经轻车熟路了,咱们先改 IDL,添加一个 Dump() 函数。

service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
  rpc Dump(DumpReq) returns (DumpResp) {}
}
message DumpReq{
}
message DumpResp {
  repeated DumpItem items = 1;
}
message DumpItem {
  string key = 1;
  bytes val = 2;
}
复制代码

这里 DumpResp 里面用的是 repeated,由于 protobuf 里面不知道为啥不叫 array。

生产环境案例:流量控制

新功能 Dump 上线了,结果发现你们都很喜欢 Dump,有不少人在 Dump,结果服务器的内存开始不够了。因而咱们须要一些限制手段,能够控制流量。

查阅了文档后,发现咱们能够控制同时最大有多少并发能够访问,以及能够多频繁的来访问服务。 server.go

func runServer() error {
  ...
  srv := grpc.NewServer(grpc.Creds(tlsCreds),
    ServerInterceptor(),
    grpc.MaxConcurrentStreams(64),
    grpc.InTapHandle(NewTap().Handler))
  rpc.RegisterCacheServer(srv, NewCacheService(accounts))
  l, err := net.Listen("tcp", "localhost:5051")
  if err != nil {
    return err
  }
  l = netutil.LimitListener(l, 1024)
  return srv.Serve(l)
}
复制代码

这里使用了 netutil.LimitListener(l, 1024) 控制了总共能够有多少个链接,而后用 grpc.MaxConcurrentStreams(64) 指定了每一个 grpc 的链接能够有多少个并发流(stream)。这两个结合起来基本控制了并发的总数。

可是 gRPC 里没有地方限定能够多频繁的访问。所以这里用了 grpc.InTapHandle(NewTap().Handler)) 来进行定制,这是在更靠前的位置执行的。

tap.go

type Tap struct {
  lim *rate.Limiter
}
func NewTap() *Tap {
  return &Tap(rate.NewLimiter(150, 5))
}
func (t *Tap) Handler(ctx context.Context, info *tap.Info) (context.Context, error) {
  if !t.lim.Allow() {
    return nil, status.Errorf(codes.ResourceExhausted, "service is over rate limit")
  }
  return ctx, nil
}
复制代码

生产环境案例:Streaming

以前的方案部署后,内存终于降下来了,可是还没休息,就发现你们愈来愈喜欢用这个缓存服务,内存又不够用了。这个时候咱们就开始思考,是否是能够调整一下设计,不是每次 Dump 就当即在内存生成完整的返回数组,而是以流的形式,按需发回。 app.proto

syntax = "proto3";
package rpc;
service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
  rpc Dump(DumpReq) returns (stream DumpItem) {}
}
message DumpReq{
}
message DumpItem {
  string key = 1;
  bytes val = 2;
}
复制代码

这里再也不使用数组性质的 repeated,而是用 stream,客户端请求 Dump() 后,将结果以流的形式发回去。 server.go

func (s *CacheService) Dump(req *rpc.DumpReq, stream rpc.Cache_DumpServer) error {
  for k, v := range s.store {
    stream.Send(&rpc.DumpItem{
      Key: k,
      Val: v,
    })
  }
  return nil
}
复制代码

咱们修改 Dump() 的实现,对于每一个记录,利用 stream.Send() 发送到流。

注意这里咱们没有 context,只有个 stream。 client.go

func runClient() error {
  ...
  stream, err := cache.Dump(context.Background(), &rpc.DumpReq{})
  if err != nil {
    return fmt.Errorf("failed to dump: %v", err)
  }
  for {
    item, err := stream.Recv()
    if err == io.EOF {
      break
    }
    if err != nil {
      return fmt.Errorf("failed to stream item: %v", err)
    }
  }
  return nil
}
复制代码

生产环境案例:横向扩展、负载均衡

使用流后,服务器性能提升了不少,可是,咱们的服务太吸引人了,用户愈来愈多,结果又内存不够了。这时候咱们审查代码,感受能作的事情都作了,或许是时候从单一服务器,扩展为多个服务器,而后之间使用负载均衡。

gRPC 是长链接性质的通信,所以若是一个客户端链接了一个 gRPC Endpoint,那么他就会一直链接到一个固定的服务器,所以多服务器的负载均衡对同一个客户端来讲是没有意义的,不会由于这个客户端有大量的请求而致使分散请求到不一样的服务器上去。

若是咱们但愿客户端能够利用多服务器的机制,咱们就须要更智能的客户端,让客户端意识到服务器存在多个副本,所以客户端创建多条链接到不一样的服务器,这样就可让单一客户端利用负载均衡的横向扩展能力。

生产环境案例:多语言协做

在复杂的环境中,咱们 gRPC 的客户端(甚至服务端)多是不一样语言平台的。这实际上是 gRPC 的优点,能够比较容易的实现跨语言平台的通信。

好比咱们能够作一个 Python 客户端:

import grpc
import rpc_pb2 as rpc
channel = grpc.insecure_channel('localhost:5051')
cache_svc = rpc.CacheStub(channel)
resp = cache_svc.Get(rpc.GetReq(
  key="gopher",
))
print resp.val
复制代码

一个不是很爽的地方是虽然 gRPC 的跨语言通信很方便,可是各个语言的实现都比较随意,好比 Go 中叫作 CacheClient(),而 Python 中则叫作 CacheStub()。这里没有什么特别的缘由非不同的名字,就是因为不一样的做者实现的时候按照本身的想法命名的。

gRPC 尚不完美的地方

  • 负载均衡
  • 结构化的错误信息
  • 还不支持浏览器的 JS (某种角度上讲,这是最经常使用的客户端)
  • 还常常发生 API 改变(即便都1.0了)
  • 某些语言实现的文档很是差
  • 没有跨语言的标准化的作法

gRPC 在生产环境中的用例

  • ngrok,全部内部20多个通信都走的是 gRPC
  • Square,将内部的通信都换成了 gRPC,是最先使用 gRPC 的用户和贡献者
  • CoreOS,etcd v3 彻底走的是 gRPC
  • Google,Google Cloud Service(PubSub, Speech Rec)走的是 gRPC
  • Netflix, Yik Yak, VSCO, Cockroach, …

gRPC 将来的变化

  • 想了解将来的变化能够查看:
  • 新的语言支持(SwiftHaskell正在试验阶段)
  • 稳定性、可靠性、性能的提升
  • 增长更多细化的 API 来支持自定义的行为(链接管理、频道跟踪)
  • 浏览器的 JS

本文转载自: blog.lab99.org/post/golang…

我的微信公众号:

我的github:

github.com/jiankunking

我的博客:

jiankunking.com

相关文章
相关标签/搜索