grpc: From Tutorial to Production by Alan Shreve at GopherCon 2017html
www.youtube.com/watch?v=7FZ…node
博文:about.sourcegraph.com/go/grpc-in-…git
答案就是:SOAP……好吧,开个玩笑,固然不多是 SOAP 了。github
如今流行的作法是 HTTP + JSON (REST API)golang
Alan 说“若是这辈子不再写另外一个 REST 客户端库的话,那就能够很幸福的死去了……😂”,由于这是最无聊的事情,一遍一遍的在作一样的事情。swift
gPRC 是高性能、开源、通用的 RPC 框架。数组
与其讲解定义,不如来实际作个东西更清楚。浏览器
使用 gRPC 这类东西,咱们并不是开始于写 Go 代码,咱们是从撰写 gRPC 的 IDL 开始的。缓存
syntax = "proto3"
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
}
message StoreReq {
string key = 1;
bytes val = 2;
}
message StoreResp {
}
message GetReq {
string key = 1;
}
message GetResp {
bytes val = 1;
}
复制代码
当写了这个文件后,咱们马上拥有了 9 种语言的客户端的库。bash
同时,咱们也拥有了 7 种语言的服务端的 API Stub:
func serverMain() {
if err := runServer(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to run cache server: %s\n", err)
os.Exit(1)
}
}
func runServer() error {
srv := grpc.NewServer()
rpc.RegisterCacheServer(srv, &CacheService{})
l, err := net.Listen("tcp", "localhost:5051")
if err != nil {
return err
}
// block
return srv.Serve(l)
}
复制代码
暂时先不实现 CacheService,先放个空的,稍后再实现。
type CacheService struct {
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
return nil, fmt.Errorf("unimplemented")
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
return nil, fmt.Errorf("unimplemented")
}
复制代码
func clientMain() {
if err != runClient(); err != nil {
fmt.Fprintf(os.Stderr, "failed: %v\n", err)
os.Exit(1)
}
}
func runClient() error {
// 创建链接
conn, err := grpc.Dial("localhost:5053", grpc.WithInsecure())
if err != nil {
return fmt.Errorf("failed to dial server: %v", err)
}
cache := rpc.NewCacheClient(conn)
// 调用 grpc 的 store() 方法存储键值对 { "gopher": "con" }
_, err = cache.Store(context.Background(), &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
// 调用 grpc 的 get() 方法取回键为 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
if err != nil {
return fmt.Errorf("failed to get: %v", err)
}
// 输出
fmt.Printf("Got cached value %s\n", resp.Val)
return nil
}
复制代码
或许有些人会认为这和 WSDL 也太像了,这么想没有错,由于 gRPC 在借鉴以前的 SOAP/WSDL 的错误基础上,也吸收了他们优秀的地方。
server.go
type CacheService struct {
store map[string][]byte
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
val := s.store[req.Key]
return &rpc.GetResp{Val: val}, nil
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
s.store[req.Key] = req.Val
return &rpc.StoreResp{}, nil
}
复制代码
注意这里没有锁,你能够想一想他们中有,由于未来他们会被并发的调用的。
固然,gRPC 支持错误处理。假设改写上面的 Get(),对不存在的键进行报错:
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
val, ok := s.store[req.Key]
if !ok {
return nil, status.Errorf(code.NotFound, "Key not found %s", req.Key)
}
return &rpc.GetResp{Val: val}, nil
}
复制代码
若是这样的代码打算去部署的话,必定会被 SRE 拦截下来,由于全部通信必须加密传输。
在 gRPC 中添加 TLS 加密传输很容易。好比咱们修改 runServer() 添加 TLS 加密传输。
func runServer() error {
tlsCreds, err := credentials.NewServerTLSFromFile("tls.crt", "tls.key")
if err != nil {
return err
}
srv := grpc.NewServer(grpc.Creds(tlsCreds))
...
}
复制代码
一样,咱们也须要修改一下 runClient()。
func runClient() error {
tlsCreds := credentials.NewTLS(&tls.Config(InsecureSkipVerify: true))
conn, err := grpc.Dial("localhost:5051", grpc.WithTransportCredentials(tlsCreds))
...
}
复制代码
如今有3个高性能的、事件驱动的实现
上线生产后,发现有一部分客户产生了大量的键值,询问得知,有的客户但愿对全部东西都缓存,这显然不是对咱们这个缓存服务很好的事情。
咱们但愿限制这种行为,但对于当前系统而言,没法知足这种需求,所以咱们须要修改实现,对每一个客户发放客户 token,那么咱们就能够约束特定客户最多能够创建多少键值,避免系统滥用。这就成为了多租户的缓存服务。
和以前同样,咱们仍是从 IDL 开始,咱们须要修改接口,增长 account_token 项。
message StoreReq {
string key = 1;
bytes val = 2;
string account_token = 3;
}
复制代码
一样,咱们须要有独立的服务针对帐户服务,来获取帐户所容许的缓存键数:
service Accounts {
rpc GetByToken(GetByTokenReq) return (GetByTokenResp) {}
}
message GetByTokenReq {
string token = 1;
}
message GetByTokenResp {
Account account = 1;
}
message Account {
int64 max_cache_keys = 1;
}
复制代码
这里创建了一个新的 Accounts 服务,而且有一个 GetByToken() 方法,给入 token,返回一个 Account 类型的结果,而 Account 内有 max_cache_keys 键对应最大可缓存的键值数。
如今咱们进一步修改 client.go
func runClient() error {
...
cache := rpc.NewCacheClient(conn)
_, err = cache.Store(context.Background(), &rpc.StoreReq{
AccountToken: "inconshreveable",
Key: "gopher",
Val: []byte("con"),
})
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
...
}
复制代码
服务端的改变要稍微大一些,但不过度。
type CacheService struct {
accounts rpc.AccountsClient
store map[string][]byte
keysByAccount map[string]int64
}
复制代码
注意这里的 accounts 是一个 grpc 的客户端,由于咱们这个服务,同时也是另外一个 grpc 服务的客户端。因此在接下来的 Store() 实现中,咱们须要先经过 accounts 调用另外一个服务取得帐户信息。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
// 调用另外一个服务取得帐户信息,包含其键值限制
resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
Token: req.AccountToken,
})
if err != nil {
return nil, err
}
// 检查是否超量使用
if s.keysByAccount[req.AccountToken] >= resp.Account.MaxCacheKeys {
return nil, status.Errorf(codes.FailedPrecondition, "Account %s exceeds max key limit %d", req.AccountToken, resp.Account.MaxCacheKeys)
}
// 若是键不存在,须要新加键值,那么咱们就对计数器加一
if _, ok := s.store[req.Key]; !ok {
s.keysByAccount[req.AccountToken] += 1
}
// 保存键值
s.store[req.Key] = req.Val
return &rpc.StoreResp{}, nil
}
复制代码
上面的问题解决了,咱们服务又恢复了正常,不会有用户创建过多的键值了。可是很快,咱们就又收到了其余用户发来的新的 issue,不少人反应说新系统变慢了,没有达到 SLA 的要求。
但是咱们根本不知道到底发生了什么,因而意识到了,咱们的程序没有任何可观察性(Observability),换句话说,咱们的程序没有任何计量系统来统计性能相关的数据。
咱们先从最简单的作起,添加日志。
咱们先从 client.go 开始,增长一些测量和计数以及日志输出。
...
// 开始计时
start := time.Now()
_, err = cache.Store(context.Background(), &rpc.StoreReq{
AccountToken: "inconshreveable",
Key: "gopher",
Val: []byte("con"),
})
// 计算 cache.Store() 调用时间
log.Printf("cache.Store duration %s", time.Since(start))
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
// 再次开始计时
start = time.Now()
// 调用 grpc 的 get() 方法取回键为 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
// 计算 cache.Get() 调用时间
log.Printf("cache.Get duration %s", time.Since(start))
if err != nil {
return fmt.Errorf("failed to get: %v", err)
}
复制代码
一样,在服务端也这么处理。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
// 开始计时
start := time.Now()
// 调用另外一个服务取得帐户信息,包含其键值限制
resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
Token: req.AccountToken,
})
// 输出 account.GetByToken() 的调用时间
log.Printf("accounts.GetByToken duration %s", time.Since(start))
...
}
复制代码
通过这些修改后,咱们发现同样的事情在反反复复的作,那么有什么办法能够改变这种无聊的作法么?查阅 grpc 文档后,看到有一个叫作 Client Interceptor 的东西。
这至关因而一个中间件,可是是在客户端。当客户端进行 rpc 调用的时候,这个中间件先会被调用,所以这个中间件能够对调用进行一层包装,而后再进行调用。
为了实现这个功能,咱们建立一个新的文件,叫作 interceptor.go:
func WithClientInterceptor() grpc.DialOption {
return grpc.WithUnaryInterceptor(clientInterceptor)
}
func clientInterceptor(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("invoke remote method=%s duration=%s error=%v", method, time.Since(start), err)
return err
}
复制代码
咱们有了这个 WithClientInterceptor() 以后,能够在 grpc.Dial() 的时候注册进去。 client.go
func runClient() error {
...
conn, err := grpc.Dial("localhost:5051",
grpc.WithTransportCredentials(tlsCreds),
WithClientInterceptor())
...
}
复制代码
注册以后,全部的 grpc 调用都会通过咱们注册的 clientInterceptor(),所以全部的时间就都有统计了,而不用每一个函数内部反反复复的添加时间、计量、输出。
添加了客户端的这个计量后,天然而然就联想到服务端是否是也能够作一样的事情?通过查看文档,能够,有个叫作 Server Interceptor 的东西。
一样的作法,咱们在服务端添加 interceptor.go,而且添加 ServerInterceptor() 函数。
func ServerInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(serverInterceptor)
}
func serverInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("invoke server method=%s duration=%s error=%v",
info.FullMethod,
time.Since(start),
err)
return resp, err
}
复制代码
和客户端同样,须要在 runServer() 的时候注册咱们定义的这个中间件。
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds), ServerInterceptor())
...
}
复制代码
添加了日志后,咱们终于在日志中发现,/rpc.Accounts/GetByToken/ 花了好长的时间。咱们须要对这个操做设置超时。 server.go
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
accountsCtx, _ := context.WithTimeout(context.Background(), 2 * time.Second)
resp, err := s.accounts.GetByToken(accountsCtx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
...
}
复制代码
这里操做很简单,直接使用标准库中 context.WithTimeout() 就能够了。
通过上面修改后,客户依旧抱怨说没有知足 SLA,仔细一想也对。就算这里约束了 2 秒钟,客户端调用还须要时间,别的代码在中间也有时间开销。并且有的客户说,咱们这里须要1秒钟,而不是2秒钟。
好吧,让咱们把这个时间设定推向调用方。
首先咱们要求在客户端进行调用时间约束的设定: client.go
func runClient() error {
...
ctx, _ := context.WithTimeout(context.Background(), time.Second)
_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
...
ctx, _ = context.WithTimeout(context.Background(), 50*time.Millisecond)
resp, err := cache.Get(ctx, &rpc.GetReq{Key: "gopher"})
...
}
复制代码
而后在服务端,咱们将上下文传递。直接取调用方的 ctx。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
...
}
复制代码
上面的问题都解决了,终于能够松一口气了。但是客户又提新的需求了……😅,说咱们能不能增长一个 Dry Run 的标志,就是说我但愿你作全部须要作的事情,除了真的修改键值库。
GRPC metadata,也称为 GRPC 的 Header。就像 HTTP 头同样,能够有一些 Metadata 信息传递过来。使用 metadata,可让咱们的 Dry Run 的实现变得更简洁,没必要每一个 RPC 方法内都实现一遍检查 Dry Run 标志的逻辑,咱们能够独立出来。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
if !dryRun(ctx) {
if _, ok := s.store[req.Key]; !ok {
s.keysByAccount[req.AccountToke] += 1
}
s.store[req.Key] = req.Val
}
return &rpc.StoreResp{}, nil
}
func dryRun(ctx context.Context) bool {
md, ok := metadata.FromContext(ctx)
if !ok {
return false
}
val, ok := md["dry-run"]
if !ok {
return false
}
if len(val) < 1 {
return false
}
return val[0] == "1"
}
复制代码
固然,这么作是有妥协的,由于通用化后就失去了类型检查的能力。
在客户端调用的时候,则须要根据状况添加 dry-run 参数给 metadata。
func runClient() error {
...
ctx, _ := context.WithTimeout(context.Background(), time.Second)
ctx = metadata.NewContext(ctx, metadata.Pairs("dry-run", "1"))
_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
...
}
复制代码
实现了 Dry Run 觉得能够休息了,以前抱怨慢的客户又来抱怨了,虽然有超时控制,知足 SLA,可是服务那边仍是慢,总超时不成功。检查了一下,发现是网络上的事情,咱们没有太多能够作的事情。为了解决客户的问题,咱们来添加一个重试的机制。
咱们能够对每个 gRPC 调用添加一个 Retry 机制,咱们也能够像以前计时统计那样,使用 Interceptor 吧?
func clientInterceptor(...) error {
var (
start = time.Now()
attempts = 0
err error
backoff retryBackOff
)
for {
attempts += 1
select {
case <-ctx.Done():
err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
case <-backoff.Next():
startAttempt := time.Now()
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
log.Printf(...)
continue
}
}
break
}
log.Printf(...)
return err
}
复制代码
看起来还不错,而后就打算发布这个代码了。结果提交审核的时候被打回来了,说这个代码不合理,由于若是是非幂等(non-idempotent) 的操做,这样就会致使屡次执行,改变指望结果了。
看来咱们得针对幂等和非幂等操做区别对待了。
silo.FireZeMissiles(NotIdempotent(ctx), req)
复制代码
嗯,固然,没这个东西。因此咱们须要本身来创造一个标记,经过 context,来标明操做是否幂等。
func NotIdempotent(ctx context.Context) context.Context {
return context.WithValue(ctx, "idempotent", false)
}
func isIdempotent(ctx context.Context) bool {
val, ok := ctx.Value("idempotent").(bool)
if !ok {
return true
}
return val
}
复制代码
而后在咱们的 clientInterceptor() 实现中加入 isIdempotent() 判断:
func clientInterceptor(...) error {
var (
start = time.Now()
attempts = 0
err error
backoff retryBackOff
)
for {
attempts += 1
select {
case <-ctx.Done():
err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
case <-backoff.Next():
startAttempt := time.Now()
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
log.Printf(...)
continue
}
}
break
}
log.Printf(...)
return err
}
复制代码
这样当调用失败后,客户端检查发现是幂等的状况,才重试,不然不重试。避免了非幂等操做的反复操做。
感受没啥问题了,因而部署上线了。但是运行一段时间后,发现有些不对劲。全部成功的RPC调用,也就是说这个操做自己是正确的,都没有问题,超时重试也正常。可是全部失败的 RPC 调用都不对了,全部失败的 RPC 调用,都返回超时,而不是错误自己。这里说的失败,不是说网络问题致使超时啥的,而是说请求自己的失败,好比以前提到的,Get() 不存在的键,应该返回错误;或者 Store() 超过了配额,应该返回错误,这类错误在日志中都没看到,反而都对应了超时。
通过分析发现,服务端该报错都报错,没啥问题,可是客户端不对,本应该返回错误给调用方的地方,客户端代码反而又开始重试这个操做了。看来以前重试的代码还有问题。
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
log.Printf(...)
continue
}
复制代码
若是仔细观察这部分代码,会发现,不管 err 是什么,只要非 nil,咱们就重试。其实这是不对的,咱们只有针对某些错误重试,好比网络问题之类的,而不该该对咱们但愿返回给调用方的错误重试,那没有意义。
那么问题就变成了,咱们到底应该怎么对 err 判断来决定是否重试?
因此,咱们须要的是一个完整的结构化的错误信息,而不是简单的一个 Error Code 和字符串。固然这条路很差走,可是咱们已经作了这么多了,坚持一下仍是能够克服的。
这里咱们仍是从 IDL 开始:
message Error {
int64 code = 1;
string messsage = 2;
bool temporary = 3;
int64 userErrorCode = 4;
}
复制代码
而后咱们实现这个 Error 类型。 rpc/error.go
func (e *Error) Error() string {
return e.Message
}
func Errorf(code codes.Code, temporary bool, msg string, args ..interface{}) error {
return &Error{
Code: int64(code),
Message: fmt.Sprintf(msg, args...),
Temporary: temporary,
}
}
复制代码
有这两个函数,咱们能够显示和构造这个 Error 类型的变量了,可是咱们该怎么把错误消息传回客户端呢?而后问题就开始变的繁琐起来了: rpc/error.go
func MarshalError (err error, ctx context.Context) error {
rerr, ok := err.(*Error)
if !ok {
return err
}
pberr, marshalerr := pb.Marshal(rerr)
if marshalerr == nil {
md := metadata.Pairs("rpc-error", base64.StdEncoding.EncodeToString(pberr))
_ = grpc.SetTrailer(ctx, md)
}
return status.Errorf(codes.Code(rerr.Code), rerr.Message)
}
func UnmarshalError(err error, md metadata.MD) *Error {
vals, ok := md["rpc-error"]
if !ok {
return nil
}
buf, err := base64.StdEncoding.DecodeString(vals[0])
if err != nil {
return nil
}
var rerr Error
if err := pb.Unmarshal(buf, &rerr); err != nil {
return nil
}
return &rerr
}
复制代码
interceptor.go
func serverInterceptor (
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
err = rpc.MarshalError(err, ctx)
log.Print(...)
return resp, err
}
复制代码
it’s ugly,but works.
这是在 gRPC 不支持高级 Error 的状况下,怎么去 work around 这个问题,而且凑合用起来。如今这么作,错误就能够跨主机边界传递了。
又有客户前来提需求了,有的客户说咱们能够存、也能够取,可是如何才能把里面全部的数据都获取下来?因而有了需求,但愿实现 Dump() 操做,能够取回全部数据。
如今已经轻车熟路了,咱们先改 IDL,添加一个 Dump() 函数。
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (DumpResp) {}
}
message DumpReq{
}
message DumpResp {
repeated DumpItem items = 1;
}
message DumpItem {
string key = 1;
bytes val = 2;
}
复制代码
这里 DumpResp 里面用的是 repeated,由于 protobuf 里面不知道为啥不叫 array。
新功能 Dump 上线了,结果发现你们都很喜欢 Dump,有不少人在 Dump,结果服务器的内存开始不够了。因而咱们须要一些限制手段,能够控制流量。
查阅了文档后,发现咱们能够控制同时最大有多少并发能够访问,以及能够多频繁的来访问服务。 server.go
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds),
ServerInterceptor(),
grpc.MaxConcurrentStreams(64),
grpc.InTapHandle(NewTap().Handler))
rpc.RegisterCacheServer(srv, NewCacheService(accounts))
l, err := net.Listen("tcp", "localhost:5051")
if err != nil {
return err
}
l = netutil.LimitListener(l, 1024)
return srv.Serve(l)
}
复制代码
这里使用了 netutil.LimitListener(l, 1024) 控制了总共能够有多少个链接,而后用 grpc.MaxConcurrentStreams(64) 指定了每一个 grpc 的链接能够有多少个并发流(stream)。这两个结合起来基本控制了并发的总数。
可是 gRPC 里没有地方限定能够多频繁的访问。所以这里用了 grpc.InTapHandle(NewTap().Handler)) 来进行定制,这是在更靠前的位置执行的。
tap.go
type Tap struct {
lim *rate.Limiter
}
func NewTap() *Tap {
return &Tap(rate.NewLimiter(150, 5))
}
func (t *Tap) Handler(ctx context.Context, info *tap.Info) (context.Context, error) {
if !t.lim.Allow() {
return nil, status.Errorf(codes.ResourceExhausted, "service is over rate limit")
}
return ctx, nil
}
复制代码
以前的方案部署后,内存终于降下来了,可是还没休息,就发现你们愈来愈喜欢用这个缓存服务,内存又不够用了。这个时候咱们就开始思考,是否是能够调整一下设计,不是每次 Dump 就当即在内存生成完整的返回数组,而是以流的形式,按需发回。 app.proto
syntax = "proto3";
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (stream DumpItem) {}
}
message DumpReq{
}
message DumpItem {
string key = 1;
bytes val = 2;
}
复制代码
这里再也不使用数组性质的 repeated,而是用 stream,客户端请求 Dump() 后,将结果以流的形式发回去。 server.go
func (s *CacheService) Dump(req *rpc.DumpReq, stream rpc.Cache_DumpServer) error {
for k, v := range s.store {
stream.Send(&rpc.DumpItem{
Key: k,
Val: v,
})
}
return nil
}
复制代码
咱们修改 Dump() 的实现,对于每一个记录,利用 stream.Send() 发送到流。
注意这里咱们没有 context,只有个 stream。 client.go
func runClient() error {
...
stream, err := cache.Dump(context.Background(), &rpc.DumpReq{})
if err != nil {
return fmt.Errorf("failed to dump: %v", err)
}
for {
item, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to stream item: %v", err)
}
}
return nil
}
复制代码
使用流后,服务器性能提升了不少,可是,咱们的服务太吸引人了,用户愈来愈多,结果又内存不够了。这时候咱们审查代码,感受能作的事情都作了,或许是时候从单一服务器,扩展为多个服务器,而后之间使用负载均衡。
gRPC 是长链接性质的通信,所以若是一个客户端链接了一个 gRPC Endpoint,那么他就会一直链接到一个固定的服务器,所以多服务器的负载均衡对同一个客户端来讲是没有意义的,不会由于这个客户端有大量的请求而致使分散请求到不一样的服务器上去。
若是咱们但愿客户端能够利用多服务器的机制,咱们就须要更智能的客户端,让客户端意识到服务器存在多个副本,所以客户端创建多条链接到不一样的服务器,这样就可让单一客户端利用负载均衡的横向扩展能力。
在复杂的环境中,咱们 gRPC 的客户端(甚至服务端)多是不一样语言平台的。这实际上是 gRPC 的优点,能够比较容易的实现跨语言平台的通信。
好比咱们能够作一个 Python 客户端:
import grpc
import rpc_pb2 as rpc
channel = grpc.insecure_channel('localhost:5051')
cache_svc = rpc.CacheStub(channel)
resp = cache_svc.Get(rpc.GetReq(
key="gopher",
))
print resp.val
复制代码
一个不是很爽的地方是虽然 gRPC 的跨语言通信很方便,可是各个语言的实现都比较随意,好比 Go 中叫作 CacheClient(),而 Python 中则叫作 CacheStub()。这里没有什么特别的缘由非不同的名字,就是因为不一样的做者实现的时候按照本身的想法命名的。
本文转载自: blog.lab99.org/post/golang…
我的微信公众号:
我的github:
我的博客: