上一篇咱们介绍了rpc
最基本的应用,今天咱们来看看rpc
的另一个数据交互方式streaming rpc
,也就是流式接口。html
streaming rpc
相比于simple rpc
来讲能够很好的解决一个接口发送大量数据的场景。git
好比一个订单导出的接口有20万条记录,若是使用simple rpc
来实现的话。那么咱们须要一次性接收到20万记录才能进行下一步的操做。可是若是咱们使用streaming rpc
那么咱们就能够接收一条记录处理一条记录,直到因此的数据传输完毕。这样能够较少服务器的瞬时压力,也更有及时性github
下面们来看看streaming rpc
具体是怎么交互的。golang
syntax = "proto3"; package proto; message Order { int32 id = 1; string orderSn = 2; string date = 3; } message OrderList{ Order order = 1; } message OrderSearchParams { } message Image{ string fileName = 1; string file = 2; } message ImageList{ Image image = 1; } message uploadResponse{ } message SumData{ int32 number = 1; } service StreamService { rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服务端流式 rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客户端流式 rpc SumData(stream SumData) returns (stream SumData){}; //双向流式 }
这里定义了三个方法服务器
package main import ( "google.golang.org/grpc" "iris-grpc-example/proto" "log" "net" ) type StreamServices struct {} func main() { server := grpc.NewServer() proto.RegisterStreamServiceServer(server, &StreamServices{}) lis, err := net.Listen("tcp", "127.0.0.1:9528") if err != nil { log.Fatalf("net.Listen err: %v", err) } server.Serve(lis) } func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error { return nil } func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error { return nil } func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error { return nil }
package main import ( "github.com/kataras/iris/v12" "google.golang.org/grpc" "iris-grpc-example/proto" "log" ) var streamClient proto.StreamServiceClient func main() { app := iris.New() app.Logger().SetLevel("debug") //debug app.Handle("GET", "/testOrderList", orderList) app.Handle("GET", "/testUploadImage", uploadImage) app.Handle("GET", "/testSumData", sumData) app.Run(iris.Addr("127.0.0.1:8080")) } func init() { connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure()) if err != nil { log.Fatalln(err) } streamClient = proto.NewStreamServiceClient(connect) } func orderList(ctx iris.Context) { } func uploadImage(ctx iris.Context) { } func sumData(ctx iris.Context) { }
按照proto
中的约定,先实现接口并注册一个服务。接下来咱们依次来实现三个不一样的流式方法。并发
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error { for i := 0; i <= 10; i++ { order := proto.Order{ Id: int32(i), OrderSn: time.Now().Format("20060102150405") + "order_sn", Date: time.Now().Format("2006-01-02 15:04:05"), } err := stream.Send(&proto.StreamOrderList{ Order: &order, }) if err != nil { return err } } return nil }
gRPC为咱们提供一个流的发送方法。send
,这样咱们能够很简单的以流的方式传递数据。
如今咱们来查看streaming.pb.go
中的send
app
func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error { return x.ServerStream.SendMsg(m) }
能够看到最终是使用ServerStream.SendMsg
,查看源码,能够发现,最终是使用了一个结构体。tcp
type serverStream struct { ctx context.Context ...... maxReceiveMessageSize int maxSendMessageSize int ...... }
这里咱们关心两个值,最大可接收大小,最大发送大小。而再SendMsg
中也有对于的大小判断,因此发送的消息大小不是无限制的。学习
// TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) }
咱们能够服务端建立server
的时候经过server := grpc.NewServer(grpc.MaxSendMsgSize())
来指定大小,有能够在客服端建立client
的时候经过connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))
的时候来指定,具体你们能够下来本身详细了解下配置项。google
func orderList(ctx iris.Context) { stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{}) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for { res, err := stream.Recv() if err == io.EOF { break } if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } ctx.JSON(res) log.Println(res) } }
这里在for
循环中去读取数据,直到取到一个io.EOF
的结束错误占位符。
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error { for { res,err := stream.Recv() //接收消息结束,发送结果,并关闭 if err == io.EOF { return stream.SendAndClose(&proto.UploadResponse{}) } if err !=nil { return err } fmt.Println(res) } return nil }
能够看到这里咱们一样使用for
结合stream.Recv()
来接收数据流,可是这里咱们多一个SendAndClose
,表示服务器已经接收消息结束,并发生一个正确的响应给客户端。
func uploadImage(ctx iris.Context) { stream,err := streamClient.UploadFile(context.Background()) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for i:=1;i<=10 ; i++ { img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"} images := &proto.StreamImageList{Image:img} err := stream.Send(images) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } } //发送完毕 关闭并获取服务端返回的消息 resp, err := stream.CloseAndRecv() if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } ctx.JSON(map[string]interface{}{"result": resp,"message":"success"}) log.Println(resp) }
而在客户端发送数据完毕的时候须要使用CloseAndRecv
须要接收服务端接收完毕的通知以及关闭当前通道。
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error { i := 0 for { err := stream.Send(&proto.StreamSumData{Number: int32(i)}) if err != nil { return err } res, err := stream.Recv() if err == io.EOF { return nil } log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number) i++ } }
服务端在发送消息的同时,并接收服务端发送的消息。
func sumData(ctx iris.Context) { stream, err := streamClient.SumData(context.Background()) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for i := 1; i <= 10; i++ { err = stream.Send(&proto.StreamSumData{Number: int32(i)}) if err == io.EOF { break } if err != nil { return } res, err := stream.Recv() if err == io.EOF { break } if err != nil { return } log.Printf("res number:%d", res.Number) } stream.CloseSend() return }
上面咱们能够看到。客户端有一个执行断开链接的标识CloseSend()
,而服务器没有,由于服务端断开链接是隐式的,咱们只须要退出循环便可断开链接。能够灵活的控制。
上面就是go-gRPC
的流式接口。只是一个简单的例子。若有不妥的地方欢迎指出。感谢。