在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持双向流调用,支持实时推送消息,这也是 gRPC的一大特色,且 gRPC 在对双向流的控制支持上也是很是强大的。html
gRPC 有四种服务类型,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有如下特色:java
服务类型 | 特色 |
---|---|
简单 RPC | 通常的rpc调用,传入一个请求对象,返回一个返回对象 |
服务端流式 RPC | 传入一个请求对象,服务端能够返回多个结果对象 |
客户端流式 RPC | 客户端传入多个请求对象,服务端返回一个结果对象 |
双向流式 RPC | 结合客户端流式RPC和服务端流式RPC,能够传入多个请求对象,返回多个结果对象 |
gRPC 通讯是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具备流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2链接内用于交换帧数据的独立双向序列,逻辑上可看作一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。git
特色以下:github
摘自 HTTP/2笔记之流和多路复用 by 聂永服务器
咱们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,若是咱们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC天然也就没有问题了。dom
这里咱们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。async
① 首先在 LuCat.proto
定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡ide
syntax = "proto3"; option csharp_namespace = "AspNetCoregRpcService"; import "google/protobuf/empty.proto"; package LuCat; //定义包名 //定义服务 service LuCat{ //定义给猫洗澡双向流rpc rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp); //定义统计猫数量简单rpc rpc Count(google.protobuf.Empty) returns (CountCatResult); } message SuckingCatResult{ string message=1; } message BathTheCatReq{ int32 id=1; } message BathTheCatResp{ string message=1; } message CountCatResult{ int32 Count=1; }
② 添加服务的实现google
这里安利下Resharper,很是方便spa
private readonly ILogger<LuCatService> _logger; private static readonly List<string> Cats=new List<string>(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"}; private static readonly Random Rand=new Random(DateTime.Now.Millisecond); public LuCatService(ILogger<LuCatService> logger) { _logger = logger; } public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context) { var bathQueue=new Queue<int>(); while (await requestStream.MoveNext()) { //将要洗澡的猫加入队列 bathQueue.Enqueue(requestStream.Current.Id); _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue."); } //遍历队列开始洗澡 while (bathQueue.TryDequeue(out var catId)) { await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" }); await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果 } } public override Task<CountCatResult> Count(Empty request, ServerCallContext context) { return Task.FromResult(new CountCatResult() { Count = Cats.Count }); }
BathTheCat 方法会接收多个客户端发来的CatId,而后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。
③ 客户端实现
随机发送10个猫Id给服务端,而后接收10个洗澡结果。
var channel = GrpcChannel.ForAddress("https://localhost:5001"); var catClient = new LuCat.LuCatClient(channel); //获取猫总数 var catCount = await catClient.CountAsync(new Empty()); Console.WriteLine($"一共{catCount.Count}只猫。"); var rand = new Random(DateTime.Now.Millisecond); var bathCat = catClient.BathTheCat(); //定义接收吸猫响应逻辑 var bathCatRespTask = Task.Run(async() => { await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Message); } }); //随机给10个猫洗澡 for (int i = 0; i < 10; i++) { await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)}); } //发送完毕 await bathCat.RequestStream.CompleteAsync(); Console.WriteLine("客户端已发送完10个须要洗澡的猫id"); Console.WriteLine("接收洗澡结果:"); //开始接收响应 await bathCatRespTask; Console.WriteLine("洗澡完毕");
④ 运行
能够看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。
这里你们能够自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,而后服务端返回每一个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,而后服务端一次性返回全部猫的洗澡结果(给全部猫洗澡看作是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就再也不演示了。
gRPC 的流式调用支持对流进行主动取消的控制,进而能够衍生出流超时限制等控制。
在流式调用是,能够传一个 CancellationToken 参数,它就是咱们用来对流进行取消控制的:
改造一下咱们在第四小节的代码:
① 客户端
var cts = new CancellationTokenSource(); //指定在2.5s后进行取消操做 cts.CancelAfter(TimeSpan.FromSeconds(2.5)); var bathCat = catClient.BathTheCat(cancellationToken: cts.Token); //定义接收吸猫响应逻辑 var bathCatRespTask = Task.Run(async() => { try { await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Message); } } catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) { Console.WriteLine("Stream cancelled."); } });
② 服务端
//遍历队列开始洗澡 while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId)) { _logger.LogInformation($"Cat {catId} Dequeue."); await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" }); await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果 }
③ 运行
设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并无收到所有10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。
这里流式调用能够实现实时推送,服务端到客户端或者客户端到服务端短实时推送消息,可是这个和传统意义上的长链接主动推送、广播消息不同,无论你是服务端流式、客户端流式仍是双向流式,必需要由客户端进行发起,经过客户端请求来创建流通讯。
GRPC的四种服务类型 by twtydgo
HTTP/2笔记之流和多路复用 by 聂永