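Because of a project requirement, I had to write some programs to crawl information from certain websites; small crawlers, you might call them. Crawling web pages means going over the network, so if the program fetched pages serially, one request after another, it would be very slow, and I don't think anyone does it that way. To crawl efficiently, pages must be fetched in parallel on multiple threads, and that calls for a thread pool. Every developer probably knows the idea of a pool: its purpose is to make sensible use of resources.

At first, the obvious choice was the thread pool built into the .NET Framework, ThreadPool. It ships with the framework, after all, so its performance and stability should be beyond doubt. But after mulling it over for a few days, the built-in ThreadPool left me uneasy:

1. ThreadPool is a static class!! That means once the program is running, this pool is shared by the whole application domain. The .NET Framework is huge; besides the tasks we put into the shared pool ourselves, who knows whether other classes, code, or tasks are also queuing work items into it? So if I set the shared pool's size to 10, fewer than 10 threads may actually be working for us, and the program won't perform as we expect.

2. Our crawler is designed to run like a service: once started, it keeps running until someone stops it manually. So when there are no crawling tasks, the pool needs to shrink, or even drop all of its threads, so that they don't sit idle consuming system resources. Because the built-in .NET thread pool is shared, I really don't dare resize it at will, and for a control freak like me that is simply unacceptable.

Although the built-in ThreadPool is easy to use and powerful, and can even adjust the number of threads in the pool intelligently, I decided to give it up, because I need a thread pool I can control! So I started searching the web for other ready-made thread pools. After a long time on Baidu and Google, I found that the only fairly mature option in the .NET world was SmartThreadPool. After a brief look at SmartThreadPool, I still felt it wasn't what I wanted, so I decided to build my own. With the help of the mighty internet, I went looking again for material on how to implement a thread pool and what to watch out for, and I read through some open-source implementations, such as SmartThreadPool for .NET and ThreadPoolExecutor in Java. I had never worked with Java, but Java and C# are like brothers, largely alike, so as a .NET coder I didn't find them hard to read. Building on the ideas of those who came before and adding my own, my own pool gradually took shape in my mind…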
Based on these requirements, I started by defining the basic interface:
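```csharp
using System;
using System.Threading;

public interface IThreadPool : IDisposable
{
    /// <summary>
    /// Size of the thread pool
    /// </summary>
    int Threads { get; set; }

    /// <summary>
    /// A value in milliseconds. Timing starts when the last active thread finishes its work item;
    /// if the pool receives no work item within this period, all threads in the pool are released.
    /// A value less than 0 means the threads are never released. Defaults to -1 if not specified.
    /// </summary>
    double KeepAliveTime { get; set; }

    /// <summary>
    /// Gets the number of idle threads currently available in the pool
    /// </summary>
    int GetAvailableThreads();

    /// <summary>
    /// Gets the total number of work items currently in the pool (queued plus running)
    /// </summary>
    int GetWorkCount();

    /// <summary>
    /// Adds a work item to the thread pool
    /// </summary>
    /// <param name="callback">The method to execute</param>
    /// <param name="state">The argument passed to <paramref name="callback"/></param>
    /// <returns>true if the work item was queued; otherwise false</returns>
    bool QueueWorkItem(WaitCallback callback, object state);
}
```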
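As you can see, the interface I defined has only a few members (the names are all borrowed from .NET's built-in ThreadPool, haha), and the comments in the code above explain what each one is for. Next, let's look at the core implementation, starting with the WorkerThread class: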
```csharp
using System;
using System.Threading;

internal class WorkerThread : IDisposable
{
    Thread _thread;
    AutoResetEvent _waitEvent;
    Action _action;
    // Written by other threads and read by the worker thread, hence volatile
    volatile bool _disposed = false;

    public WorkerThread()
    {
        this._waitEvent = new AutoResetEvent(false);
        this._thread = new Thread(this.Run);
        this._thread.IsBackground = true;
        this._thread.Start();
    }

    // Whether a work item is currently executing
    public bool IsWorking { get; private set; }

    // Raised after each work item finishes, whether it succeeded or threw
    public event Action<WorkerThread> Complete;

    public int ThreadId
    {
        get { return this._thread.ManagedThreadId; }
    }

    public ThreadState ThreadState
    {
        get { return this._thread.ThreadState; }
    }

    public void SetWork(Action act)
    {
        this.CheckDisposed();
        if (this.IsWorking) throw new Exception("A work item is already running");
        this._action = act;
    }

    public void Activate()
    {
        this.CheckDisposed();
        if (this.IsWorking) throw new Exception("A work item is already running");
        if (this._action == null) throw new Exception("No work item has been set");
        this._waitEvent.Set();
    }

    void Run()
    {
        while (!this._disposed)
        {
            // Block until Activate() signals the event
            this._waitEvent.WaitOne();
            if (this._disposed) break;
            try
            {
                this.IsWorking = true;
                this._action();
            }
            catch (ThreadAbortException)
            {
                // An abort that did not come from Dispose() is cancelled so the thread survives
                if (!this._disposed) Thread.ResetAbort();
            }
            catch (Exception)
            {
                // Exceptions thrown by the work item are swallowed
            }
            finally
            {
                this.IsWorking = false;
                this._action = null;
                var completeEvent = this.Complete;
                if (completeEvent != null)
                {
                    try { completeEvent(this); } catch { }
                }
            }
        }
    }

    void CheckDisposed()
    {
        if (this._disposed) throw new ObjectDisposedException(this.GetType().Name);
    }

    public void Dispose()
    {
        if (this._disposed) return;
        try
        {
            this._disposed = true;
            // Note: only call Abort when Dispose() is not running on the thread this object wraps.
            // Thread.Abort works only on .NET Framework; on .NET Core / .NET 5+ it throws PlatformNotSupportedException.
            if (Thread.CurrentThread.ManagedThreadId != this._thread.ManagedThreadId)
            {
                this._thread.Abort();
            }
        }
        finally
        {
            this._waitEvent.Dispose();
        }
    }
}
```
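This class is essentially a wrapper around a thread, which I call WorkerThread. Its main job is to receive a task assigned by the thread pool and execute it; what the pool maintains is a set of these objects. Its core members are:

- _thread: the underlying background thread, started in the constructor with Run as its entry point.
- _waitEvent: an AutoResetEvent that Run blocks on until Activate signals it.
- _action: the work item to execute; set by SetWork and cleared after each run.
- SetWork(Action act): assigns the next work item; throws if one is already running.
- Activate(): signals _waitEvent so the thread wakes up and executes _action.
- Run(): the thread's loop: wait for the signal, execute _action, then raise Complete.
- Complete: an event raised after every work item, whether it succeeded or threw; the pool uses it to take the thread back.
- Dispose(): marks the object as disposed and aborts the thread, unless it is called from that thread itself.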
The WorkerThread class isn't much code, only about a hundred lines. In short, what it does is SetWork(Action act) -> Activate() -> _action() -> WaitOne() -> SetWork(Action act) -> Activate() … in an endless loop. I hope that's clear.
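The cycle above can be reduced to a small, self-contained sketch. MiniWorker is a hypothetical name and is not part of the pool. It keeps the same SetWork -> Activate -> run -> wait loop on an AutoResetEvent, but, as an assumption about how one might avoid Thread.Abort (which .NET Core and .NET 5+ no longer support), its Dispose() wakes the thread and lets it leave the loop on its own.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified counterpart of WorkerThread: the same SetWork -> Activate -> run -> wait
// cycle, but Dispose() wakes the thread and lets it exit instead of calling Thread.Abort.
public sealed class MiniWorker : IDisposable
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Thread _thread;
    private Action _action;
    private int _disposed;          // 0 = running, 1 = disposing
    private bool _disposeOnExit;    // set when Dispose() runs on the worker thread itself

    public event Action<MiniWorker> Complete;

    public MiniWorker()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void SetWork(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        Volatile.Write(ref _action, action);
    }

    // One Set() releases exactly one WaitOne() in Run, then the event resets itself.
    public void Activate() => _signal.Set();

    private void Run()
    {
        while (true)
        {
            _signal.WaitOne();                                  // sleep until Activate() or Dispose()
            if (Volatile.Read(ref _disposed) == 1) break;       // graceful exit, no Abort
            Action action = Interlocked.Exchange(ref _action, null);
            try { action?.Invoke(); }
            catch (Exception) { /* like the original, task exceptions are swallowed */ }
            try { Complete?.Invoke(this); }                     // the owner may hand out the next task here
            catch (Exception) { }
        }
        if (_disposeOnExit) _signal.Dispose();
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 1) return;   // only the first call proceeds
        if (Thread.CurrentThread.ManagedThreadId == _thread.ManagedThreadId)
        {
            // Called from a Complete handler: Run() is still on the stack,
            // so let it dispose the event after it leaves its loop.
            _disposeOnExit = true;
            _signal.Set();
            return;
        }
        _signal.Set();      // wake the worker so it observes _disposed
        _thread.Join();     // unlike Abort, this waits for a running task to finish
        _signal.Dispose();
    }
}
```

The trade-off of this design: Join waits for the current task instead of killing it, which is gentler, but a task that never returns will also block Dispose().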
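With WorkerThread covered, let's look at the class that actually implements IThreadPool: WorkerThreadPool. As mentioned earlier, the job of a thread pool is to maintain a certain number of reusable threads. WorkerThreadPool creates the WorkerThreads, accepts user tasks, and assigns each task to an idle thread in the pool to execute; that really is all it does. Without further ado, here's the code: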
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class WorkerThreadPool : IThreadPool, IDisposable
{
    readonly object _lockObject = new object();
    bool _disposed = false;
    // Every call to QueueWorkItem sets _spin to false, which tells the timing loop (Tick) to stop
    bool _spin = false;
    // Idle timeout in ms: timing starts when the last active thread finishes its work; if no task
    // arrives within this period, all threads in the pool are released
    double _keepAliveTime = -1;
    DateTime _releaseTime = DateTime.Now; // the moment at which the threads will be released
    int _threads;
    List<WorkerThread> _allThreads;
    List<WorkerThread> _workingTreads;
    Queue<WorkerThread> _freeTreads;
    Queue<WorkItem> _workQueue;
    List<ReleaseThreadsRecord> _releaseThreadsRecords = new List<ReleaseThreadsRecord>();

    /// <summary>
    /// Creates a thread pool; Threads defaults to 25
    /// </summary>
    public WorkerThreadPool() : this(25) { }

    /// <summary>
    /// Creates a thread pool of size <paramref name="threads"/>
    /// </summary>
    public WorkerThreadPool(int threads)
    {
        if (threads < 1) throw new ArgumentException("threads is less than 1");
        this._threads = threads;
        this._allThreads = new List<WorkerThread>(threads);
        this._workingTreads = new List<WorkerThread>(threads);
        this._freeTreads = new Queue<WorkerThread>(threads);
        this._workQueue = new Queue<WorkItem>();
    }

    ~WorkerThreadPool()
    {
        this.Dispose();
    }

    /// <summary>
    /// Size of the thread pool
    /// </summary>
    public int Threads
    {
        get { return this.GetPoolSize(); }
        set { this.SetPoolSize(value); }
    }

    /// <summary>
    /// A value in milliseconds. Timing starts when the last active thread finishes its work item;
    /// if the pool receives no work item within this period, all threads in the pool are released.
    /// A value less than 0 means the threads are never released. Defaults to -1 if not specified.
    /// </summary>
    public double KeepAliveTime
    {
        get { return this._keepAliveTime; }
        set { this._keepAliveTime = value; }
    }

    int GetPoolSize()
    {
        return this._threads;
    }

    void SetPoolSize(int threads)
    {
        if (threads < 1) throw new ArgumentException("threads is less than 1");
        lock (this._lockObject)
        {
            this._threads = threads;
            this.AdjustPool(); // shrinking: dispose surplus idle threads
            WorkerThread workerThread = null;
            WorkItem workItem = null;
            // growing: start new threads for work items that are already queued
            while (this.TryGetWorkerThreadAndWorkItem(out workerThread, out workItem, false))
            {
                this.ActivateWorkerThread(workerThread, workItem);
                workerThread = null;
                workItem = null;
            }
        }
    }

    /// <summary>
    /// Gets the number of idle threads currently available in the pool
    /// </summary>
    public int GetAvailableThreads()
    {
        lock (this._lockObject)
        {
            if (this._threads <= this._workingTreads.Count) return 0;
            return this._threads - this._workingTreads.Count;
        }
    }

    /// <summary>
    /// Gets the total number of work items currently in the pool (queued plus running)
    /// </summary>
    public int GetWorkCount()
    {
        lock (this._lockObject)
        {
            return this._workQueue.Count + this._workingTreads.Count;
        }
    }

    /// <summary>
    /// Adds a work item to the thread pool
    /// </summary>
    public bool QueueWorkItem(WaitCallback callback, object state)
    {
        if (callback == null) return false;
        WorkerThread workerThread = null;
        WorkItem workItem = null;
        lock (this._lockObject)
        {
            CheckDisposed();
            if (this._workQueue.Count == int.MaxValue) return false;
            this._workQueue.Enqueue(new WorkItem(callback, state));
            this._spin = false; // a new task arrived: stop any running keep-alive countdown
            if (!this.TryGetWorkerThreadAndWorkItem(out workerThread, out workItem, true))
            {
                return true; // every thread is busy; the item stays queued
            }
        }
        // Activate outside the lock
        this.ActivateWorkerThread(workerThread, workItem);
        return true;
    }

    public List<ReleaseThreadsRecord> GetReleaseThreadsRecords()
    {
        lock (this._lockObject)
        {
            var list = new List<ReleaseThreadsRecord>(this._releaseThreadsRecords.Count);
            foreach (var releaseThreadsRecord in this._releaseThreadsRecords)
            {
                // Hand out copies so callers cannot modify the pool's own history
                list.Add(releaseThreadsRecord.Clone());
            }
            return list;
        }
    }

    /// <summary>
    /// Must be called while holding _lockObject
    /// </summary>
    /// <param name="workerThreadCall">whether the caller is a thread belonging to this pool</param>
    bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall)
    {
        workerThread = null;
        workItem = null;
        if (this._workQueue.Count > 0)
        {
            if (this._freeTreads.Count > 0)
            {
                workerThread = this._freeTreads.Dequeue();
                workItem = this._workQueue.Dequeue();
                this._workingTreads.Add(workerThread);
                return true;
            }
            if (this._allThreads.Count < this._threads)
            {
                workerThread = new WorkerThread();
                workItem = this._workQueue.Dequeue();
                this._allThreads.Add(workerThread);
                this._workingTreads.Add(workerThread);
                return true;
            }
            return false;
        }

        if (!workerThreadCall) return false;
        double t = this._keepAliveTime;
        if (t < 0)
        {
            this._workQueue.TrimExcess();
            return false;
        }
        // Only reachable after a pool thread finishes its work; a call from QueueWorkItem never
        // gets here, because _workQueue.Count > 0 in that case
        if (this._freeTreads.Count == this._allThreads.Count && this._workingTreads.Count == 0 && this._freeTreads.Count > 0)
        {
            // Reaching this point means no tasks are left and the last active thread has just finished,
            // so take one thread out of the pool to run Tick
            DateTime now = DateTime.Now;
            int threadId = Thread.CurrentThread.ManagedThreadId;
            // Since only pool threads can reach this point, is this check a bit redundant? - -
            if (this._allThreads.Any(a => a.ThreadId == threadId))
            {
                // Dequeue a WorkerThread without adding it to _workingTreads: it only does the pool's own
                // timing and is invisible to callers, so GetAvailableThreads still returns a sensible result
                workerThread = this._freeTreads.Dequeue();
                workItem = new WorkItem(state => this.Tick((WorkerThread)state), workerThread);
                this._spin = true;
                try
                {
                    this._releaseTime = now.AddMilliseconds(t); // when the threads should be released
                }
                catch (ArgumentOutOfRangeException)
                {
                    this._releaseTime = DateTime.MaxValue;
                }
                return true;
            }
        }
        return false;
    }

    void ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem)
    {
        workerThread.SetWork(workItem.Execute);
        workerThread.Complete += this.WorkComplete;
        workerThread.Activate();
    }

    void WorkComplete(WorkerThread workerThread)
    {
        // So that the finalizer can still run, always remove this.WorkComplete from workerThread.Complete;
        // it is added again the next time the thread is handed a task
        workerThread.Complete -= this.WorkComplete;
        if (this._disposed) return;
        WorkerThread nextWorkerThread = null;
        WorkItem nextWorkItem = null;
        lock (this._lockObject)
        {
            if (this._disposed) return;
            this._workingTreads.Remove(workerThread);
            this._freeTreads.Enqueue(workerThread);
            this.AdjustPool();
            if (!this.TryGetWorkerThreadAndWorkItem(out nextWorkerThread, out nextWorkItem, true))
            {
                return;
            }
        }
        this.ActivateWorkerThread(nextWorkerThread, nextWorkItem);
    }

    /// <summary>
    /// Must be called while holding _lockObject
    /// </summary>
    void AdjustPool()
    {
        while (this._allThreads.Count > this._threads && this._freeTreads.Count > 0)
        {
            WorkerThread workerThread = this._freeTreads.Dequeue();
            this._allThreads.Remove(workerThread);
            workerThread.Dispose();
        }
    }

    /// <summary>
    /// Spins until told to stop or until the moment to release the pool's threads arrives
    /// </summary>
    void Tick(WorkerThread workerThread)
    {
        DateTime releaseTimeTemp = this._releaseTime;
        while (true)
        {
            lock (this._lockObject)
            {
                if (!this._spin)
                {
                    // _spin == false: we were told to stop timing
                    break;
                }
                if (DateTime.Now >= releaseTimeTemp)
                {
                    // Time is up: stop timing and release the threads. First remove the WorkComplete handler
                    // added earlier so that WorkComplete will not run; once the loop ends, this thread finishes too
                    workerThread.Complete -= this.WorkComplete;
                    // Release every thread, including the current one
                    this.ReleaseThreads();
                    this._spin = false;
                    // All threads (including this one) have been released, so this thread ends once the method returns
                    break;
                }
            }
            Thread.SpinWait(1);
        }
    }

    void ReleaseThreads()
    {
        var releaseThreadsRecord = new ReleaseThreadsRecord(DateTime.Now);
        foreach (WorkerThread thread in this._allThreads)
        {
            try { thread.Dispose(); } catch { }
            releaseThreadsRecord.ThreadIds.Add(thread.ThreadId);
        }
        releaseThreadsRecord.ThreadIds.TrimExcess();
        this._releaseThreadsRecords.Add(releaseThreadsRecord);
        this._allThreads.Clear();
        this._freeTreads.Clear();
        this._workingTreads.Clear();
        this._workQueue.Clear();
        this._workQueue.TrimExcess();
    }

    void CheckDisposed()
    {
        if (this._disposed) throw new ObjectDisposedException(this.GetType().Name);
    }

    /// <summary>
    /// Releases this object's resources; calling it forcibly terminates every thread in the pool
    /// </summary>
    public void Dispose()
    {
        if (this._disposed) return;
        // Release every thread
        lock (this._lockObject)
        {
            if (this._disposed) return;
            this.ReleaseThreads();
            this._threads = 0;
            this._disposed = true;
        }
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing) { }

    class WorkItem
    {
        bool _executed = false;
        WaitCallback _callback;
        object _state;

        public WorkItem(WaitCallback callback, object state)
        {
            if (callback == null) throw new ArgumentNullException("callback");
            this._callback = callback;
            this._state = state;
        }

        public void Execute()
        {
            if (this._executed) throw new Exception("This WorkItem has already been executed");
            this._executed = true;
            this._callback(this._state);
        }
    }

    public class ReleaseThreadsRecord
    {
        public ReleaseThreadsRecord()
        {
            this.ThreadIds = new List<int>();
        }

        public ReleaseThreadsRecord(DateTime releaseTime)
        {
            this.ReleaseTime = releaseTime;
            this.ThreadIds = new List<int>();
        }

        /// <summary>
        /// When the threads were released
        /// </summary>
        public DateTime ReleaseTime { get; set; }

        /// <summary>
        /// IDs of the released threads
        /// </summary>
        public List<int> ThreadIds { get; set; }

        public ReleaseThreadsRecord Clone()
        {
            var record = new ReleaseThreadsRecord(this.ReleaseTime);
            record.ThreadIds = this.ThreadIds.ToList();
            return record;
        }
    }
}
```
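This class has considerably more code (it seems to take up quite a bit of space - -). Although the code is commented, I'd still like to briefly walk through the implementation approach and some of the core internal members to help you understand it faster:

- _lockObject: the lock that protects all of the pool's collections and state.
- _allThreads: every WorkerThread the pool currently owns.
- _freeTreads: idle WorkerThreads waiting for a task.
- _workingTreads: WorkerThreads that are currently executing a task.
- _workQueue: tasks (WorkItem objects) waiting for a thread.
- _keepAliveTime, _releaseTime and _spin: state for the KeepAliveTime feature; _spin is set to true when the countdown starts and set back to false by QueueWorkItem to stop it.
- QueueWorkItem: enqueues a task and, if a thread is free or a new one may be created, starts it outside the lock.
- TryGetWorkerThreadAndWorkItem: must be called under the lock; pairs a queued task with a free thread, creating a new thread while the pool holds fewer than Threads. When the queue is empty, KeepAliveTime is not negative, and the last active thread has just finished, it instead hands one idle thread the Tick countdown.
- WorkComplete: runs when a WorkerThread finishes; moves it from _workingTreads to _freeTreads, calls AdjustPool, and picks up the next task.
- AdjustPool: disposes idle threads while the pool holds more threads than Threads.
- Tick: the keep-alive countdown.
- ReleaseThreads: disposes every thread, clears all collections, and records the released thread IDs in a ReleaseThreadsRecord.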
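Those are the core fields and methods of WorkerThreadPool; I won't go into the other members in detail. To make management easier, the pool tracks thread state with two collections, _freeTreads and _workingTreads. Every time a workerThread is taken from the idle collection _freeTreads to run a task, it must be added to _workingTreads; when a workerThread finishes its task, it is removed from _workingTreads and put back into _freeTreads as an idle thread to wait for the next task. That is why WorkComplete always ends by calling TryGetWorkerThreadAndWorkItem to get an available WorkerThread and a pending task and then running it. This forms a loop: as long as there are tasks, the pool stays fully loaded.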
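To make the free/working bookkeeping concrete, here is a single-threaded model of it. PoolBookkeeping, Submit and Complete are hypothetical names. Integer IDs stand in for WorkerThread objects, strings stand in for work items, and the keep-alive branch is left out. The model follows the queue-not-empty branch of TryGetWorkerThreadAndWorkItem: reuse an idle thread first, otherwise create one while the pool is below its size, otherwise leave the task queued.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded model of WorkerThreadPool's bookkeeping. "Threads" are plain
// integer IDs and no real threads start, so the free/working transitions are easy to follow.
public sealed class PoolBookkeeping
{
    private readonly int _size;                                  // plays the role of _threads
    private int _nextId;
    private readonly List<int> _all = new List<int>();           // _allThreads
    private readonly Queue<int> _free = new Queue<int>();        // _freeTreads
    private readonly HashSet<int> _working = new HashSet<int>(); // _workingTreads
    private readonly Queue<string> _work = new Queue<string>();  // _workQueue

    public PoolBookkeeping(int size) { _size = size; }

    public int FreeCount => _free.Count;
    public int WorkingCount => _working.Count;
    public int QueuedCount => _work.Count;

    // Like QueueWorkItem: enqueue the task, then try to pair it with a thread.
    // Returns the (thread, task) pair to start, or null if every thread is busy.
    public (int ThreadId, string Task)? Submit(string task)
    {
        _work.Enqueue(task);
        return TryAssign();
    }

    // Like WorkComplete: move the thread from working to free, then pick up the next task.
    public (int ThreadId, string Task)? Complete(int threadId)
    {
        if (!_working.Remove(threadId))
            throw new InvalidOperationException("thread " + threadId + " is not working");
        _free.Enqueue(threadId);
        return TryAssign();
    }

    // Like the queue-not-empty branch of TryGetWorkerThreadAndWorkItem.
    private (int ThreadId, string Task)? TryAssign()
    {
        if (_work.Count == 0) return null;
        int id;
        if (_free.Count > 0) id = _free.Dequeue();                     // reuse an idle thread
        else if (_all.Count < _size) { id = _nextId++; _all.Add(id); } // grow up to the pool size
        else return null;                                              // all busy: the task waits
        _working.Add(id);
        return (id, _work.Dequeue());
    }
}
```

With a pool of size 2, t1 and t2 start on threads 0 and 1, t3 waits in the queue, and when thread 0 completes it picks up t3 immediately. That hand-off inside Complete is what keeps the pool fully loaded while work remains.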
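At the start I mentioned a requirement: when there are no crawling tasks, the pool needs to shrink or even drop all of its threads, so that idle threads don't sit around consuming system resources. That is why I gave IThreadPool a KeepAliveTime property. It sets a period of time: if the pool receives no task within that period, it destroys its own threads. In WorkerThreadPool the implementation is quite simple. After the last task has finished, the pool automatically takes an idle workerThread and has it run the timing logic, the WorkerThreadPool.Tick method. Tick just spins while checking the clock (on each iteration it takes the lock, compares DateTime.Now with the release time, and then calls Thread.SpinWait(1)), so it busy-waits for the whole idle period; if no task has arrived once the time is up, it destroys the threads in the pool. This timing implementation is pretty crude - -; my skills are limited and I couldn't think of a better way.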
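If the busy-wait in Tick is a concern, one possible alternative (not what WorkerThreadPool does) is to block with Monitor.Wait and a timeout, and have new work wake the timer with Monitor.PulseAll. The sketch below is standalone; IdleWaiter, Arm, NotifyWork and WaitForIdleTimeout are hypothetical names. Arm corresponds to the point where the pool sets _spin = true, and NotifyWork corresponds to QueueWorkItem setting _spin = false. In the real pool one would presumably wait on _lockObject itself, so that QueueWorkItem could pulse it while already holding the lock.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper showing a blocking alternative to Tick's spin loop:
// Monitor.Wait with a timeout instead of lock + Thread.SpinWait(1) in a loop.
public sealed class IdleWaiter
{
    private readonly object _lock = new object();
    private bool _armed; // true = idle countdown running (the role of _spin == true)

    // Start an idle period; the original sets _spin = true at this point.
    public void Arm()
    {
        lock (_lock) { _armed = true; }
    }

    // New work arrived; the original sets _spin = false in QueueWorkItem.
    public void NotifyWork()
    {
        lock (_lock)
        {
            _armed = false;
            Monitor.PulseAll(_lock); // wake the timing thread right away
        }
    }

    // Blocks until NotifyWork() is called or the timeout elapses. Returns true if the
    // timeout elapsed while still armed, i.e. it is time to release the threads.
    public bool WaitForIdleTimeout(TimeSpan timeout)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        lock (_lock)
        {
            while (_armed)
            {
                TimeSpan remaining = timeout - stopwatch.Elapsed;
                if (remaining <= TimeSpan.Zero)
                {
                    _armed = false;
                    return true;
                }
                // Monitor.Wait releases the lock while sleeping and reacquires it before returning;
                // whether woken by a pulse or by the timeout, the loop re-checks _armed.
                int ms = (int)Math.Min(Math.Ceiling(remaining.TotalMilliseconds), int.MaxValue);
                Monitor.Wait(_lock, ms);
            }
            return false; // NotifyWork() cancelled the countdown
        }
    }
}
```

Because _armed is checked under the lock before every wait, a NotifyWork that happens before the timing thread starts waiting is not lost, and the thread sleeps instead of spinning for the whole idle period.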
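Our thread pool is simple in design and not very feature-rich, but it suits our current program well, and at least I feel comfortable using it. It has been running on our servers for a year and a half now, and everything has been fine. I, Xiaocheng, have been part of cnblogs ("the garden") for three years, and on such a great platform I have only ever taken, never thought about giving back. So I wanted to share something with everyone. This WorkerThreadPool is simple and involves no advanced techniques, but it is still a small result of my own work. If you have good suggestions, I'd be extremely grateful!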
This is my first blog post, and I'm quite nervous! Finally, thanks to cnblogs (博客园) for providing such a good platform for learning!