上篇文章我介绍了如何在网关上增长自定义客户端受权功能,从设计到编码实现,一步一步详细讲解,相信你们也掌握了自定义中间件的开发技巧了,本篇咱们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础功能。html
.netcore项目实战交流群(637326624),有兴趣的朋友能够在群里交流讨论。mysql
限流就是为了保证网关在高并发或瞬时并发时,在服务能承受范围内,牺牲部分请求为代价,保证系统的总体可用性而作的安全策略,避免单个服务影响总体网关的服务能力。
好比网关有商品查询接口 ,能接受的极限请求是每秒100次查询,若是此时不限流,可能由于瞬时请求太大,形成服务卡死或崩溃的状况,这种状况可使用Ocelot
客户端全局限流便可知足需求,如今又有一个需求,我须要把接口开放给A公司,他们也要查询这个商品接口,这时A公司请求频率也是咱们设置的每秒100次请求,显然咱们不但愿A公司有这么高的请求频率,我只会给A公司最大每秒一次的请求,那怎么实现呢?这时咱们就没法经过Ocelot
配置限流来进行自定义控制了,这块就须要咱们增长自定义限流管道来实现功能。redis
下面咱们就该功能如何实现展开讲解,但愿你们先理解下功能需求,而后在延伸到具体实现。sql
限流这块设计表结构和关系以下。
数据库
主要有限流规则表、路由限流规则表、限流组表、限流组策略表、客户端受权限流组表、客户端白名单表组成,设计思想就是客户端请求时先检查是否在白名单,若是白名单不存在,就检查是否在限流组里,若是在限流组里校验限流的规则是什么,而后比对这个规则和当前请求次数看是否可以继续访问,若是超过限流策略直接返回429状态,不然路由到下端请求。c#
梳理下后发现流程不是很复杂,最起码实现的思路很是清晰,而后咱们就运用上篇自定义受权中间件的方式来开发咱们第二个中间件,自定义限流中间件。后端
一、功能开启配置缓存
网关应该支持自定义客户端限流中间件是否启用,由于一些小型项目是不须要对每一个客户端进行单独限流的,中型和大型项目才有可能遇到自定义配置状况,因此咱们须要在配置文件增长配置选项。在AhphOcelotConfiguration.cs
配置类中增长属性,默认不开启。安全
/// <summary> /// 金焰的世界 /// 2018-11-18 /// 是否开启自定义限流,默认不开启 /// </summary> public bool ClientRateLimit { get; set; } = false; /// <summary> /// 金焰的世界 /// 2018-11-18 /// 客户端限流缓存时间,默认30分钟 /// </summary> public int ClientRateLimitCacheTime { get; set; } = 1800;
那咱们如何把自定义的限流增长到网关流程里呢?这块咱们就须要订制本身的限流中间件。并发
二、实现客户端限流中间件
首先咱们定义一个自定义限流中间件AhphClientRateLimitMiddleware
,须要继承OcelotMiddleware
,而后咱们要实现Invoke
方法,详细代码以下。
using Ctr.AhphOcelot.Configuration; using Ctr.AhphOcelot.Errors; using Ocelot.Logging; using Ocelot.Middleware; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.RateLimit.Middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 自定义客户端限流中间件 /// </summary> public class AhphClientRateLimitMiddleware : OcelotMiddleware { private readonly IClientRateLimitProcessor _clientRateLimitProcessor; private readonly OcelotRequestDelegate _next; private readonly AhphOcelotConfiguration _options; public AhphClientRateLimitMiddleware(OcelotRequestDelegate next, IOcelotLoggerFactory loggerFactory, IClientRateLimitProcessor clientRateLimitProcessor, AhphOcelotConfiguration options) : base(loggerFactory.CreateLogger<AhphClientRateLimitMiddleware>()) { _next = next; _clientRateLimitProcessor = clientRateLimitProcessor; _options = options; } public async Task Invoke(DownstreamContext context) { var clientId = "client_cjy"; //使用默认的客户端 if (!context.IsError) { if (!_options.ClientRateLimit) { Logger.LogInformation($"未启用客户端限流中间件"); await _next.Invoke(context); } else { //非认证的渠道 if (!context.DownstreamReRoute.IsAuthenticated) { if (context.HttpContext.Request.Headers.Keys.Contains(_options.ClientKey)) { clientId = context.HttpContext.Request.Headers[_options.ClientKey].First(); } } else {//认证过的渠道,从Claim中提取 var clientClaim = context.HttpContext.User.Claims.FirstOrDefault(p => p.Type == _options.ClientKey); if (!string.IsNullOrEmpty(clientClaim?.Value)) { clientId = clientClaim?.Value; } } //路由地址 var path = context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue; //一、校验路由是否有限流策略 //二、校验客户端是否被限流了 //三、校验客户端是否启动白名单 //四、校验是否触发限流及计数 if (await _clientRateLimitProcessor.CheckClientRateLimitResultAsync(clientId, path)) { await _next.Invoke(context); } else { var error = new RateLimitOptionsError($"请求路由 {context.HttpContext.Request.Path}触发限流策略"); Logger.LogWarning($"路由地址 {context.HttpContext.Request.Path} 触发限流策略. {error}"); SetPipelineError(context, error); } } } else { await _next.Invoke(context); } } } }
首先咱们来分析下咱们的代码,为了知道是哪一个客户端请求了咱们网关,须要提取clientId
,分别从无需受权接口和须要受权接口两个方式提取,若是提取不到值直接给定默认值,放到全局限流里,防止绕过限流策略。而后根据客户端经过4步检验下是否容许访问(后面会介绍这4步怎么实现),若是知足限流策略直接返回限流错误提醒。
有了这个中间件,那么如何添加到Ocelot的管道里呢?上一篇介绍的很是详细,这篇我就不介绍了,自定义限流中间件扩展AhphClientRateLimitMiddlewareExtensions
,代码以下。
using Ocelot.Middleware.Pipeline; using System; using System.Collections.Generic; using System.Text; namespace Ctr.AhphOcelot.RateLimit.Middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 限流中间件扩展 /// </summary> public static class AhphClientRateLimitMiddlewareExtensions { public static IOcelotPipelineBuilder UseAhphAuthenticationMiddleware(this IOcelotPipelineBuilder builder) { return builder.UseMiddleware<AhphClientRateLimitMiddleware>(); } } }
有了这个中间件扩展后,咱们就在管道的合适地方加入咱们自定义的中间件。咱们添加咱们自定义的管道扩展OcelotPipelineExtensions
,而后把自定义限流中间件加入到认证以后。
//添加自定义限流中间件 2018-11-18 金焰的世界 builder.UseAhphClientRateLimitMiddleware();
如今咱们完成了网关的扩展和应用,是时候把定义的IClientRateLimitProcessor
接口实现了 ,是否是感受作一个中间件很简单呢?并且每一步都是层层关联,只要一步一步按照本身的想法往下写就能实现。
三、结合数据库实现校验及缓存
首先咱们新建AhphClientRateLimitProcessor
类来实现接口,中间增长必要的缓存和业务逻辑,详细代码以下。
using Ctr.AhphOcelot.Configuration; using Ocelot.Cache; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.RateLimit { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 实现客户端限流处理器 /// </summary> public class AhphClientRateLimitProcessor : IClientRateLimitProcessor { private readonly AhphOcelotConfiguration _options; private readonly IOcelotCache<ClientRoleModel> _ocelotCache; private readonly IOcelotCache<RateLimitRuleModel> _rateLimitRuleCache; private readonly IOcelotCache<AhphClientRateLimitCounter?> _clientRateLimitCounter; private readonly IClientRateLimitRepository _clientRateLimitRepository; private static readonly object _processLocker = new object(); public AhphClientRateLimitProcessor(AhphOcelotConfiguration options,IClientRateLimitRepository clientRateLimitRepository, IOcelotCache<AhphClientRateLimitCounter?> clientRateLimitCounter, IOcelotCache<ClientRoleModel> ocelotCache, IOcelotCache<RateLimitRuleModel> rateLimitRuleCache) { _options = options; _clientRateLimitRepository = clientRateLimitRepository; _clientRateLimitCounter = clientRateLimitCounter; _ocelotCache = ocelotCache; _rateLimitRuleCache = rateLimitRuleCache; } /// <summary> /// 校验客户端限流结果 /// </summary> /// <param name="clientid">客户端ID</param> /// <param name="path">请求地址</param> /// <returns></returns> public async Task<bool> CheckClientRateLimitResultAsync(string clientid, string path) { var result = false; var clientRule = new List<AhphClientRateLimitOptions>(); //一、校验路由是否有限流策略 result = !await CheckReRouteRuleAsync(path); if (!result) {//二、校验客户端是否被限流了 var limitResult = await CheckClientRateLimitAsync(clientid, path); result = !limitResult.RateLimit; clientRule = limitResult.rateLimitOptions; } if (!result) {//三、校验客户端是否启动白名单 result = await CheckClientReRouteWhiteListAsync(clientid, path); } if (!result) {//四、校验是否触发限流及计数 result = CheckRateLimitResult(clientRule); } return result; } /// <summary> /// 检验是否启用限流规则 /// </summary> /// <param name="path">请求地址</param> /// <returns></returns> private async Task<bool> CheckReRouteRuleAsync(string path) { var region = _options.RedisKeyPrefix + "CheckReRouteRuleAsync"; var key = region + path; var cacheResult = _ocelotCache.Get(key, region); if (cacheResult != null) {//提取缓存数据 return cacheResult.Role; } else {//从新获取限流策略 var result = await _clientRateLimitRepository.CheckReRouteRuleAsync(path); _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校验客户端限流规则 /// </summary> /// <param name="clientid">客户端ID</param> /// <param name="path">请求地址</param> /// <returns></returns> private async Task<(bool RateLimit, List<AhphClientRateLimitOptions> rateLimitOptions)> CheckClientRateLimitAsync(string clientid, string path) { var region = _options.RedisKeyPrefix + "CheckClientRateLimitAsync"; var key = region + clientid + path; var cacheResult = _rateLimitRuleCache.Get(key, region); if (cacheResult != null) {//提取缓存数据 return (cacheResult.RateLimit, cacheResult.rateLimitOptions); } else {//从新获取限流策略 var result = await _clientRateLimitRepository.CheckClientRateLimitAsync(clientid, path); _rateLimitRuleCache.Add(key, new RateLimitRuleModel() { RateLimit=result.RateLimit, rateLimitOptions=result.rateLimitOptions }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校验是否设置了路由白名单 /// </summary> /// <param name="clientid">客户端ID</param> /// <param name="path">请求地址</param> /// <returns></returns> private async Task<bool> CheckClientReRouteWhiteListAsync(string clientid, string path) { var region = _options.RedisKeyPrefix + "CheckClientReRouteWhiteListAsync"; var key = region +clientid+ path; var cacheResult = _ocelotCache.Get(key, region); if (cacheResult != null) {//提取缓存数据 return cacheResult.Role; } else {//从新获取限流策略 var result = await _clientRateLimitRepository.CheckClientReRouteWhiteListAsync(clientid,path); _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校验完整的限流规则 /// </summary> /// <param name="rateLimitOptions">限流配置</param> /// <returns></returns> private bool CheckRateLimitResult(List<AhphClientRateLimitOptions> rateLimitOptions) { bool result = true; if (rateLimitOptions != null && rateLimitOptions.Count > 0) {//校验策略 foreach (var op in rateLimitOptions) { AhphClientRateLimitCounter counter = new AhphClientRateLimitCounter(DateTime.UtcNow, 1); //分别对每一个策略校验 var enablePrefix = _options.RedisKeyPrefix + "RateLimitRule"; var key = AhphOcelotHelper.ComputeCounterKey(enablePrefix, op.ClientId, op.Period, op.RateLimitPath); var periodTimestamp = AhphOcelotHelper.ConvertToSecond(op.Period); lock (_processLocker) { var rateLimitCounter = _clientRateLimitCounter.Get(key, enablePrefix); if (rateLimitCounter.HasValue) {//提取当前的计数状况 // 请求次数增加 var totalRequests = rateLimitCounter.Value.TotalRequests + 1; // 深拷贝 counter = new AhphClientRateLimitCounter(rateLimitCounter.Value.Timestamp, totalRequests); } else {//写入限流策略 _clientRateLimitCounter.Add(key, counter,TimeSpan.FromSeconds(periodTimestamp), enablePrefix); } } if (counter.TotalRequests > op.Limit) {//更新请求记录,并标记为失败 result = false; } if (counter.TotalRequests > 1 && counter.TotalRequests <= op.Limit) {//更新缓存配置信息 //获取限流剩余时间 var cur = (int)(counter.Timestamp.AddSeconds(periodTimestamp) - DateTime.UtcNow).TotalSeconds; _clientRateLimitCounter.Add(key, counter, TimeSpan.FromSeconds(cur), enablePrefix); } } } return result; } } }
咱们来分析下这块代码,里面涉及了限流的提取和实现规则,首先咱们注入了数据库实体接口和缓存信息,实现步骤是参照以前的流程。
主要流程以下:
一、路由是否启用限流,若是未启用直接完成校验,若是进行第2步判断.
二、客户端对应的路由是否设置了限流规则,若是未设置,直接完成校验,不然进入第3步判断.
三、客户端是否开启了路由白名单功能,若是开启了直接完成校验,不然进入第4步。
四、使用Redis来进行限流的判断。使用的就是计数器方法,结合redis设置key的过时时间来实现的。
为了减小后端请求,在数据库提取的方法前都加入了缓存,如今咱们须要把用到的接口添加到入口进行注入。
builder.Services.AddSingleton<IOcelotCache<RateLimitRuleModel>, InRedisCache<RateLimitRuleModel>>(); builder.Services.AddSingleton<IOcelotCache<AhphClientRateLimitCounter?>, InRedisCache<AhphClientRateLimitCounter?>>();
如今咱们还剩下IClientRateLimitRepository
接口未实现,如今只要实现这个接口,而后注入下,咱们就完成了限流中间件的开发了,咱们根据限流的流程,梳理了实现,如今有3个方法须要进行实现。
新建SqlServerClientRateLimitRepository
类,来开始实现咱们与数据库的操做,有了上面的分析思路,如今就是把一个一个详细肯定的方法实现而已,太简单了,只要花了几分钟后,就能够瞬间写出以下代码。
using Ctr.AhphOcelot.Configuration; using Ctr.AhphOcelot.RateLimit; using Dapper; using System; using System.Collections.Generic; using System.Data.SqlClient; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.DataBase.SqlServer { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 客户端限流信息提取 /// </summary> public class SqlServerClientRateLimitRepository : IClientRateLimitRepository { private readonly AhphOcelotConfiguration _option; public SqlServerClientRateLimitRepository(AhphOcelotConfiguration option) { _option = option; } /// <summary> /// 校验客户端限流规则 /// </summary> /// <param name="clientid">客户端ID</param> /// <param name="path">请求地址</param> /// <returns></returns> public async Task<(bool RateLimit, List<AhphClientRateLimitOptions> rateLimitOptions)> CheckClientRateLimitAsync(string clientid, string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT DISTINCT UpstreamPathTemplate AS RateLimitPath,LimitPeriod AS Period,LimitNum AS Limit,ClientId FROM AhphReRoute T1 INNER JOIN AhphReRouteLimitRule T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphLimitRule T3 ON T2.RuleId=T3.RuleId INNER JOIN AhphLimitGroupRule T4 ON T2.ReRouteLimitId=T4.ReRouteLimitId INNER JOIN AhphLimitGroup T5 ON T4.LimitGroupId=T5.LimitGroupId INNER JOIN AhphClientLimitGroup T6 ON T5.LimitGroupId=T6.LimitGroupId INNER JOIN AhphClients T7 ON T6.Id=T7.Id WHERE T1.InfoStatus=1 AND T1.UpstreamPathTemplate=@path AND T3.InfoStatus=1 AND T5.InfoStatus=1 AND ClientId=@clientid AND Enabled=1"; var result = (await connection.QueryAsync<AhphClientRateLimitOptions>(sql, new { clientid, path }))?.AsList(); if (result != null && result.Count > 0) { return (true, result); } else { return (false, null); } } } /// <summary> /// 校验是否设置了路由白名单 /// </summary> /// <param name="clientid">客户端ID</param> /// <param name="path">请求地址</param> /// <returns></returns> public async Task<bool> CheckClientReRouteWhiteListAsync(string clientid, string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT COUNT(1) FROM AhphReRoute T1 INNER JOIN AhphClientReRouteWhiteList T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphClients T3 ON T2.Id=T3.Id WHERE T1.InfoStatus=1 AND UpstreamPathTemplate=@path AND ClientId=@clientid AND Enabled=1"; var result = await connection.QueryFirstOrDefaultAsync<int>(sql, new { clientid,path }); return result > 0; } } /// <summary> /// 校验是否启用限流规则 /// </summary> /// <param name="path">请求地址</param> /// <returns></returns> public async Task<bool> CheckReRouteRuleAsync(string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT COUNT(1) FROM AhphReRoute T1 INNER JOIN AhphReRouteLimitRule T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphLimitRule T3 ON T2.RuleId=T3.RuleId WHERE T1.InfoStatus=1 AND UpstreamPathTemplate=@path AND T3.InfoStatus=1"; var result = await connection.QueryFirstOrDefaultAsync<int>(sql, new { path }); return result > 0; } } } }
主要就是注意下表之间的关系,把实现注入到AddAhphOcelot
里,如今就能够测试开始自定义客户端限流中间件。
builder.Services.AddSingleton<IClientRateLimitRepository, SqlServerClientRateLimitRepository>();
四、测试限流中间件
为了把把全部状况都测试一遍,先从开启限流,什么都不写入看是否可以正常运行。
option.ClientRateLimit = true;
还记得咱们上篇的两个客户端和能访问的页面吗?就用它们来测试,结果显示正常,说明不开启限流没有影响。
开启/cjy/values
2个限流规则,一个每1分钟访问1次,一个每1分钟访问60次。
--一、插入限流规则 INSERT INTO AhphLimitRule VALUES('每1分钟访问1次','1m',1,1); INSERT INTO AhphLimitRule VALUES('每1分钟访问60次','1m',60,1); --二、应用到/cjy/values路由 INSERT INTO AhphReRouteLimitRule VALUES(1,1); INSERT INTO AhphReRouteLimitRule VALUES(2,1);
由于还未给客户端应用规则,因此应该也是能够正常访问,可使用PostMan
测试下,测试时须要注意下缓存,由于全部的访问都启用的默认缓存策略,经测试获得预期效果。
如今开始把限流分别应用到客户端1和客户端2,看下限流效果。
--三、插入测试分组 INSERT INTO AhphLimitGroup VALUES('限流分组1','',1); INSERT INTO AhphLimitGroup VALUES('限流分组2','',1); --四、分组应用策略 INSERT INTO AhphLimitGroupRule VALUES(1,1); INSERT INTO AhphLimitGroupRule VALUES(2,2); --五、客户端应用限流分组 INSERT INTO AhphClientLimitGroup VALUES(2,1); INSERT INTO AhphClientLimitGroup VALUES(3,2);
而后使用PostMan
测试客户端1和客户端2,结果以下,超过设置的频率后不返回结果,达到预期目的,可是返回的是404错误,强迫症患者表示这不优雅啊,应该是429 Too Many Requests
,那咱们如何修改呢?
这里就须要了解下错误信息是如何输出的,须要查看Ocelot
源码,您会发现IErrorsToHttpStatusCodeMapper
接口和ErrorsToHttpStatusCodeMapper
实现,代码以下,
using System.Collections.Generic; using System.Linq; using Ocelot.Errors; namespace Ocelot.Responder { public class ErrorsToHttpStatusCodeMapper : IErrorsToHttpStatusCodeMapper { public int Map(List<Error> errors) { if (errors.Any(e => e.Code == OcelotErrorCode.UnauthenticatedError)) { return 401; } if (errors.Any(e => e.Code == OcelotErrorCode.UnauthorizedError || e.Code == OcelotErrorCode.ClaimValueNotAuthorisedError || e.Code == OcelotErrorCode.ScopeNotAuthorisedError || e.Code == OcelotErrorCode.UserDoesNotHaveClaimError || e.Code == OcelotErrorCode.CannotFindClaimError)) { return 403; } if (errors.Any(e => e.Code == OcelotErrorCode.RequestTimedOutError)) { return 503; } if (errors.Any(e => e.Code == OcelotErrorCode.UnableToFindDownstreamRouteError)) { return 404; } if (errors.Any(e => e.Code == OcelotErrorCode.UnableToCompleteRequestError)) { return 500; } return 404; } } }
能够发现由于未定义RateLimitOptionsError
错误的状态码,增长一个判断便可,那咱们重写下把,而后集成在咱们本身的中间件里,这块在后期有不少扩展可以用到,增长以下代码。
if (errors.Any(e => e.Code == OcelotErrorCode.RateLimitOptionsError)) { return 429; }
而后从新注入下。
builder.Services.AddSingleton<IErrorsToHttpStatusCodeMapper, AhphErrorsToHttpStatusCodeMapper>();
在从新测试下访问限流地址。
奈斯,达到了咱们预期的效果,.netcore
开发魅力体现出来了吗?
咱们增长客户端1的路由白名单,而后再继续测试看是否解除限流限制?
--六、设置客户端1/cjy/values路由白名单 INSERT INTO AhphClientReRouteWhiteList VALUES(1,2);
注意测试时清除缓存
经测试不受限流控制,达到了咱们最终目的,到此限流功能所有实现。
五、增长mysql支持
直接重写IClientRateLimitRepository
实现,而后注入实现。
builder.Services.AddSingleton<IClientRateLimitRepository, MySqlClientRateLimitRepository>();
本篇咱们讲解的是网关如何实现自定义客户端限流功能,从设计到实现一步一步详细讲解,虽然只用一篇就写完了,可是涉及的知识点仍是很是多的,但愿你们认真理解实现的思想,看我是如何从规划到实现的,为了更好的帮助你们理解。你们能够根据博客内容本身手动实现下,有利于消化,若是在操做中遇到什么问题,能够加.NET Core项目实战交流群(QQ群号:637326624)
咨询做者。
从下一篇开始介绍IdentityServer4
的相关应用,并配合咱们的网关实现认证,在跟我教程学习的朋友,能够本身先预习下。