gRPC主要有4种请求和响应模式,分别是简单模式(Simple RPC)
、服务端流式(Server-side streaming RPC)
、客户端流式(Client-side streaming RPC)
、和双向流式(Bidirectional streaming RPC)
。html
1.简单模式(Simple RPC)
:客户端发起请求并等待服务端响应。git
2.服务端流式(Server-side streaming RPC)
:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。 场景:.客户端要获取某原油股的实时走势,客户端发送一个请求, 服务端实时返回该股票的走势github
3.客户端流式(Client-side streaming RPC)
:与服务端数据流模式相反,此次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。情景模拟:客户端大量数据上传到服务端golang
4.双向流式(Bidirectional streaming RPC)
:双方使用读写流去发送一个消息序列,两个流独立操做,双方能够同时发送和同时接收。 情景模拟:双方对话(能够一问一答、一问多答、多问一答,形式灵活)api
从上面的定义不难看出,用stream
能够定义一个流式消息。下面咱们就经过实例来演示一下流式通讯的使用方法。服务器
1.首先/api/hello.proto
[以下], 而且生成新的api/hello.pb.go
代码。app
syntax = "proto3"; package api; // Any消息类型容许您将消息做为嵌入类型,而不须要它们 .proto定义。Any包含任意序列化的消息(字节),以及一个URL,该URL充当该消息的全局惟一标识符并解析为该消息的类型。要使用Any类型,你须要导入google/protobuf/any.proto. import "google/protobuf/any.proto"; message HelloRequest { string greeting = 1; map<string, string> infos = 2; } message HelloResponse { string reply = 1; repeated google.protobuf.Any details = 2; } service HelloService { rpc SayHello(HelloRequest) returns (HelloResponse){} rpc ListHello(HelloRequest) returns (stream HelloResponse) {} rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {} rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {} } message Hello { string msg = 1; } message Error { repeated string msg = 1; }
2.编译指令:tcp
protoc -ID:\Go\include -I. --go_out=plugins=grpc:. ./api/api.proto
3.在生成的代码api.hello.go中,咱们能够看到客户端接口以下:ide
type HelloServiceServer interface { SayHello(context.Context, *HelloRequest) (*HelloResponse, error) ListHello(*HelloRequest, HelloService_ListHelloServer) error SayMoreHello(HelloService_SayMoreHelloServer) error SayHelloChat(HelloService_SayHelloChatServer) error }
4.接口的实现 server/service/service.go以下:工具
package service import ( "context" "fmt" "io" "log" "time" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" api "gogrpcstream/api" ) type SayHelloServer struct{} func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (res *api.HelloResponse, err error) { log.Printf("Client Greeting:%s", in.Greeting) log.Printf("Client Info:%v", in.Infos) var an *any.Any if in.Infos["hello"] == "world" { an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"}) } else { an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}}) } if err != nil { return } return &api.HelloResponse{ Reply: "Hello World !!", Details: []*any.Any{an}, }, nil } // 服务器端流式 RPC, 接收一次客户端请求,返回一个流 func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error { log.Printf("Client Say: %v", in.Greeting) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 1"}) time.Sleep(1 * time.Second) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 2"}) time.Sleep(1 * time.Second) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 3"}) time.Sleep(1 * time.Second) return nil } // 客户端流式 RPC, 客户端流式请求,服务器可返回一次 func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error { // 接受客户端请求 for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("SayMoreHello Client Say: %v", req.Greeting) } // 流读取完成后,返回 return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"}) } //双向 func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error { n := 1 for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } err = stream.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)}) if err != nil { return err } n++ log.Printf("SayHelloChat Client Say: %v", req.Greeting) } return nil }
5. server/main.go 服务端实现:
package main import ( "crypto/tls" "crypto/x509" "io/ioutil" "log" "net" "google.golang.org/grpc/credentials" "google.golang.org/grpc" api "gogrpcstream/api" sv "gogrpcstream/server/service" ) func main() { lis, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } // 加载证书和密钥 (同时能验证证书与私钥是否匹配) cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key") if err != nil { panic(err) } // 将根证书加入证书词 // 测试证书的根若是不加入可信池,那么测试证书将视为不惋惜,没法经过验证。 certPool := x509.NewCertPool() rootBuf, err := ioutil.ReadFile("../certs/ca.pem") if err != nil { panic(err) } if !certPool.AppendCertsFromPEM(rootBuf) { panic("fail to append test ca") } tlsConf := &tls.Config{ ClientAuth: tls.RequireAndVerifyClientCert, Certificates: []tls.Certificate{cert}, ClientCAs: certPool, } serverOpt := grpc.Creds(credentials.NewTLS(tlsConf)) grpcServer := grpc.NewServer(serverOpt) api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{}) log.Println("Server Start...") grpcServer.Serve(lis) }
6.客服端实现:client/main.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "io" "io/ioutil" "log" api "gogrpcstream/api" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) func main() { cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key") if err != nil { panic(err) } // 将根证书加入证书池 certPool := x509.NewCertPool() bs, err := ioutil.ReadFile("../certs/ca.pem") if err != nil { panic(err) } if !certPool.AppendCertsFromPEM(bs) { panic("cc") } // 新建凭证 transportCreds := credentials.NewTLS(&tls.Config{ ServerName: "localhost", Certificates: []tls.Certificate{cert}, RootCAs: certPool, }) dialOpt := grpc.WithTransportCredentials(transportCreds) conn, err := grpc.Dial("localhost:8080", dialOpt) if err != nil { log.Fatalf("Dial failed:%v", err) } defer conn.Close() client := api.NewHelloServiceClient(conn) resp1, err := client.SayHello(context.Background(), &api.HelloRequest{ Greeting: "Hello Server 1 !!", Infos: map[string]string{"hello": "world"}, }) if err != nil { log.Fatal(err) } log.Printf("SayHello Resp1:%+v", resp1) resp2, err := client.SayHello(context.Background(), &api.HelloRequest{ Greeting: "Hello Server 2 !!", }) if err != nil { log.Fatalf("%v", err) } log.Printf("SayHello Resp2:%+v", resp2) // 服务器端流式 RPC; recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"}) if err != nil { log.Fatalf("ListHello err: %v", err) } for { //Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M) resp, err := recvListHello.Recv() if err == io.EOF { break } if err != nil { log.Fatal(err) } log.Printf("ListHello Server Resp: %v", resp.Reply) } //能够使用CloseSend()关闭stream,这样服务端就不会继续产生流消息 //调用CloseSend()后,若继续调用Recv(),会从新激活stream,接着以前结果获取消息 // 客户端流式 RPC; sayMoreClient, err := client.SayMoreHello(context.Background()) if err != nil { log.Fatal(err) } for i := 0; i < 3; i++ { sayMoreClient.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)}) } //关闭流并获取返回的消息 sayMoreResp, err := sayMoreClient.CloseAndRecv() if err != nil { log.Fatal(err) } log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply) // 双向流式 RPC; sayHelloChat, err := client.SayHelloChat(context.Background()) if err != nil { log.Fatal(err) } for i := 0; i < 3; i++ { err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)}) if err != nil { log.Fatalf("stream request err: %v", err) } res, err := sayHelloChat.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("SayHelloChat get stream err: %v", err) } // 打印返回值 log.Printf("SayHelloChat Server Say: %v", res.Greeting) } }
8.运行结果以下:
D:\GoProject\src\gogrpcstream\server>go run main.go 2021/01/05 17:19:12 Server Start... 2021/01/05 17:20:00 Client Greeting:Hello Server 1 !! 2021/01/05 17:20:00 Client Info:map[hello:world] 2021/01/05 17:20:00 Client Greeting:Hello Server 2 !! 2021/01/05 17:20:00 Client Info:map[] 2021/01/05 17:20:00 Client Say: Hello Server List Hello 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
D:\GoProject\src\gogrpcstream\client>go run main.go 2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!" details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}} 2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!" details:{[type.googleapis.com/api.Error]:{msg:"Bad Request" msg:"Wrong Info Msg"}} 2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1 2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2 2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3 2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3
证书 能够利用MySSL测试证书生成工具生成两张证书 也能够用openssl来实现。
下载地址 https://github.com/dz45693/gogrpcstrem.git
参考: