Go gRPC流模式的实现和TLS加密通讯

gRPC主要有4种请求和响应模式,分别是简单模式(Simple RPC)服务端流式(Server-side streaming RPC)客户端流式(Client-side streaming RPC)、和双向流式(Bidirectional streaming RPC)html

1.简单模式(Simple RPC):客户端发起请求并等待服务端响应。git

2.服务端流式(Server-side streaming RPC):客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。 场景:.客户端要获取某原油股的实时走势,客户端发送一个请求, 服务端实时返回该股票的走势github

3.客户端流式(Client-side streaming RPC):与服务端数据流模式相反,此次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。情景模拟:客户端大量数据上传到服务端golang

4.双向流式(Bidirectional streaming RPC):双方使用读写流去发送一个消息序列,两个流独立操做,双方能够同时发送和同时接收。 情景模拟:双方对话(能够一问一答、一问多答、多问一答,形式灵活)api

从上面的定义不难看出,用stream能够定义一个流式消息。下面咱们就经过实例来演示一下流式通讯的使用方法。服务器

1.首先/api/hello.proto [以下], 而且生成新的api/hello.pb.go代码。app

syntax = "proto3";
 
package api;
// Any消息类型容许您将消息做为嵌入类型,而不须要它们 .proto定义。Any包含任意序列化的消息(字节),以及一个URL,该URL充当该消息的全局惟一标识符并解析为该消息的类型。要使用Any类型,你须要导入google/protobuf/any.proto.
import "google/protobuf/any.proto";
 
message HelloRequest {
  string greeting = 1;
  map<string, string> infos  = 2;
}
 
message HelloResponse {
  string reply = 1;
  repeated google.protobuf.Any details = 2;
}
 
service HelloService {
  rpc SayHello(HelloRequest) returns (HelloResponse){}
  rpc ListHello(HelloRequest) returns (stream HelloResponse) {}
  rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {}
  rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {}
}
 
message Hello {
    string msg = 1;
}
 
message Error {
    repeated string msg = 1;
}

2.编译指令:tcp

protoc -ID:\Go\include -I. --go_out=plugins=grpc:. ./api/api.proto

3.在生成的代码api.hello.go中,咱们能够看到客户端接口以下:ide

type HelloServiceServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
    ListHello(*HelloRequest, HelloService_ListHelloServer) error
    SayMoreHello(HelloService_SayMoreHelloServer) error
    SayHelloChat(HelloService_SayHelloChatServer) error
}

4.接口的实现 server/service/service.go以下:工具

package service
 
import (
    "context"
    "fmt"
    "io"
    "log"
    "time"
 
    "github.com/golang/protobuf/ptypes"
    "github.com/golang/protobuf/ptypes/any"
 
    api "gogrpcstream/api"
)
 
type SayHelloServer struct{}
 
func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (res *api.HelloResponse, err error) {
    log.Printf("Client Greeting:%s", in.Greeting)
    log.Printf("Client Info:%v", in.Infos)
 
    var an *any.Any
    if in.Infos["hello"] == "world" {
        an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"})
    } else {
        an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}})
    }
 
    if err != nil {
        return
    }
    return &api.HelloResponse{
        Reply:   "Hello World !!",
        Details: []*any.Any{an},
    }, nil
}
 
// 服务器端流式 RPC, 接收一次客户端请求,返回一个流
func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error {
    log.Printf("Client Say: %v", in.Greeting)
 
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 1"})
    time.Sleep(1 * time.Second)
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 2"})
    time.Sleep(1 * time.Second)
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 3"})
    time.Sleep(1 * time.Second)
    return nil
}
 
// 客户端流式 RPC, 客户端流式请求,服务器可返回一次
func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error {
    // 接受客户端请求
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
 
        if err != nil {
            return err
        }
 
        log.Printf("SayMoreHello Client Say: %v", req.Greeting)
    }
 
    // 流读取完成后,返回
    return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"})
}
 
//双向
func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error {
    n := 1
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
 
        if err != nil {
            return err
        }
        err = stream.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)})
        if err != nil {
            return err
        }
        n++
        log.Printf("SayHelloChat Client Say: %v", req.Greeting)
    }
    return nil
}

5. server/main.go 服务端实现:

package main
 
import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "net"
 
    "google.golang.org/grpc/credentials"
 
    "google.golang.org/grpc"
 
    api "gogrpcstream/api"
    sv "gogrpcstream/server/service"
)
 
