如今,由于种种因素,你必须对一个请求或者方法进行频率上的访问限制。
好比, 你对外提供了一个API接口,注册用户每秒钟最多能够调用100次,非注册用户每秒钟最多能够调用10次。
好比, 有一个很是吃服务器资源的方法,在同一时刻不能超过10我的调用这个方法,不然服务器满载。
好比, 有一些特殊的页面,访客并不能频繁的访问或发言。
好比, 秒杀活动等进行。
好比 ,防范DDOS,当达到必定频率后调用脚本iis服务器ip黑名单,防火墙黑名单。
如上种种的举例,也就是说,如何从一个切面的角度对调用的方法进行频率上的限制。而对频率限制,服务器层面都有最直接的解决方法,如今我说的则是代码层面上的频率管控。html
本文给出两个示例,一个是基于单机环境的实现,第二个则是基于分布式的Redis实现。web
--------------------redis
以第一个API接口需求为例,先说下单机环境下的实现。
按照惯性思惟,咱们天然会想到缓存的过时策略这种方法,可是严格来说就HttpRuntime.Cache而言,经过缓存的过时策略来对请求进行频率的并发控制是不合适的。
HttpRuntime.Cache 是应用程序级别的Asp.Net的缓存技术,经过这个技术能够申明多个缓存对象,能够为每一个对象设置过时时间,当过时时间到达后该缓存对象就会消失(也就是当你访问该对象的时候为Null)算法
为何这样说呢?好比对某个方法(方法名:GetUserList)咱们要进行1秒钟最多10次的限制,如今咱们就新建一个int型的Cache对象,而后设置1秒钟后过时消失。那么每当访问GetUserList方法前,咱们就先判断这个Cache对象的值是否大于10,若是大于10就不执行GetUserList方法,若是小于10则容许执行。每当访问该对象的时候若是不存在或者过时就新建,这样周而复始,则该对象永远不可能超过10。sql
1 if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大于10请求失败 2 { 3 Console.WriteLine("禁止请求"); 4 } 5 else 6 { 7 HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //不然该缓存对象的值+1 8 Console.WriteLine("容许请求"); 9 }
这样的思想及实现相对来讲很是简单,可是基于这样的一个模型设定,那么就会出现这种状况:数据库
如上图,每一个点表明一次访问请求,我在0秒的时候 新建了一个名字为GetUserListNum的缓存对象。
在0~0.5秒期间 我访问了3次,在0.5~1秒期间,咱们访问了7次。此时,该对象消失,而后咱们接着访问,该对象重置为0.
在第1~1.5秒期间,仍是访问了7次,在第1.5秒~2秒期间访问了3次。缓存
基于这种简单缓存过时策略的模型,在这2秒钟内,咱们虽然平均每秒钟都访问了10次,知足这个规定,可是若是咱们从中取一个期间段,0.5秒~1.5秒期间,也是1秒钟,可是却实实在在的访问了14次!远远超过了咱们设置的 1秒钟最多访问10次的 限制。安全
那么如何科学的来解决上面的问题呢?咱们能够经过模拟会话级别的信号量这一手段,这也就是咱们今天的主题了。
什么是信号量?仅就以代码而言, static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5); 它的意思就表明在多线程状况下,在任何一时刻,只能同时5个线程去访问。服务器
如今,在实现代码的以前咱们先设计一个模型。网络
假设咱们有一个用户A的管道,这个管道里装着用户A的请求,好比用户A在一秒钟发出了10次请求,那么每个请求过来,管道里的元素都会多一个。可是咱们设定这个管道最多只能容纳10个元素,并且每一个元素的存活期为1秒,1秒后则该元素消失。那么这样设计的话,不管是速率仍是数量的突进,都会有管道长度的限制。这样一来,不管从哪个时间节点或者时间间隔出发,这个管道都能知足咱们的频率限制需求。
而这里的管道,就必须和会话Id来对应了。每当有新会话进来的时候就生成一个新管道。这个会话id根据本身场景所定,能够是sessionId,能够是ip,也能够是token。
那么既然这个管道是会话级别的,咱们确定得须要一个容器,来装这些管道。如今,咱们以IP来命名会话管道,并把全部的管道都装载在一个容器中,如图
而基于刚才的设定,咱们还须要对容器内的每条管道的元素进行处理,把过时的给剔除掉,为此,还须要单独为该容器开辟出一个线程来为每条管道进行元素的清理。而当管道的元素为0时,咱们就清掉该管道,以便节省容器空间。
固然,因为用户量多,一个容器内可能存在上万个管道,这个时候仅仅用一个容器来装载来清理,在效率上显然是不够的。这个时候,咱们就得对容器进行横向扩展了。
好比,咱们能够根据Cpu核心数自动生成对应的数量的容器,而后根据一个算法,对IP来进行导流。我当前cpu是4个逻辑核心,就生成了4个容器,每当用户访问的时候,都会最早通过一个算法,这个算法会对IP进行处理,如192.168.1.11~192.168.1.13这个Ip段进第一个容器,xxx~xxx进第二个容器,依次类推,相应的,也就有了4个线程去分别处理4个容器中的管道。
那么,最终就造成了咱们的4容器4线程模型了。
如今,着眼于编码实现:
首先咱们须要一个能承载这些容器的载体,这个载体相似于链接池的概念,能够根据一些须要自动生成适应数量的容器,若是有特殊要求的话,还能够在容器上切出一个容器管理的面,在线程上切出一个线程管理的面以便于实时监控和调度。若是真要作这样一个系统,那么 容器的调度 和 线程的调度功能 是必不可少的,而本Demo则是完成了主要功能,像容器和线程在代码中我也没剥离开来,为了更好的直观的体现demo的含义,算法也是直接写死的,集合容器的内存管理与元素的选型待优化,实际设计中,这些都得优化,还有多线程模型中,怎样上锁才能让效率最大化也是重中之重的。
而这里为了案例的直观就直接写死成4个容器。
public static List<Container> ContainerList = new List<Container>(); //容器载体 static Factory() { for (int i = 0; i < 4; i++) { ContainerList.Add(new Container(i)); //遍历4次 生成4个容器 } foreach (var item in ContainerList) { item.Run(); //开启线程 } }
如今,咱们假定 有编号为 0 到 40 这样的 41个用户。那么这个导流算法 我也就直接写死,编号0至9的用户 将他们的请求给抛转到第一个容器,编号10~19的用户 放到第二个容器,编号20~29放到第三个容器,编号30~40的用户放到第四个容器。
那么这个代码就是这样的:
static Container GetContainer(int userId, out int i) //获取容器的算法 { if (0 <= userId && userId < 10) //编号0至9的用户 返回第一个容器 依次类推 { i = 0; return ContainerList[0]; } if (10 <= userId && userId < 20) { i = 1; return ContainerList[1]; } if (20 <= userId && userId < 30) { i = 2; return ContainerList[2]; } i = 3; return ContainerList[3]; }
当咱们的会话请求通过算法的导流以后,都必须调用一个方法,用于辨别管道数量。若是管道数量已经大于10,则请求失败,不然成功
public static void Add(int userId) { if (GetContainer(userId, out int i).Add(userId)) Console.WriteLine("容器" + i + " 用户" + userId + " 发起请求"); else Console.WriteLine("容器" + i + " 用户" + userId + " 被拦截"); }
接下来就是容器Container的代码了。
这里,对容器的选型用线程安全的ConcurrentDictionary类。
线程安全:当多个线程同时读写同一个共享元素的时候,就会出现数据错乱,迭代报错等安全问提
ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0专为解决Dictionary线程安全而出的新类型
ReaderWriterLockSlim:较ReaderWriterLock优化的读写锁,多个线程同时访问读锁 或 一个线程访问写锁
private ReaderWriterLockSlim obj = new ReaderWriterLockSlim(); //在每一个容器中申明一个读写锁 public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //建立该容器 dic
而后当你向容器添加一条管道中的数据是经过这个方法:
public bool Add(int userId) { obj.EnterReadLock();//挂读锁,容许多个线程同时写入该方法 try { ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(),t=>{ new ConcurrentList<DateTime>()}); //若是不存在就新建 ConcurrentList return dtList.CounterAdd(10, DateTime.Now); //管道容量10,当临界管道容量后 返回false } finally { obj.ExitReadLock(); } }
这里,为了在后面的线程遍历删除ConcurrentList的管道的时候保证ConcurrentList的安全性,因此此处要加读锁。
而ConcurrentList,由于.Net没有推出List集合类的线程安全(这里我申明下:之因此不用ConcurrentBag是由于要保证count和add的一致性,这里补充一下),因此本身新建了一个继承于List<T>的安全类型,在这里 封装了3个须要使用的方法。
public class ConcurrentList<T> : List<T> { private object obj = new object(); public bool CounterAdd(int num, T value) { lock (obj) { if (base.Count >= num) return false; else base.Add(value); return true; } } public new bool Remove(T value) { lock (obj) { base.Remove(value); return true; } } public new T[] ToArray() { lock (obj) { return base.ToArray(); } } }
最后就是线程的运行方法:
public void Run() { ThreadPool.QueueUserWorkItem(c => { while (true) { if (dic.Count > 0) { foreach (var item in dic.ToArray()) { ConcurrentList<DateTime> list = item.Value; foreach (DateTime dt in list.ToArray()) { if (DateTime.Now.AddSeconds(-3) > dt) { list.Remove(dt); Console.WriteLine("容器" + seat + " 已删除用户" + item.Key + "管道中的一条数据"); } } if (list.Count == 0) { obj.EnterWriteLock(); try { if (list.Count == 0) { if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i)) { Console.WriteLine("容器" + seat + " 已清除用户" + item.Key + "的List管道"); } } } finally { obj.ExitWriteLock(); } } } } else { Thread.Sleep(100); } } } ); }
最后,是效果图,一个是基于控制台的,还一个是基于Signalr的。
上面介绍了一种频率限制的模型,分布式与单机相比,无非就是载体不一样,咱们只要把这个容器的载体从程序上移植出来,来弄成一个单独的服务或者直接借用Redis也是可行的。
这里就介绍分布式状况下,Redis的实现。
不一样于Asp.Net的多线程模型,大概由于Redis的各类类型的元素很是粒度的操做致使各类加锁的复杂性,因此在网络请求处理这块Redis是单线程的,基于Redis的实现则由于单线程的缘故在编码角度不用太多考虑到与逻辑无关的问题。
简单介绍下,Redis是一个内存数据库,这个数据库属于非关系型数据库,它的概念不一样于通常的咱们认知的Mysql Oracle SqlServer关系型数据库,它没有Sql没有字段名没有表名这些概念,它和HttpRunTime.Cache的概念差很少同样,首先从操做上属于键值对模式,就如 Cache["键名"] 这样就能获取到值相似,并且能够对每一个Key设置过时策略,而Redis中的Key所对应的值并非想存啥就存啥的,它支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及sorted set(有序集合)。
今天要说的是Sorted set有序集合,有序集合相比其它的集合类型的特殊点在于,使用有序集合的时候还能给插入的元素指定一个 积分score,咱们把这个积分score理解为排序列,它内部会对积分进行排序,积分容许重复,而有序集合中的元素则是惟一。
仍是一样的思路,每当有用户访问的时候,都对该用户的 管道(有序集合)中添加一个元素,而后设置该元素的积分为当前时间。接着在程序中开个线程,来对管道中积分小于约定时间的元素进行清理。由于规定有序集合中的元素只能是惟一值,因此在赋值方面只要是知足uuid便可。
那么用Redis来实现的代码那就是相似这种:
经过using语法糖实现IDisposable而包装的Redis分布式锁,而后里面正常的逻辑判断。
这样的代码虽然也能完成功能,但不够友好。Redis是个基于内存的数据库,于性能而言,瓶颈在于网络 IO 上,与Get一次发出一次请求相比,能不能经过一段脚原本实现大部分逻辑呢?
有的,Redis支持 Lua脚本:
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
大体意思就是,直接向Redis发送一段脚本或者让它直接本地读取一段脚本从而直接实现全部的逻辑。
/// <summary> /// 若是 大于10(AccountNum) 就返回1 不然就增长一条集合中的元素 并返回 空 /// </summary> /// <param name="zcardKey"></param> /// <param name="score"></param> /// <param name="zcardValue"></param> /// <param name="AccountNum"></param> /// <returns></returns> public string LuaAddAccoundSorted(string zcardKey, double score, string zcardValue, int AccountNum) { string str = "local uu = redis.call('zcard',@zcardKey) if (uu >=tonumber(@AccountNum)) then return 1 else redis.call('zadd',@zcardKey,@score,@zcardValue) end"; var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str), new { zcardKey = zcardKey, score = score, zcardValue = zcardValue, AccountNum=AccountNum }); return re.ToString(); }
local uu就是申明一个为名uu的变量的意思,redis.call就是redis命令,这段脚本意思就是若是 大于10(AccountNum) 就返回1 不然就增长一条集合中的元素 并返回 空。
管道内元素处理的方法就是:
/// <summary> /// 遍历当前全部前缀的有序集合,若是数量为0,那么就返回1 不然 就删除 知足最大分值条件区间的元素,若是该集合个数为0则消失 /// </summary> /// <param name="zcardPrefix"></param> /// <param name="score"></param> /// <returns></returns> public string LuaForeachRemove(string zcardPrefix, double score) { StringBuilder str = new StringBuilder(); str.Append("local uu = redis.call('keys',@zcardPrefix) "); //声明一个变量 去获取 模糊查询的结果集合 str.Append("if(#uu==0) then"); //若是集合长度=0 str.Append(" return 1 "); str.Append("else "); str.Append(" for i=1,#uu do "); //遍历 str.Append(" redis.call('ZREMRANGEBYSCORE',uu[i],0,@score) "); //删除从0 到 该score 积分区间的元素 str.Append(" if(redis.call('zcard',uu[i])==0) then "); //若是管道长度=0 str.Append(" redis.call('del',uu[i]) "); //删除 str.Append(" end "); str.Append(" end "); str.Append("end "); var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str.ToString()), new { zcardPrefix = zcardPrefix + "*", score = score }); return re.ToString();
这2段代码经过发送Lua脚本的形式来完成了整个过程,由于Redis的网络模型缘由,因此把LuaForeachRemove方法给提出来作个服务来单独处理便可。至于那种多容器多线程的实现,则彻底能够开多个Redis的实例来实现。最后放上效果图。
最后,我把这些都给作成了个Demo。可是没有找到合适的上传网盘,因此你们能够留邮箱(留了就发),或者直接加QQ群文件自取,讨论交流:166843154
我喜欢和我同样的人交朋友,不被环境影响,本身是本身的老师,欢迎加群 .Net web交流群, QQ群:166843154 欲望与挣扎
做者:小曾出处:http://www.cnblogs.com/1996V/p/8127576.html 欢迎转载,但任何转载必须保留完整文章及博客园出处,在显要地方显示署名以及原文连接。.Net交流群, QQ群:166843154 欲望与挣扎