最近常收到SOD框架的朋友报告的SOD的SQL日志功能报错:文件句柄丢失。通过分析得知,这些朋友使用SOD框架开发了访问量比较大的系统,因为忘记关闭SQL日志功能因此出现了很高频率的日志写入操做,从而偶然引发错误。后来我建议只记录出错的或者执行时间较长的SQL信息,暂时解决了此问题。可是做为一个热心造轮子的人,必定要看看能不能造一个更好的轮子出来。html
前面说的错误缘由已经很直白了,就是频繁的日志写入致使的,那么解决方案就是将屡次写入操做合并成一次写入操做,而且采用异步写入方式。要保存屡次操做的内容就要有一个相似“队列”的东西来保存,而通常的线程安全的队列,都是“有锁队列”,在性能要求很高的系统中,不但愿在日志记录这个地方耗费多一点计算资源,因此最好有一个“无锁队列”,所以最佳方案就是Ring Buffer(环形缓冲区)了。sql
什么是Ring Buffer?顾名思义,就是一个内存环,每一次读写操做都循环利用这个内存环,从而避免频繁分配和回收内存,减轻GC压力,同时因为Ring Buffer能够实现为无锁的队列,从而总体上大幅提升系统性能。Ring Buffer的示意图以下,有关具体原理,请参考此文《Ring Buffer 有什么特别? 》。数组
上文并无详细说明如何具体读写Ring Buffer,可是原理介绍已经足够咱们怎么写一个Ring Buffer程序了,接下来看看我在 .NET上的实现。安全
首先,定一个存放数据的数组,记住必定要用数组,它是实现Ring Buffer的关键而且CPU友好。数据结构
const int C_BUFFER_SIZE = 10;//写入次数缓冲区大小,每次的实际内容大小不固定 string[] RingBuffer = new string[C_BUFFER_SIZE];
int writedTimes = 0;
变量writedTimes 记录写入次数,它会一直递增,不过为了线程安全的递增且不使用托管锁,须要使用原子锁Interlocked。以后,根据每次 writedTimes 跟环形缓冲区的大小求余数,获得当前要写入的数组位置:多线程
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; }
Ring Buffer的核心代码就这么点,调用此方法,会一直往缓冲区写入数据而不会“溢出”,因此写入Ring Buffer效率很高。并发
一个队列若是只生产不消费确定不行的,那么如何及时消费Ring Buffer的数据呢?简单的方案就是当Ring Buffer“写满”的时候一次性将数据“消费”掉。注意这里的“写满”仅仅是指写入位置 index达到了数组最大索引位置,而“消费”也不一样于常见的堆栈,队列等数据结构,只是读取缓冲区的数据而不会移除它。框架
因此前面的代码只须要稍加改造:异步
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
writeP == 0 表示当前一轮的缓冲区已经写满,而后调用函数 FlushFile 将Ring Buffer的数据链接起来,总体写入文件。async
void FlushFile(string fileName, string text) { using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous)) { byte[] buffer = System.Text.Encoding.UTF8.GetBytes(text); IAsyncResult writeResult = fs.BeginWrite(buffer, 0, buffer.Length, (asyncResult) => { fs.EndWrite(asyncResult); }, fs); //fs.EndWrite(writeResult);//这种方法异步起不到效果 fs.Flush(); } }
在函数 FlushFile 中咱们使用了异步写入文件的技术,注意 FileOptions.Asynchronous ,使用它才能够真正利用Windows的完成端口IOCP,将文件异步写入。
固然这段代码也可使用.NET最新版本支持的 async/await ,不过我要让SOD框架继续支持.NET 2.0,因此只好这样写了。
如今,咱们能够开多线程来测试这个循环队列效果怎么样:
Task[] arrTask = new Task[20]; for (int i = 0; i < arrTask.Length; i++) { arrTask[i] = new Task(obj => SaveFile( (int)obj) ,i); } for (int i = 0; i < arrTask.Length; i++) { arrTask[i].Start(); } Task.WaitAll(arrTask); MessageBox.Show(arrTask.Length +" Task All OK.");
这里开启20个Task任务线程来写入文件,运行此程序,发现20个线程才写入了10条数据,分析好久才发现,文件异步IO太快的话,会有缓冲区丢失,第一次写入的10条数据没法写入文件,多运行几回就没有问题了。因此仍是得想法解决此问题。
一般状况下咱们都是使用托管锁来解决这种并发问题,但本文的目的就是要实现一个“无锁环形缓冲区”,不能在此“功亏一篑”,因此此时“信号量”上场了。
同步能够分为锁定和信号同步,信号同步机制中涉及的类型都继承自抽象类WaitHandle,这些类型有EventWaitHandle(类型化为AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。见下图:
首先声明一个 ManualResetEvent对象:
ManualResetEvent ChangeEvent = new ManualResetEvent(true);
这里咱们将 ManualResetEvent 对象设置成 “终止状态”,意味着程序一开始是容许全部线程不等待的,当咱们须要消费Ring Buffer的时候再将 ManualResetEvent 设置成“非终止状态”,阻塞其它线程。简单说就是当要写文件的时候将环形缓冲区阻塞,直到文件写完才容许继续写入环形缓冲区。
对应的新的代码调整以下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(); int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { ChangeEvent.Reset(); string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
而后,再FlushFile 方法的 回掉方法中,加入设置终止状态的代码,部分代码以下:
(asyncResult) =>
{
fs.EndWrite(asyncResult);
ChangeEvent.Set();
}
OK,如今咱们的程序具有高性能的安全的写入日志文件的功能了,咱们来看看演示程序测试的日志结果实例:
Arr[0]:Thread index:0--FFFFFFF Arr[1]:Thread index:1--FFFFFFF Arr[2]:Thread index:8--FFFFFFF Arr[3]:Thread index:9--FFFFFFF Arr[4]:Thread index:3--FFFFFFF Arr[5]:Thread index:2--FFFFFFF Arr[6]:Thread index:4--FFFFFFF Arr[7]:Thread index:10--FFFFFFF Arr[8]:Thread index:5--FFFFFFF Arr[9]:Thread index:6--FFFFFFF Arr[0]:Thread index:7--FFFFFFF Arr[1]:Thread index:11--FFFFFFF Arr[2]:Thread index:12--FFFFFFF Arr[3]:Thread index:13--FFFFFFF Arr[4]:Thread index:14--FFFFFFF Arr[5]:Thread index:15--FFFFFFF Arr[6]:Thread index:16--FFFFFFF Arr[7]:Thread index:17--FFFFFFF Arr[8]:Thread index:18--FFFFFFF Arr[9]:Thread index:19--FFFFFFF
测试结果符合预期!
到此,咱们今天的主题就所有介绍完成了,不过要让本文的代码可以符合实际的运行,还要解决每次只写入少许数据而且将它按期写入日志文件的问题,这里贴出真正的局部代码:
PS:有朋友说采用信号量并不能彻底保证程序安全,查阅了MSDN也说若是信号量状态改变尚未来得及应用,那么是起不到做用的,因此还须要检查业务状态标记,也就是在设置非终止状态后,立刻设置一个操做标记,在其它线程中,须要检查此标记,以免“漏网之鱼”引发不指望的结果。
再具体实现上,咱们能够实现一个“自旋锁”,循环检查此状态标记,为了防止发生死锁,还须要有锁超时机制,代码以下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(10000); int currP= Interlocked.Increment(ref WritedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; if (writeP == 0 ) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; LastWriteTime = DateTime.Now; WritingIndex = 0; SaveFile(fileName,RingBuffer); } else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds > C_WRITE_TIMESPAN) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; int length = index - WritingIndex + 1; if (length <= 0) length = 1; string[] newArr = new string[length]; Array.Copy(RingBuffer, WritingIndex, newArr, 0, length); LastWriteTime = DateTime.Now; WritingIndex = index + 1; SaveFile(fileName, newArr); } else { //防止漏网之鱼的线程在信号量产生做用以前修改数据 //采用“自旋锁”等待 int count = 0; while (IsReading) { if (count++ > 10000000) { Thread.Sleep(50); break; } } RingBuffer[index] = " Arr[" + index + "]:" + text; } }
完整的Ring Buffer代码会在最新版本的SOD框架源码中,有关本篇文章测试程序的完整源码,请加QQ群讨论获取,
群号码:SOD框架高级群 18215717 ,加群请注明 PDF.NET技术交流 ,不然可能被拒绝。