func main() {
    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }
 
    // 加载证书和密钥 (同时能验证证书与私钥是否匹配)
    cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key")
    if err != nil {
        panic(err)
    }
 
    // 将根证书加入证书词
    // 测试证书的根若是不加入可信池,那么测试证书将视为不惋惜,没法经过验证。
    certPool := x509.NewCertPool()
    rootBuf, err := ioutil.ReadFile("../certs/ca.pem")
    if err != nil {
        panic(err)
    }
 
    if !certPool.AppendCertsFromPEM(rootBuf) {
        panic("fail to append test ca")
    }
 
    tlsConf := &tls.Config{
        ClientAuth:   tls.RequireAndVerifyClientCert,
        Certificates: []tls.Certificate{cert},
        ClientCAs:    certPool,
    }
 
    serverOpt := grpc.Creds(credentials.NewTLS(tlsConf))
    grpcServer := grpc.NewServer(serverOpt)
 
    api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{})
 
    log.Println("Server Start...")
    grpcServer.Serve(lis)
}

6.客服端实现:client/main.go

package main
 
import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io"
    "io/ioutil"
    "log"
 
    api "gogrpcstream/api"
 
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)
 
func main() {
    cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key")
    if err != nil {
        panic(err)
    }
 
    // 将根证书加入证书池
    certPool := x509.NewCertPool()
    bs, err := ioutil.ReadFile("../certs/ca.pem")
    if err != nil {
        panic(err)
    }
 
    if !certPool.AppendCertsFromPEM(bs) {
        panic("cc")
    }
 
    // 新建凭证
    transportCreds := credentials.NewTLS(&tls.Config{
        ServerName:   "localhost",
        Certificates: []tls.Certificate{cert},
        RootCAs:      certPool,
    })
 
    dialOpt := grpc.WithTransportCredentials(transportCreds)
 
    conn, err := grpc.Dial("localhost:8080", dialOpt)
    if err != nil {
        log.Fatalf("Dial failed:%v", err)
    }
    defer conn.Close()
 
    client := api.NewHelloServiceClient(conn)
    resp1, err := client.SayHello(context.Background(), &api.HelloRequest{
        Greeting: "Hello Server 1 !!",
        Infos:    map[string]string{"hello": "world"},
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("SayHello Resp1:%+v", resp1)
 
    resp2, err := client.SayHello(context.Background(), &api.HelloRequest{
        Greeting: "Hello Server 2 !!",
    })
    if err != nil {
        log.Fatalf("%v", err)
    }
    log.Printf("SayHello Resp2:%+v", resp2)
 
    // 服务器端流式 RPC;
    recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"})
    if err != nil {
        log.Fatalf("ListHello err: %v", err)
    }
 
    for {
        //Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
        resp, err := recvListHello.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
 
        log.Printf("ListHello Server Resp: %v", resp.Reply)
    }
    //能够使用CloseSend()关闭stream,这样服务端就不会继续产生流消息
    //调用CloseSend()后,若继续调用Recv(),会从新激活stream,接着以前结果获取消息
 
    // 客户端流式 RPC;
    sayMoreClient, err := client.SayMoreHello(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for i := 0; i < 3; i++ {
        sayMoreClient.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)})
    }
    //关闭流并获取返回的消息
    sayMoreResp, err := sayMoreClient.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply)
 
    // 双向流式 RPC;
    sayHelloChat, err := client.SayHelloChat(context.Background())
    if err != nil {
        log.Fatal(err)
    }
 
    for i := 0; i < 3; i++ {
        err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)})
        if err != nil {
            log.Fatalf("stream request err: %v", err)
        }
        res, err := sayHelloChat.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("SayHelloChat get stream err: %v", err)
        }
        // 打印返回值
        log.Printf("SayHelloChat Server Say: %v", res.Greeting)
 
    }
 
}

8.运行结果以下:

D:\GoProject\src\gogrpcstream\server>go run main.go
2021/01/05 17:19:12 Server Start...
2021/01/05 17:20:00 Client Greeting:Hello Server 1 !!
2021/01/05 17:20:00 Client Info:map[hello:world]
2021/01/05 17:20:00 Client Greeting:Hello Server 2 !!
2021/01/05 17:20:00 Client Info:map[]
2021/01/05 17:20:00 Client Say: Hello Server List Hello
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
D:\GoProject\src\gogrpcstream\client>go run main.go
2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!"  details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}}
2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!"  details:{[type.googleapis.com/api.Error]:{msg:"Bad Request"  msg:"Wrong Info Msg"}}
2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1
2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2
2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3
2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3

证书 能够利用MySSL测试证书生成工具生成两张证书 也能够用openssl来实现。

下载地址 https://github.com/dz45693/gogrpcstrem.git

参考:

https://razeencheng.com/post/how-to-use-grpc-in-golang-03

相关文章
相关标签/搜索