【.NET Core项目实战-统一认证平台】开篇及目录索引html
RPC是“远程调用(Remote Procedure Call)”的一个名称的缩写,并非任何规范化的协议,也不是大众都认知的协议标准,咱们更多时候使用时都是建立的自定义化(例如Socket,Netty)的消息方式进行调用,相比http协议,咱们省掉了很多http中无用的消息内容。所以不少系统内部调用仍然采用自定义化的RPC调用模式进行通讯,毕竟速度和性能是内网的关键指标之一,而标准化和语义无关性在外网中举足轻重。因此,为什么API网关没法工做在RPC上,由于它没有一个像HTTP/HTTPS那样的通用标准。nginx
CzarRpc是做者基于Dotnetty实现的RPC通信框架,参考了Surging
和Tars.Net
优秀设计,目前正在内部使用中,下面就CzarRpc调用方式作一个简单介绍,测试结构以下:
git
一、服务接口github
新建一个Czar.Rpc.Common
类库,首先须要引用Czar.Rpc
Nuget包。web
Install-Package Czar.Rpc
而后定义测试接口IHelloRpc.cs
,也是目前支持的调用方式。sql
using Czar.Rpc.Attributes; using Czar.Rpc.Exceptions; using Czar.Rpc.Metadata; using System; using System.Collections.Generic; using System.Threading.Tasks; namespace Czar.Rpc.Common { /// <summary> /// 测试Rpc实体 /// </summary> [BusinessExceptionInterceptor] [CzarRpc("Demo.Rpc.Hello")] public interface IHelloRpc: IRpcBaseService { string Hello(int no, string name); void HelloHolder(int no, out string name); Task<string> HelloTask(int no, string name); ValueTask<string> HelloValueTask(int no, string name); [CzarOneway] void HelloOneway(int no, string name); Task TestBusinessExceptionInterceptor(); DemoModel HelloModel(int D1, string D2, DateTime D3); Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3); DemoModel HelloSendModel(DemoModel model); DemoModel HelloSendModelParm(string name,DemoModel model); List<DemoModel> HelloSendModelList(List<DemoModel> model); } public class DemoModel { /// <summary> /// 测试1 /// </summary> public int T1 { get; set; } /// <summary> /// 测试2 /// </summary> public string T2 { get; set; } /// <summary> /// 测试3 /// </summary> public DateTime T3 { get; set; } public ChildModel Child { get; set; } } public class ChildModel { public string C1 { get; set; } } }
2.服务端json
新建一个控制台程序Czar.Rpc.Server
,而后实现服务接口,由于都是测试数据,因此就随意实现了方法。c#
HelloRpcServer.cs
后端
using Czar.Rpc.Exceptions; using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Linq; using System.Net; using Czar.Rpc.Common; namespace Demo.Rpc.Server { public class HelloRpcServer: IHelloRpc { public EndPoint CzarEndPoint { get; set; } public string Hello(int no, string name) { string result = $"{no}: Hi, {name}"; Console.WriteLine(result); return result + " callback"; } public void HelloHolder(int no, out string name) { name = no.ToString() + " out"; } public void HelloOneway(int no, string name) { /* 耗时操做 */ Console.WriteLine($"From oneway - {no}: Hi, {name}"); } public Task<string> HelloTask(int no, string name) { return Task.FromResult(Hello(no, name)); } public ValueTask<string> HelloValueTask(int no, string name) { return new ValueTask<string>(Hello(no, name)); } public Task TestBusinessExceptionInterceptor() { throw new BusinessException() { CzarCode = "1", CzarMessage = "test" }; } public DemoModel HelloModel(int D1, string D2, DateTime D3) { return new DemoModel() { T1 = D1 + 1, T2 = D2 + "2", T3 = D3.AddDays(1) }; } public async Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3) { return await Task.FromResult( new DemoModel() { T1 = D1 + 1, T2 = D2 + "77777", T3 = D3.AddDays(1) } ); } public DemoModel HelloSendModel(DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); return model; } public DemoModel HelloSendModelParm(string name, DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); if (model.Child != null) { model.Child.C1 = name+"说:"+ model.Child.C1; } return model; } public List<DemoModel> HelloSendModelList(List<DemoModel> model) { return model.Select(t => new DemoModel() { T1=t.T1+10,T2=t.T2+"13",T3=t.T3.AddYears(1),Child=t.Child }).ToList(); } } }
而后启动服务端监听。缓存
class Program { static void Main(string[] args) { var host = new HostBuilder() .ConfigureHostConfiguration(i => i.AddJsonFile("CzarConfig.json")) .ConfigureLogging((hostContext, configLogging) => { configLogging.AddConsole(); }) .UseCodec<JsonCodec>() .UseLibuvTcpHost() .UseProxy() .UseConsoleLifetime() .Build(); host.RunAsync().Wait(); } }
启用外部使用CzarConfig.json的配置文件,注意须要设置成始终复制。
{ "CzarHost": { "Port": 7711, //监听端口 "QuietPeriodSeconds": 2, //退出静默时间 DotNetty特性 "ShutdownTimeoutSeconds": 2, //关闭超时时间 DotNetty特性 "IsSsl": "false", //是否启用 SSL, 客户端须要保持一致 "PfxPath": "cert/datasync.pfx", //证书 "PfxPassword": "123456" //证书密钥 } }
到此服务器端搭载完成。
三、客户端
新建客户端控制台程序Czar.Rpc.Client
,而后配置Rpc调用信息。
{ "CzarHost": { "ProxyEndPoint": true, //是否启用动态服务地址,就是指定服务端IP "IsSsl": "false", //是否启用SSL "PfxPath": "cert/datasync.pfx", //证书 "PfxPassword": "123456", //证书密钥 "ClientConfig": { //客户端配置 "Demo.Rpc.Hello": { //对应服务[CzarRpc("Demo.Rpc.Hello")] 值 "Host": "127.0.0.1", //服务端IP 若是ProxyEndPoint=false 时使用 "Port": 7711, //服务端端口 若是ProxyEndPoint=false 时使用 "Timeout": 10, //调用超时时间 "WriterIdleTimeSeconds";30 //空闲超时时间,默认为30秒,非内网环境建议设置成5分钟内。 } } } }
如今开始启用客户端信息。
class Program { public static IServiceProvider service; public static IConfiguration config; static async Task Main(string[] args) { try { var builder = new ConfigurationBuilder(); config = builder.AddJsonFile("CzarConfig.json").Build(); service = new ServiceCollection() .AddSingleton(config) .AddLogging(j => j.AddConsole()) .AddLibuvTcpClient(config) .AddProxy() .BuildDynamicProxyServiceProvider(); var rpc = service.GetRequiredService<IHelloRpc>(); //使用的内部指定的服务器地址 rpc.CzarEndPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 7711); var result = string.Empty; string t = "基本调用"; result = rpc.Hello(18, t); Console.WriteLine(result); result = "无返回结果"; rpc.HelloHolder(1, out result); Console.WriteLine(result); result = await rpc.HelloTask(2, "异步任务"); Console.WriteLine(result); result = "单向"; rpc.HelloOneway(3, "单向调用"); Console.WriteLine(result); result = await rpc.HelloValueTask(4, "ValueTask任务"); Console.WriteLine(result); var modelResult = rpc.HelloModel(5, "返回实体", DateTime.Now); Console.WriteLine($"{modelResult.T1} {modelResult.T2} {modelResult.T3.ToLongDateString()}"); var modelResult1 = await rpc.HelloModelAsync(6, "返回Task实体", DateTime.Now); Console.WriteLine($"{modelResult1.T1} {modelResult1.T2} {modelResult1.T3.ToLongDateString()}"); var mm = new DemoModel() { T1 = 7, T2 = "传实体返回实体", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子类1" } }; var model2 = rpc.HelloSendModel(mm); Console.WriteLine($"{model2.T1} {model2.T2} {model2.T3.ToLongDateString()} {model2.Child.C1}"); var list = new List<DemoModel>(); var mm1 = new DemoModel() { T1 = 8, T2 = "传List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子类2" } }; var mm3 = new DemoModel() { T1 = 9, T2 = "传List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子类3" } }; list.Add(mm1); list.Add(mm3); var list3 = rpc.HelloSendModelList(list); Console.WriteLine($"{list3[0].T1} {list3[0].T2} {list3[0].T3.ToLongDateString()} {list3[0].Child?.C1}"); var mm4 = new DemoModel() { T1 = 9, T2 = "HelloSendModelParm", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子类4" } }; var dd = rpc.HelloSendModelParm("HelloSendModelParm", mm4); Console.WriteLine($"{dd.T1} {dd.T2} {dd.T3.ToLongDateString()} {dd.Child.C1}"); //异常调用 await rpc.TestBusinessExceptionInterceptor(); } catch (BusinessException e) { Console.WriteLine($"CzarCode:{e.CzarCode} CzarMessage:{e.CzarMessage}"); } catch (Exception ex) { Console.WriteLine(ex); } Console.ReadLine(); } }
如今整个RPC调用搭建完毕,而后分别启动服务器端和客户端,就能够看到屏幕输出内容以下。
客户端输出:
服务器端输出:
至此整个CzarRpc的基本使用已经介绍完毕,感兴趣的朋友能够自行测试。
有了CzarRpc
的通信框架后,如今在Ocelot
上实现Rpc
功能简直易如反掌,如今开始添加咱们的Rpc
中间件,也让咱们扩展的网关灵活起来。
还记得我介绍网关篇时添加中间件的步骤吗?若是不记得的能够先回去回顾下。
首先如何让网关知道这个后端调用是http
仍是Rpc
呢?这时应该会想到Ocelot
路由配置里的DownstreamScheme
,能够在这里判断咱们定义的是http
仍是rpc
便可。同时咱们但愿以前定义的全部中间件都生效,最后一步请求时若是配置下端路由rpc
,使用rpc
调用,不然使用http
调用,这样能够重复利用以前全部的中间件功能,减小重复开发。
在以前的开发的自定义限流和自定义受权中间件开发中,咱们知道开发完的中间件放到哪里使用,这里就不介绍原理了,直接添加到BuildCzarOcelotPipeline
里以下代码。
public static OcelotRequestDelegate BuildCzarOcelotPipeline(this IOcelotPipelineBuilder builder, OcelotPipelineConfiguration pipelineConfiguration) { // 注册一个全局异常 builder.UseExceptionHandlerMiddleware(); // 若是请求是websocket使用单独的管道 builder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest, app => { app.UseDownstreamRouteFinderMiddleware(); app.UseDownstreamRequestInitialiser(); app.UseLoadBalancingMiddleware(); app.UseDownstreamUrlCreatorMiddleware(); app.UseWebSocketsProxyMiddleware(); }); // 添加自定义的错误管道 builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware); //使用自定义的输出管道 builder.UseCzarResponderMiddleware(); // 下游路由匹配管道 builder.UseDownstreamRouteFinderMiddleware(); //增长自定义扩展管道 if (pipelineConfiguration.MapWhenOcelotPipeline != null) { foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline) { builder.MapWhen(pipeline); } } // 使用Http头部转换管道 builder.UseHttpHeadersTransformationMiddleware(); // 初始化下游请求管道 builder.UseDownstreamRequestInitialiser(); // 使用自定义限流管道 builder.UseRateLimiting(); //使用请求ID生成管道 builder.UseRequestIdMiddleware(); //使用自定义受权前管道 builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware); //根据请求判断是否启用受权来使用管道 if (pipelineConfiguration.AuthenticationMiddleware == null) { builder.UseAuthenticationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthenticationMiddleware); } //添加自定义限流中间件 2018-11-18 金焰的世界 builder.UseCzarClientRateLimitMiddleware(); //添加自定义受权中间件 2018-11-15 金焰的世界 builder.UseAhphAuthenticationMiddleware(); //启用自定义的认证以前中间件 builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware); //是否使用自定义的认证中间件 if (pipelineConfiguration.AuthorisationMiddleware == null) { builder.UseAuthorisationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthorisationMiddleware); } // 使用自定义的参数构建中间件 builder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware); // 使用负载均衡中间件 builder.UseLoadBalancingMiddleware(); // 使用下游地址建立中间件 builder.UseDownstreamUrlCreatorMiddleware(); // 使用缓存中间件 builder.UseOutputCacheMiddleware(); //判断下游的是否启用rpc通讯,切换到RPC处理 builder.MapWhen(context => context.DownstreamReRoute.DownstreamScheme.Equals("rpc", StringComparison.OrdinalIgnoreCase), app => { app.UseCzarRpcMiddleware(); }); //使用下游请求中间件 builder.UseCzaHttpRequesterMiddleware(); return builder.Build(); }
这里是在最后请求前判断使用的下游请求方式,若是DownstreamScheme
使用的rpc
,就使用rpc
中间件处理。
Rpc处理的完整逻辑是,如何从http请求中获取想要解析的参数,这里须要设置匹配的优先级,目前设计的优先级为。
一、首先提取路由参数,若是匹配上就是用路由参数名称为key,值为value,按顺序组成第一批参数。
二、提取query参数,若有有值按顺序组成第二批参数。
三、若是非Get请求,提取body内容,若是非空,组成第三批参数
四、从配置库里提取rpc路由调用的服务名称和函数名称,以及是否单向调用。
五、按照获取的数据进行rpc调用并等待返回。
看了上面的设计是否是思路很清晰了呢?
一、rpc路由表设计
CREATE TABLE AhphReRouteRpcConfig ( RpcId int IDENTITY(1,1) NOT NULL, ReRouteId int, //路由表主键 ServantName varchar(100) NOT NULL, //调用的服务名称 FuncName varchar(100) NOT NULL, //调用的方法名称 IsOneway bit NOT NULL //是否单向调用 )
二、提取远程调用方法
根据上游路由获取远程调用的配置项目
public interface IRpcRepository { /// <summary> /// 根据模板地址获取RPC请求方法 /// </summary> /// <param name="UpUrl">上游模板</param> /// <returns></returns> Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl); } public class SqlServerRpcRepository : IRpcRepository { private readonly CzarOcelotConfiguration _option; public SqlServerRpcRepository(CzarOcelotConfiguration option) { _option = option; } /// <summary> /// 获取RPC调用方法 /// </summary> /// <param name="UpUrl"></param> /// <returns></returns> public async Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"select T4.* from AhphGlobalConfiguration t1 inner join AhphConfigReRoutes T2 on T1.AhphId=t2.AhphId inner join AhphReRoute T3 on T2.ReRouteId=T3.ReRouteId INNER JOIN AhphReRouteRpcConfig T4 ON T3.ReRouteId=T4.ReRouteId where IsDefault=1 and T1.InfoStatus=1 AND T3.InfoStatus=1 AND UpstreamPathTemplate=@URL"; var result = await connection.QueryFirstOrDefaultAsync<RemoteInvokeMessage>(sql, new { URL = UpUrl }); return result; } } }
三、重写返回结果
因为rpc调用后是返回的Json封装的信息,须要解析成对应的HttpContent。
using System.IO; using System.Net; using System.Net.Http; using System.Threading.Tasks; namespace Czar.Gateway.Rpc { public class RpcHttpContent : HttpContent { private string result; public RpcHttpContent(string result) { this.result = result; } public RpcHttpContent(object result) { this.result = Newtonsoft.Json.JsonConvert.SerializeObject(result); } protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) { var writer = new StreamWriter(stream); await writer.WriteAsync(result); await writer.FlushAsync(); } protected override bool TryComputeLength(out long length) { length = result.Length; return true; } } }
四、rpc中间件逻辑处理
有了前面的准备信息,如今基本能够完成逻辑代码的开发了,详细的中间件代码以下。
using Czar.Gateway.Errors; using Czar.Rpc.Clients; using Ocelot.Logging; using Ocelot.Middleware; using Ocelot.Responses; using System.Collections.Generic; using System.Net; using System.Threading.Tasks; namespace Czar.Gateway.Rpc.Middleware { public class CzarRpcMiddleware : OcelotMiddleware { private readonly OcelotRequestDelegate _next; private readonly IRpcClientFactory _clientFactory; private readonly ICzarRpcProcessor _czarRpcProcessor; public CzarRpcMiddleware(OcelotRequestDelegate next, IRpcClientFactory clientFactory, IOcelotLoggerFactory loggerFactory, ICzarRpcProcessor czarRpcProcessor) : base(loggerFactory.CreateLogger<CzarRpcMiddleware>()) { _next = next; _clientFactory = clientFactory; _czarRpcProcessor = czarRpcProcessor; } public async Task Invoke(DownstreamContext context) { var httpStatusCode = HttpStatusCode.OK; var _param = new List<object>(); //一、提取路由参数 var tmpInfo = context.TemplatePlaceholderNameAndValues; if (tmpInfo != null && tmpInfo.Count > 0) { foreach (var tmp in tmpInfo) { _param.Add(tmp.Value); } } //二、提取query参数 foreach (var _q in context.HttpContext.Request.Query) { _param.Add(_q.Value.ToString()); } //三、从body里提取内容 if (context.HttpContext.Request.Method.ToUpper() != "GET") { context.DownstreamRequest.Scheme = "http"; var requert = context.DownstreamRequest.ToHttpRequestMessage(); if (requert.Content!=null) { var json = "{}"; json = await requert.Content.ReadAsStringAsync(); _param.Add(json); } } //从缓存里提取 var req = await _czarRpcProcessor.GetRemoteMethodAsync(context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue); if (req != null) { req.Parameters = _param.ToArray(); var result = await _clientFactory.SendAsync(req, GetEndPoint(context.DownstreamRequest.Host, context.DownstreamRequest.Port)); OkResponse<RpcHttpContent> httpResponse; if (result.CzarCode == Czar.Rpc.Utilitys.RpcStatusCode.Success) { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result.CzarResult?.ToString())); } else { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result)); } context.HttpContext.Response.ContentType = "application/json"; context.DownstreamResponse = new DownstreamResponse(httpResponse.Data, httpStatusCode, httpResponse.Data.Headers, "OK"); } else {//输出错误 var error = new InternalServerError($"请求路由 {context.HttpContext.Request.Path}未配置后端转发"); Logger.LogWarning($"{error}"); SetPipelineError(context, error); } } private EndPoint GetEndPoint(string ipaddress, int port) { if (IPAddress.TryParse(ipaddress, out IPAddress ip)) { return new IPEndPoint(ip, port); } else { return new DnsEndPoint(ipaddress, port); } } } }
五、启动Rpc客户端配置
目前Rpc的客户端配置咱们还没启动,只须要在AddCzarOcelot
中添加相关注入便可。
var service = builder.First(x => x.ServiceType == typeof(IConfiguration)); var configuration = (IConfiguration)service.ImplementationInstance; //Rpc应用 builder.AddSingleton<ICzarRpcProcessor, CzarRpcProcessor>(); builder.AddSingleton<IRpcRepository, SqlServerRpcRepository>(); builder.AddLibuvTcpClient(configuration);
六、配置客户端
最后别忘了配置Rpc客户端信息是否启用证书信息,为了配置信息的内容。
{ "CzarHost": { "ProxyEndPoint": true, "IsSsl": "false", "PfxPath": "cert/datasync.pfx", "PfxPassword": "bl123456", "ClientConfig": { "Demo.Rpc.Hello": { "Host": "127.0.0.1", "Port": 7711, "Timeout": 20 } } } }
如今让网关集成Rpc功能所有配置完毕。
本次测试我在原有的网关基础上,增长不一样类型的Rpc调用,就按照不一样维度测试Rpc调用功能,本次测试案例是创建在Czar.Rpc 服务端基础上,正好能够测试。
一、测试路由参数
请求路径/hello/{no}/{name}
,调用的服务端方法Hello
,传入的两个参数分别是no ,name
。
能够在服务器端添加断点调试,发现确实接收到请求信息,并正常返回,下面是PostMan
测试结果。
二、使用Query方式传递参数
请求路径/rpc/query
,调用的服务端方法仍是Hello
,参数分别是no ,name
。
三、使用Post方式传递Json
请求路径/rpc/body
,调用的服务器方法是HelloSendModel
。
四、混合参数使用
请求的路径/rpc/bodyparm/{name}
,调用的服务器端方法是HelloSendModelParm
。
全部的返回结果可自行调试测试,发现都能达到预期结果。
同时此网关仍是支持默认的http请求的,这里就不一一测试了。
本篇我介绍了什么是Rpc,以及Czar.Rpc的基本使用,而后使用Czar.Rpc框架集成到咱们基于Ocelot扩展网关中,并实现了不能方式的Rpc调用,能够在几乎不改变现有流程的状况下很快速的集成进去,这也是Ocelot开发框架的魅力所在。
若是在使用过程当中有什么问题或建议,能够在.NET Core项目实战交流群(637326624)
中联系做者。
最后本文涉及的全部的源代码可在https://github.com/jinyancao/czar.gateway中下载预览。