go-grpc-流式接口(streaming rpc)

上一篇咱们介绍了rpc最基本的应用,今天咱们来看看rpc的另一个数据交互方式streaming rpc,也就是流式接口。html

streaming rpc相比于simple rpc来讲能够很好的解决一个接口发送大量数据的场景。git

好比一个订单导出的接口有20万条记录,若是使用simple rpc来实现的话。那么咱们须要一次性接收到20万记录才能进行下一步的操做。可是若是咱们使用streaming rpc那么咱们就能够接收一条记录处理一条记录,直到因此的数据传输完毕。这样能够较少服务器的瞬时压力,也更有及时性github

下面们来看看streaming rpc具体是怎么交互的。golang

IDL

syntax = "proto3";
package proto;

message Order {
    int32 id = 1;
    string orderSn = 2;
    string date = 3;
}
message OrderList{
    Order order = 1;
}
message OrderSearchParams {
}
message Image{
    string fileName = 1;
    string file = 2;
}
message ImageList{
    Image image = 1;
}
message uploadResponse{
}
message SumData{
    int32 number = 1;
}
service StreamService {
    rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服务端流式
    rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客户端流式
    rpc SumData(stream SumData) returns (stream SumData){}; //双向流式
}

这里定义了三个方法服务器

  • OrderList 服务器流式,客户端普通rpc调用
  • UploadFile 客户端流式,服务端普通rpc
  • SumData 双向流式

基础结构

server
package main

import (
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
    "net"
)
type  StreamServices struct {}
func main()  {
    server := grpc.NewServer()
    proto.RegisterStreamServiceServer(server, &StreamServices{})

    lis, err := net.Listen("tcp", "127.0.0.1:9528")
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    server.Serve(lis)
}
func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error {
    return  nil
}

func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
    return  nil
}

func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error {
    return  nil
}
clietn
package main
import (
    "github.com/kataras/iris/v12"
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
)
var streamClient proto.StreamServiceClient
func main()  {
    app := iris.New()
    app.Logger().SetLevel("debug") //debug
    app.Handle("GET", "/testOrderList", orderList)
    app.Handle("GET", "/testUploadImage", uploadImage)
    app.Handle("GET", "/testSumData", sumData)
    app.Run(iris.Addr("127.0.0.1:8080"))
}
func init() {
    connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure())
    if err != nil {
        log.Fatalln(err)
    }
    streamClient = proto.NewStreamServiceClient(connect)
}

func orderList(ctx iris.Context)  {
    
}

func uploadImage(ctx iris.Context)  {

}

func sumData(ctx iris.Context)  {

}

按照proto中的约定,先实现接口并注册一个服务。接下来咱们依次来实现三个不一样的流式方法。并发

服务端流式 orderList
server
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error {
    for i := 0; i <= 10; i++ {
        order := proto.Order{
            Id:      int32(i),
            OrderSn: time.Now().Format("20060102150405") + "order_sn",
            Date:    time.Now().Format("2006-01-02 15:04:05"),
        }
        err := stream.Send(&proto.StreamOrderList{
            Order: &order,
        })
        if err != nil {
            return err
        }
    }
    return nil
}

gRPC为咱们提供一个流的发送方法。send,这样咱们能够很简单的以流的方式传递数据。
如今咱们来查看streaming.pb.go中的sendapp

func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error {
    return x.ServerStream.SendMsg(m)
}

能够看到最终是使用ServerStream.SendMsg,查看源码,能够发现,最终是使用了一个结构体。tcp

type serverStream struct {
    ctx   context.Context
    ......

    maxReceiveMessageSize int
    maxSendMessageSize    int
    ......
}

这里咱们关心两个值,最大可接收大小,最大发送大小。而再SendMsg中也有对于的大小判断,因此发送的消息大小不是无限制的。学习

// TODO(dfawley): should we be checking len(data) instead?
    if len(payload) > ss.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
    }

咱们能够服务端建立server的时候经过server := grpc.NewServer(grpc.MaxSendMsgSize())来指定大小,有能够在客服端建立client的时候经过connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))的时候来指定,具体你们能够下来本身详细了解下配置项。google

client
func orderList(ctx iris.Context) {
    stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{})
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })

            return
        }
        ctx.JSON(res)
        log.Println(res)
    }
}

这里在for循环中去读取数据,直到取到一个io.EOF的结束错误占位符。

客户端流式 uploadImage
server
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error {
    for  {
         res,err := stream.Recv()
         //接收消息结束,发送结果,并关闭
        if err == io.EOF {
            return stream.SendAndClose(&proto.UploadResponse{})
        }
        if err !=nil {
            return err
        }
        fmt.Println(res)
    }
    return nil
}

能够看到这里咱们一样使用for结合stream.Recv()来接收数据流,可是这里咱们多一个SendAndClose,表示服务器已经接收消息结束,并发生一个正确的响应给客户端。

client
func uploadImage(ctx iris.Context) {
    stream,err := streamClient.UploadFile(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i:=1;i<=10 ; i++ {
        img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
        images := &proto.StreamImageList{Image:img}
        err := stream.Send(images)
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })
            return
        }
    }
    //发送完毕 关闭并获取服务端返回的消息
    resp, err := stream.CloseAndRecv()
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    ctx.JSON(map[string]interface{}{"result": resp,"message":"success"})
    log.Println(resp)
}

而在客户端发送数据完毕的时候须要使用CloseAndRecv 须要接收服务端接收完毕的通知以及关闭当前通道。

双向流式 uploadImage
server
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error {
    i := 0
    for {
        err := stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err != nil {
            return err
        }
        res, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number)
        i++
    }
}

服务端在发送消息的同时,并接收服务端发送的消息。

client
func sumData(ctx iris.Context) {
    stream, err := streamClient.SumData(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i := 1; i <= 10; i++ {
        err = stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        log.Printf("res number:%d", res.Number)
    }
    stream.CloseSend()
    return
}

上面咱们能够看到。客户端有一个执行断开链接的标识CloseSend(),而服务器没有,由于服务端断开链接是隐式的,咱们只须要退出循环便可断开链接。能够灵活的控制。

上面就是go-gRPC的流式接口。只是一个简单的例子。若有不妥的地方欢迎指出。感谢。

学习于-煎鱼

期待一块儿交流

qrcode_for_gh_60813539dc23_258.jpg

相关文章
相关标签/搜索