你们好,我是煎鱼,本章节将介绍 gRPC 的流式,分为三种类型:git
任何技术,由于有痛点,因此才有了存在的必要性。若是您想要了解 gRPC 的流式调用,请继续github
gRPC Streaming 是基于 HTTP/2 的,后续章节再进行详细讲解golang
流式为何要存在呢,是 Simple RPC 有什么问题吗?经过模拟业务场景,可得知在使用 Simple RPC 时,有以下问题:bash
天天早上 6 点,都有一批百万级别的数据集要同从 A 同步到 B,在同步的时候,会作一系列操做(归档、数据分析、画像、日志等)。这一次性涉及的数据量确实大服务器
在同步完成后,也有人立刻会去查阅数据,为了新的一天筹备。也符合实时性。tcp
二者相较下,这个场景下更适合使用 Streaming RPCide
在讲解具体的 gRPC 流式代码时,会着重在第一节讲解,由于三种模式实际上是不一样的组合。但愿你可以注重理解,触类旁通,其实都是同样的知识点 👍ui
$ tree go-grpc-example
go-grpc-example
├── client
│ ├── simple_client
│ │ └── client.go
│ └── stream_client
│ └── client.go
├── proto
│ ├── search.proto
│ └── stream.proto
└── server
├── simple_server
│ └── server.go
└── stream_server
└── server.go
复制代码
增长 stream_server、stream_client 存放服务端和客户端文件,proto/stream.proto 用于编写 IDLgoogle
在 proto 文件夹下的 stream.proto 文件中,写入以下内容:spa
syntax = "proto3";
package proto;
service StreamService {
rpc List(StreamRequest) returns (stream StreamResponse) {};
rpc Record(stream StreamRequest) returns (StreamResponse) {};
rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}
message StreamPoint {
string name = 1;
int32 value = 2;
}
message StreamRequest {
StreamPoint pt = 1;
}
message StreamResponse {
StreamPoint pt = 1;
}
复制代码
注意关键字 stream,声明其为一个流方法。这里共涉及三个方法,对应关系为
package main
import (
"log"
"net"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp", ":"+PORT)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
return nil
}
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
return nil
}
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
return nil
}
复制代码
写代码前,建议先将 gRPC Server 的基础模板和接口给空定义出来。如有不清楚可参见上一章节的知识点
package main
import (
"log"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
if err != nil {
log.Fatalf("printLists.err: %v", err)
}
err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
if err != nil {
log.Fatalf("printRecord.err: %v", err)
}
err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
if err != nil {
log.Fatalf("printRoute.err: %v", err)
}
}
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
复制代码
服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求
简单来说就是客户端发起一次普通的 RPC 请求,服务端经过流式响应屡次发送数据集,客户端 Recv 接收数据集。大体如图:
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
for n := 0; n <= 6; n++ {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: r.Pt.Name,
Value: r.Pt.Value + int32(n),
},
})
if err != nil {
return err
}
}
return nil
}
复制代码
在 Server,主要留意 stream.Send
方法。它看上去能发送 N 次?有没有大小限制?
type StreamService_ListServer interface {
Send(*StreamResponse) error
grpc.ServerStream
}
func (x *streamServiceListServer) Send(m *StreamResponse) error {
return x.ServerStream.SendMsg(m)
}
复制代码
经过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg
方法,该方法涉及如下过程:
math.MaxInt32
),若超出则提示错误func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.List(context.Background(), r)
if err != nil {
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
return nil
}
复制代码
在 Client,主要留意 stream.Recv()
方法。什么状况下 io.EOF
?什么状况下存在错误信息呢?
type StreamService_ListClient interface {
Recv() (*StreamResponse, error)
grpc.ClientStream
}
func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
m := new(StreamResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
复制代码
RecvMsg 会从流中读取完整的 gRPC 消息体,另外经过阅读源码可得知:
(1)RecvMsg 是阻塞等待的
(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF
(3)RecvMsg 当流出现任何错误时,流会被停止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现以下错误:
同时须要注意,默认的 MaxReceiveMessageSize 值为 1024 * 1024 * 4,建议不要超出
运行 stream_server/server.go:
$ go run server.go
复制代码
运行 stream_client/client.go:
$ go run client.go
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024
复制代码
客户端流式 RPC,单向流,客户端经过流式发起屡次 RPC 请求给服务端,服务端发起一次响应给客户端,大体如图:
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
for {
r, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
}
if err != nil {
return err
}
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
复制代码
多了一个从未见过的方法 stream.SendAndClose
,它是作什么用的呢?
在这段程序中,咱们对每个 Recv 都进行了处理,当发现 io.EOF
(流关闭) 后,须要将最终的响应结果发送给客户端,同时关闭正在另一侧等待的 Recv
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Record(context.Background())
if err != nil {
return err
}
for n := 0; n < 6; n++ {
err := stream.Send(r)
if err != nil {
return err
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
return nil
}
复制代码
stream.CloseAndRecv
和 stream.SendAndClose
是配套使用的流方法,相信聪明的你已经秒懂它的做用了
重启 stream_server/server.go,再次运行 stream_client/client.go:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
复制代码
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
复制代码
双向流式 RPC,顾名思义是双向流。由客户端以流式的方式发起请求,服务端一样以流式的方式响应请求
首个请求必定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、何时关闭)根据程序编写的方式来肯定(能够结合协程)
假设该双向流是按顺序发送的话,大体如图:
仍是要强调,双向流变化很大,因程序编写的不一样而不一样。双向流图示没法适用不一样的场景
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
n := 0
for {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: "gPRC Stream Client: Route",
Value: int32(n),
},
})
if err != nil {
return err
}
r, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
n++
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
复制代码
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Route(context.Background())
if err != nil {
return err
}
for n := 0; n <= 6; n++ {
err = stream.Send(r)
if err != nil {
return err
}
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
stream.CloseSend()
return nil
}
复制代码
重启 stream_server/server.go,再次运行 stream_client/client.go:
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
复制代码
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6
复制代码
在本文共介绍了三类流的交互方式,能够根据实际的业务场景去选择合适的方式。会事半功倍哦。
若是有任何疑问或错误,欢迎在 issues 进行提问或给予修正意见,若是喜欢或对你有所帮助,欢迎 Star,对做者是一种鼓励和推动。