为何编写TaskSchedulerEx类?
因为.NET默认只有一个线程池,如果某个批量任务一直占用大量线程,甚至耗尽默认线程池,就会严重影响应用程序域中其它任务或批量任务的性能。
特点:
一、使用独立线程池,线程池中的线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount
二、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:《C# async await 异步执行方法封装 替代 BackgroundWorker》
三、队列中尚未执行的任务可以取消
四、通过扩展类TaskHelper实现任务分组
五、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,并且兼容async、await异步编程
六、代码量相当精简,TaskSchedulerEx类只有260多行代码
七、池中的线程数量会根据负载自动增减:支持,但没有SmartThreadPool智能。为了性能,使用了比较笨的方式实现,不知道大家有没有既智能、性能又高的方案。我有一个思路:在定时器中计算每个任务的平均执行耗时,然后使用公式(线程数 = CPU核心数 × (本地计算时间 + 等待时间) / 本地计算时间)来计算最佳线程数,再按最佳线程数动态创建线程,但这个计算过程可能会牺牲性能。
对比SmartThreadPool:
TaskSchedulerEx类代码(使用BlockingCollection,当线程数大于200时,CPU占用高,线程数小于100时,CPU占用正常):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> that executes tasks on its own dedicated threads instead of
    /// the shared .NET thread pool. Every instance owns an independent pool made of
    /// <list type="bullet">
    /// <item>core threads, created in the constructor, which live until <see cref="Dispose"/>; and</item>
    /// <item>auxiliary threads, created by a timer while tasks are waiting in the queue, which exit
    /// after 3 seconds without work.</item>
    /// </list>
    /// This variant blocks idle threads on a <see cref="BlockingCollection{T}"/>.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region External methods
        /// <summary>
        /// Legacy P/Invoke declaration, kept unchanged for source compatibility with existing callers.
        /// The native size parameters are SIZE_T (pointer sized), so this <c>int</c> based signature
        /// is only accurate in a 32-bit process; the scheduler itself no longer calls it.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);

        /// <summary>
        /// Pointer-sized declaration of the same native function, used internally so that the
        /// (SIZE_T)-1 "trim as much as possible" sentinel is marshalled correctly on 64-bit.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        private static extern int SetProcessWorkingSetSizeNative(IntPtr process, IntPtr minSize, IntPtr maxSize);
        #endregion

        #region Fields and properties
        /// <summary>Timer period, in milliseconds, once the first tick has happened.</summary>
        private const int SteadyTimerIntervalMs = 20;

        private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private readonly List<Thread> _threadList = new List<Thread>();
        // Signalled by Dispose so that blocked worker threads wake up and exit on their own.
        private readonly CancellationTokenSource _shutdown = new CancellationTokenSource();
        private readonly object _lockCreateTimer = new object();
        private readonly int _maxThreadCount;
        // How long (ms) an auxiliary thread waits for work before it exits.
        private readonly int _extTimeOut = 3000;
        private int _threadCount = 0;
        private int _activeThreadCount = 0;
        private volatile System.Timers.Timer _timer;
        private volatile bool _disposed;

        /// <summary>
        /// Number of worker threads (core + auxiliary) currently alive.
        /// </summary>
        public int ActiveThreadCount
        {
            get { return Volatile.Read(ref _activeThreadCount); }
        }

        /// <summary>
        /// Number of core threads requested at construction.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _threadCount; }
        }

        /// <summary>
        /// Upper bound for the total number of worker threads.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates the scheduler and immediately starts its core threads.
        /// </summary>
        /// <param name="threadCount">Number of core threads started right away.</param>
        /// <param name="maxThreadCount">Maximum total number of threads (core + auxiliary).</param>
        public TaskSchedulerEx(int threadCount = 10, int maxThreadCount = 20)
        {
            _maxThreadCount = maxThreadCount;
            CreateThreads(threadCount);
        }
        #endregion

        #region override GetScheduledTasks
        /// <summary>
        /// Returns a snapshot of the tasks that are queued but not yet started.
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }
        #endregion

        #region override TryExecuteTaskInline
        /// <summary>
        /// Always refuses inline execution, so tasks only ever run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        /// <summary>
        /// Queues the task and makes sure the timer that grows the pool is running.
        /// Tasks queued after <see cref="Dispose"/> are accepted but never executed.
        /// </summary>
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
            // Started from here (not from a core thread) so that a pool with zero core
            // threads still gets auxiliary threads and makes progress.
            CreateTimer();
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops the timer and asks every worker thread to exit. Threads are never aborted:
        /// a task that is currently executing runs to completion and its thread exits afterwards.
        /// Tasks still waiting in the queue are not executed. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            lock (_lockCreateTimer)
            {
                if (_disposed)
                {
                    return;
                }
                _disposed = true;
                StopTimerLocked();
            }

            // Wakes up every thread blocked on the queue; each one decrements the active
            // count itself when it leaves its loop. The token source is intentionally not
            // disposed because worker threads may still be observing its token.
            _shutdown.Cancel();
            _threadList.Clear();
            TrimMemory();
        }
        #endregion

        #region Core threads
        /// <summary>
        /// Starts the core threads.
        /// </summary>
        /// <param name="threadCount">When not null, replaces the stored core thread count.</param>
        private void CreateThreads(int? threadCount = null)
        {
            if (threadCount != null)
            {
                _threadCount = threadCount.Value;
            }

            for (int i = 0; i < _threadCount; i++)
            {
                Interlocked.Increment(ref _activeThreadCount);
                Thread thread = new Thread(CoreThreadLoop);
                thread.IsBackground = true;
                thread.Start();
                _threadList.Add(thread);
            }
        }

        /// <summary>
        /// Body of a core thread: blocks on the queue and executes tasks until shutdown is requested.
        /// </summary>
        private void CoreThreadLoop()
        {
            try
            {
                foreach (Task task in _tasks.GetConsumingEnumerable(_shutdown.Token))
                {
                    TryExecuteTask(task);
                }
            }
            catch (OperationCanceledException)
            {
                // Dispose requested shutdown while this thread was waiting for work.
            }
            finally
            {
                Interlocked.Decrement(ref _activeThreadCount);
            }
        }
        #endregion

        #region Auxiliary threads
        /// <summary>
        /// Starts one auxiliary thread, provided a slot below <see cref="MaxThreadCount"/> is free.
        /// </summary>
        private void CreateThread()
        {
            if (_disposed || !TryReserveThreadSlot())
            {
                return;
            }

            Thread thread = new Thread(AuxiliaryThreadLoop);
            thread.IsBackground = true;
            try
            {
                thread.Start();
            }
            catch
            {
                // Give the reserved slot back if the thread could not be started.
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Atomically increments the active thread count if it lies in
        /// [core count, max count). Timer ticks may overlap, so a plain
        /// "check then increment" could exceed the maximum.
        /// </summary>
        private bool TryReserveThreadSlot()
        {
            while (true)
            {
                int current = Volatile.Read(ref _activeThreadCount);
                if (current < _threadCount || current >= _maxThreadCount)
                {
                    return false;
                }
                if (Interlocked.CompareExchange(ref _activeThreadCount, current + 1, current) == current)
                {
                    return true;
                }
            }
        }

        /// <summary>
        /// Body of an auxiliary thread: executes tasks until the queue stays empty for
        /// the idle timeout or shutdown is requested, then simply returns.
        /// </summary>
        private void AuxiliaryThreadLoop()
        {
            try
            {
                Task task;
                while (_tasks.TryTake(out task, _extTimeOut, _shutdown.Token))
                {
                    TryExecuteTask(task);
                }
            }
            catch (OperationCanceledException)
            {
                // Dispose requested shutdown while this thread was waiting for work.
            }
            finally
            {
                int remaining = Interlocked.Decrement(ref _activeThreadCount);
                // Once the pool has shrunk back to its core size, release memory.
                // Dispose performs its own trim, so skip it during shutdown.
                if (remaining == _threadCount && !_disposed)
                {
                    TrimMemory();
                }
            }
        }
        #endregion

        #region Timer
        /// <summary>
        /// Creates and starts the pool-growing timer if it is not already running.
        /// The unlocked null check keeps the common path (timer already running) lock free.
        /// </summary>
        private void CreateTimer()
        {
            if (_timer != null || _disposed)
            {
                return;
            }

            lock (_lockCreateTimer)
            {
                if (_timer != null || _disposed)
                {
                    return;
                }

                System.Timers.Timer timer = new System.Timers.Timer();
                // With core threads present the first growth step is delayed; without any
                // core thread the first auxiliary thread has to be created right away.
                timer.Interval = _threadCount <= 0 ? 1 : 1000;
                // The handler receives its own timer instance instead of reading the field,
                // which may be cleared or replaced concurrently.
                timer.Elapsed += (s, e) => OnTimerElapsed(timer);
                _timer = timer;
                timer.Start();
            }
        }

        /// <summary>
        /// Timer tick: adds one auxiliary thread while work is waiting, otherwise stops the timer.
        /// </summary>
        private void OnTimerElapsed(System.Timers.Timer timer)
        {
            lock (_lockCreateTimer)
            {
                if (!ReferenceEquals(_timer, timer))
                {
                    return; // Late tick of a timer that has already been stopped.
                }

                if (timer.Interval != SteadyTimerIntervalMs)
                {
                    timer.Interval = SteadyTimerIntervalMs;
                }

                if (_tasks.Count > 0)
                {
                    CreateThread();
                    return;
                }

                StopTimerLocked();
            }

            // A task may have been queued between the emptiness check and the stop above,
            // with QueueTask still seeing the old timer. Re-check so it is not stranded.
            Thread.MemoryBarrier();
            if (_tasks.Count > 0)
            {
                CreateTimer();
            }
        }

        /// <summary>
        /// Stops and disposes the current timer. Must be called while holding the timer lock.
        /// </summary>
        private void StopTimerLocked()
        {
            System.Timers.Timer timer = _timer;
            _timer = null;
            if (timer != null)
            {
                timer.Stop();
                timer.Dispose();
            }
        }
        #endregion

        #region Memory
        /// <summary>
        /// Forces a garbage collection and, on Windows, asks the OS to trim the working set.
        /// </summary>
        private static void TrimMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSizeNative(process.Handle, new IntPtr(-1), new IntPtr(-1));
                }
            }
        }
        #endregion

        #region Cancel all
        /// <summary>
        /// Removes every task that has not started yet. Tasks that are already executing are
        /// not affected. Removed tasks are never executed and therefore never complete, so
        /// waiting on them blocks indefinitely.
        /// </summary>
        public void CancelAll()
        {
            Task discarded;
            while (_tasks.TryTake(out discarded))
            {
            }
        }
        #endregion
    }
}
TaskSchedulerEx类代码(使用ConcurrentQueue,测试500个线程,CPU占用0%-2%,正常):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// TaskScheduler extension; every instance is an independent thread pool.
    /// Core threads are created in the constructor and live until <see cref="Dispose"/>;
    /// auxiliary threads are added by a timer while tasks are waiting and exit after
    /// 20 seconds without work. Idle threads poll a <see cref="ConcurrentQueue{T}"/>
    /// with short sleeps instead of blocking on it.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region External methods
        /// <summary>
        /// Legacy P/Invoke declaration, kept unchanged for source compatibility with existing callers.
        /// The native size parameters are SIZE_T (pointer sized), so this <c>int</c> based signature
        /// is only accurate in a 32-bit process; the scheduler itself no longer calls it.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);

        /// <summary>
        /// Pointer-sized declaration of the same native function, used internally so that the
        /// (SIZE_T)-1 "trim as much as possible" sentinel is marshalled correctly on 64-bit.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        private static extern int SetProcessWorkingSetSizeNative(IntPtr process, IntPtr minSize, IntPtr maxSize);
        #endregion

        #region Fields and properties
        /// <summary>Timer period, in milliseconds, once the first tick has happened.</summary>
        private const int SteadyTimerIntervalMs = 20;
        /// <summary>Sleep, in milliseconds, of a core thread that found the queue empty.</summary>
        private const int CorePollIntervalMs = 10;
        /// <summary>Sleep, in milliseconds, of an auxiliary thread that found the queue empty.</summary>
        private const int AuxiliaryPollIntervalMs = 100;

        private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private readonly List<Thread> _coreThreadList = new List<Thread>();
        private readonly object _lockCreateTimer = new object();
        private readonly int _maxThreadCount;
        // Idle time (ms) after which an auxiliary thread exits.
        private readonly int _auxiliaryThreadTimeOut = 20000;
        private int _coreThreadCount = 0;
        private int _activeThreadCount = 0;
        private volatile System.Timers.Timer _timer;
        // Volatile so that the polling loops are guaranteed to observe the write made by Dispose.
        private volatile bool _run = true;

        /// <summary>
        /// Number of worker threads (core + auxiliary) currently alive.
        /// </summary>
        public int ActiveThreadCount
        {
            get { return Volatile.Read(ref _activeThreadCount); }
        }

        /// <summary>
        /// Number of core threads requested at construction.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _coreThreadCount; }
        }

        /// <summary>
        /// Upper bound for the total number of worker threads.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// TaskScheduler extension; every instance is an independent thread pool.
        /// </summary>
        /// <param name="coreThreadCount">Number of core threads (0 or more; should not be large). 0 is a good fit for one-off use.</param>
        /// <param name="maxThreadCount">Maximum total number of threads (core + auxiliary).</param>
        public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
        {
            _maxThreadCount = maxThreadCount;
            CreateCoreThreads(coreThreadCount);
        }
        #endregion

        #region override GetScheduledTasks
        /// <summary>
        /// Returns a snapshot of the tasks that are queued but not yet started.
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }
        #endregion

        #region override TryExecuteTaskInline
        /// <summary>
        /// Always refuses inline execution, so tasks only ever run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        /// <summary>
        /// Queues the task and makes sure the timer that grows the pool is running.
        /// Tasks queued after <see cref="Dispose"/> are accepted but never executed.
        /// </summary>
        protected override void QueueTask(Task task)
        {
            // Enqueue first: the timer stops itself when it sees an empty queue, so the task
            // must already be visible when the timer state is checked.
            _tasks.Enqueue(task);
            CreateTimer();
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops the timer and asks every worker thread to exit. Threads are never aborted:
        /// a task that is currently executing runs to completion and its thread exits afterwards.
        /// Tasks still waiting in the queue are not executed. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            lock (_lockCreateTimer)
            {
                if (!_run)
                {
                    return;
                }
                _run = false;
                StopTimerLocked();
            }

            // Worker loops observe _run == false on their next iteration and decrement
            // the active count themselves when they exit.
            _coreThreadList.Clear();
            TrimMemory();
        }
        #endregion

        #region Core threads
        /// <summary>
        /// Starts the core threads.
        /// </summary>
        /// <param name="coreThreadCount">When not null, replaces the stored core thread count.</param>
        private void CreateCoreThreads(int? coreThreadCount = null)
        {
            if (coreThreadCount != null)
            {
                _coreThreadCount = coreThreadCount.Value;
            }

            for (int i = 0; i < _coreThreadCount; i++)
            {
                Interlocked.Increment(ref _activeThreadCount);
                Thread thread = new Thread(CoreThreadLoop);
                thread.IsBackground = true;
                thread.Start();
                _coreThreadList.Add(thread);
            }
        }

        /// <summary>
        /// Body of a core thread: polls the queue and executes tasks until the scheduler is disposed.
        /// </summary>
        private void CoreThreadLoop()
        {
            try
            {
                Task task;
                while (_run)
                {
                    if (_tasks.TryDequeue(out task))
                    {
                        TryExecuteTask(task);
                    }
                    else
                    {
                        Thread.Sleep(CorePollIntervalMs);
                    }
                }
            }
            finally
            {
                Interlocked.Decrement(ref _activeThreadCount);
            }
        }
        #endregion

        #region Auxiliary threads
        /// <summary>
        /// Starts one auxiliary thread, provided a slot below <see cref="MaxThreadCount"/> is free.
        /// </summary>
        private void CreateThread()
        {
            if (!_run || !TryReserveThreadSlot())
            {
                return;
            }

            Thread thread = new Thread(AuxiliaryThreadLoop);
            thread.IsBackground = true;
            try
            {
                thread.Start();
            }
            catch
            {
                // Give the reserved slot back if the thread could not be started.
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Atomically increments the active thread count if it lies in
        /// [core count, max count). Timer ticks may overlap, so a plain
        /// "check then increment" could exceed the maximum.
        /// </summary>
        private bool TryReserveThreadSlot()
        {
            while (true)
            {
                int current = Volatile.Read(ref _activeThreadCount);
                if (current < _coreThreadCount || current >= _maxThreadCount)
                {
                    return false;
                }
                if (Interlocked.CompareExchange(ref _activeThreadCount, current + 1, current) == current)
                {
                    return true;
                }
            }
        }

        /// <summary>
        /// Body of an auxiliary thread: polls the queue and executes tasks until it has been
        /// idle for the auxiliary timeout or the scheduler is disposed, then simply returns.
        /// </summary>
        private void AuxiliaryThreadLoop()
        {
            try
            {
                Task task;
                // Stopwatch is monotonic; wall-clock time can jump when the system clock changes.
                System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();
                while (_run && idle.ElapsedMilliseconds < _auxiliaryThreadTimeOut)
                {
                    if (_tasks.TryDequeue(out task))
                    {
                        TryExecuteTask(task);
                        idle.Restart();
                    }
                    else
                    {
                        Thread.Sleep(AuxiliaryPollIntervalMs);
                    }
                }
            }
            finally
            {
                int remaining = Interlocked.Decrement(ref _activeThreadCount);
                // Once the pool has shrunk back to its core size, release memory.
                // Dispose performs its own trim, so skip it during shutdown.
                if (remaining == _coreThreadCount && _run)
                {
                    TrimMemory();
                }
            }
        }
        #endregion

        #region Timer
        /// <summary>
        /// Creates and starts the pool-growing timer if it is not already running.
        /// The unlocked null check keeps the common path (timer already running) lock free.
        /// </summary>
        private void CreateTimer()
        {
            if (_timer != null || !_run)
            {
                return;
            }

            lock (_lockCreateTimer)
            {
                if (_timer != null || !_run)
                {
                    return;
                }

                System.Timers.Timer timer = new System.Timers.Timer();
                // Without core threads the first auxiliary thread has to be created right away.
                timer.Interval = _coreThreadCount == 0 ? 1 : 500;
                // The handler receives its own timer instance instead of reading the field,
                // which may be cleared or replaced concurrently.
                timer.Elapsed += (s, e) => OnTimerElapsed(timer);
                _timer = timer;
                timer.Start();
            }
        }

        /// <summary>
        /// Timer tick: adds one auxiliary thread while work is waiting, otherwise stops the timer.
        /// </summary>
        private void OnTimerElapsed(System.Timers.Timer timer)
        {
            lock (_lockCreateTimer)
            {
                if (!ReferenceEquals(_timer, timer))
                {
                    return; // Late tick of a timer that has already been stopped.
                }

                if (timer.Interval != SteadyTimerIntervalMs)
                {
                    timer.Interval = SteadyTimerIntervalMs;
                }

                if (_tasks.Count > 0)
                {
                    CreateThread();
                    return;
                }

                StopTimerLocked();
            }

            // A task may have been queued between the emptiness check and the stop above,
            // with QueueTask still seeing the old timer. Re-check so it is not stranded.
            Thread.MemoryBarrier();
            if (_tasks.Count > 0)
            {
                CreateTimer();
            }
        }

        /// <summary>
        /// Stops and disposes the current timer. Must be called while holding the timer lock.
        /// </summary>
        private void StopTimerLocked()
        {
            System.Timers.Timer timer = _timer;
            _timer = null;
            if (timer != null)
            {
                timer.Stop();
                timer.Dispose();
            }
        }
        #endregion

        #region Memory
        /// <summary>
        /// Forces a garbage collection and, on Windows, asks the OS to trim the working set.
        /// </summary>
        private static void TrimMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSizeNative(process.Handle, new IntPtr(-1), new IntPtr(-1));
                }
            }
        }
        #endregion

        #region Cancel all
        /// <summary>
        /// Removes every task that has not started yet. Tasks that are already executing are
        /// not affected. Removed tasks are never executed and therefore never complete, so
        /// waiting on them blocks indefinitely.
        /// </summary>
        public void CancelAll()
        {
            Task discarded;
            while (_tasks.TryDequeue(out discarded))
            {
            }
        }
        #endregion
    }
}
RunHelper类代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Extension methods that start a delegate as a task on a specific <see cref="TaskScheduler"/>.
    /// Any exception thrown by the delegate is caught inside the task: it is handed to the
    /// optional error callback, then passed to LogUtil.Error, and the task itself completes
    /// normally (value-returning overloads yield <c>default(T)</c> in that case).
    /// </summary>
    public static class RunHelper
    {
        #region Error handling
        /// <summary>
        /// Shared failure path: invokes the optional callback first, then logs the exception.
        /// </summary>
        private static void HandleError(Exception error, Action<Exception> errorAction)
        {
            if (errorAction != null)
            {
                errorAction(error);
            }
            LogUtil.Error(error, "ThreadUtil.Run错误");
        }
        #endregion

        #region Run (Action<object>)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler, passing <paramref name="arg"/> as its state.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            Action<object> guarded = state =>
            {
                try
                {
                    doWork(state);
                }
                catch (Exception ex)
                {
                    HandleError(ex, errorAction);
                }
            };

            return Task.Factory.StartNew(guarded, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region Run (Action)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null)
        {
            Action guarded = () =>
            {
                try
                {
                    doWork();
                }
                catch (Exception ex)
                {
                    HandleError(ex, errorAction);
                }
            };

            return Task.Factory.StartNew(guarded, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region Run<T> (Func<object, T>)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler, passing <paramref name="arg"/> as its state,
        /// and exposes its result through the returned task.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>A task whose result is the delegate's return value, or <c>default(T)</c> if it threw.</returns>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            Func<object, T> guarded = state =>
            {
                try
                {
                    return doWork(state);
                }
                catch (Exception ex)
                {
                    HandleError(ex, errorAction);
                    return default(T);
                }
            };

            return Task.Factory.StartNew<T>(guarded, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region Run<T> (Func<T>)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler and exposes its result through the returned task.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>A task whose result is the delegate's return value, or <c>default(T)</c> if it threw.</returns>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            Func<T> guarded = () =>
            {
                try
                {
                    return doWork();
                }
                catch (Exception ex)
                {
                    HandleError(ex, errorAction);
                    return default(T);
                }
            };

            return Task.Factory.StartNew<T>(guarded, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region RunAsync<T> (Func<object, T>)
        /// <summary>
        /// Awaitable form of the state-passing <c>Run&lt;T&gt;</c> overload.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The delegate's return value, or <c>default(T)</c> if it threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            Task<T> started = Run<T>(scheduler, doWork, arg, errorAction);
            return await started;
        }
        #endregion

        #region RunAsync<T> (Func<T>)
        /// <summary>
        /// Awaitable form of the parameterless <c>Run&lt;T&gt;</c> overload.
        /// </summary>
        /// <param name="scheduler">Scheduler the task is started on.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The delegate's return value, or <c>default(T)</c> if it threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            Task<T> started = Run<T>(scheduler, doWork, errorAction);
            return await started;
        }
        #endregion
    }
}
TaskHelper扩展类(代码中的LimitedTaskScheduler改成TaskSchedulerEx即可)(这个任务分类有点多,每个任务分类的核心线程一般是不释放的,一直占着线程,算不算滥用?):
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Task helper base class: exposes one shared scheduler per task category.
    /// Each scheduler is created on first access. Creation goes through
    /// <see cref="Lazy{T}"/> so that concurrent first accesses cannot construct
    /// two schedulers for the same category (the surplus one would keep its threads).
    /// </summary>
    public class TaskHelper
    {
        #region UI tasks
        private static readonly Lazy<LimitedTaskScheduler> _UITask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(4));
        /// <summary>
        /// UI tasks (constructed with 4)
        /// </summary>
        public static LimitedTaskScheduler UITask
        {
            get { return _UITask.Value; }
        }
        #endregion

        #region Menu tasks
        private static readonly Lazy<LimitedTaskScheduler> _MenuTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(2));
        /// <summary>
        /// Menu tasks (constructed with 2)
        /// </summary>
        public static LimitedTaskScheduler MenuTask
        {
            get { return _MenuTask.Value; }
        }
        #endregion

        #region Calculation tasks
        private static readonly Lazy<LimitedTaskScheduler> _CalcTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(8));
        /// <summary>
        /// Calculation tasks (constructed with 8)
        /// </summary>
        public static LimitedTaskScheduler CalcTask
        {
            get { return _CalcTask.Value; }
        }
        #endregion

        #region Network requests
        private static readonly Lazy<LimitedTaskScheduler> _RequestTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(32));
        /// <summary>
        /// Network requests (constructed with 32)
        /// </summary>
        public static LimitedTaskScheduler RequestTask
        {
            get { return _RequestTask.Value; }
        }
        #endregion

        #region Database tasks
        private static readonly Lazy<LimitedTaskScheduler> _DBTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(32));
        /// <summary>
        /// Database tasks (constructed with 32)
        /// </summary>
        public static LimitedTaskScheduler DBTask
        {
            get { return _DBTask.Value; }
        }
        #endregion

        #region IO tasks
        private static readonly Lazy<LimitedTaskScheduler> _IOTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(8));
        /// <summary>
        /// IO tasks (constructed with 8)
        /// </summary>
        public static LimitedTaskScheduler IOTask
        {
            get { return _IOTask.Value; }
        }
        #endregion

        #region Main page tasks
        private static readonly Lazy<LimitedTaskScheduler> _MainPageTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(16));
        /// <summary>
        /// Main page tasks (constructed with 16)
        /// </summary>
        public static LimitedTaskScheduler MainPageTask
        {
            get { return _MainPageTask.Value; }
        }
        #endregion

        #region Image loading tasks
        private static readonly Lazy<LimitedTaskScheduler> _LoadImageTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(32));
        /// <summary>
        /// Image loading tasks (constructed with 32)
        /// </summary>
        public static LimitedTaskScheduler LoadImageTask
        {
            get { return _LoadImageTask.Value; }
        }
        #endregion

        #region Browser tasks
        private static readonly Lazy<LimitedTaskScheduler> _BrowserTask =
            new Lazy<LimitedTaskScheduler>(() => new LimitedTaskScheduler(2));
        /// <summary>
        /// Browser tasks (constructed with 2)
        /// </summary>
        public static LimitedTaskScheduler BrowserTask
        {
            get { return _BrowserTask.Value; }
        }
        #endregion
    }
}
Form1.cs测试代码:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Management;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Utils;

namespace test
{
    /// <summary>
    /// Test form that drives <see cref="TaskSchedulerEx"/> with simulated workloads.
    /// </summary>
    public partial class Form1 : Form
    {
        // Large pool used by buttons 1 and 2.
        private TaskSchedulerEx _taskSchedulerEx = null;
        // Small pool used by buttons 3 and 4.
        private TaskSchedulerEx _taskSchedulerExSmall = null;
        // Pool that runs the test driver itself, so the UI thread is never blocked by WaitAll.
        private TaskSchedulerEx _task = null;

        public Form1()
        {
            InitializeComponent();
            _taskSchedulerEx = new TaskSchedulerEx(50, 500);
            _taskSchedulerExSmall = new TaskSchedulerEx(5, 50);
            _task = new TaskSchedulerEx(2, 10);
        }

        private void Form1_Load(object sender, EventArgs e)
        {

        }

        /// <summary>
        /// Simulates a large number of network request tasks (20 ms delay each) on the large pool.
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 200000, 1000, 20);
        }

        /// <summary>
        /// Simulates CPU-bound tasks (1 ms each) on the large pool.
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 100000, 2000, 1);
        }

        /// <summary>
        /// Simulates a large number of network request tasks (20 ms delay each) on the small pool.
        /// </summary>
        private void button3_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 20);
        }

        /// <summary>
        /// Simulates CPU-bound tasks (1 ms each) on the small pool.
        /// </summary>
        private void button4_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 1);
        }

        /// <summary>
        /// Queues simulated tasks on the given scheduler, waits for all of them and logs the elapsed time.
        /// The whole run is started on the driver scheduler rather than on the calling thread.
        /// </summary>
        /// <param name="scheduler">Scheduler under test.</param>
        /// <param name="taskCount">Number of tasks to queue.</param>
        /// <param name="logCount">A log line is written for every task whose index is a multiple of this value.</param>
        /// <param name="delay">Simulated latency or work time per task, in milliseconds.</param>
        private void DoTask(TaskSchedulerEx scheduler, int taskCount, int logCount, int delay)
        {
            _task.Run(() =>
            {
                Log("开始");
                // Stopwatch measures elapsed time independently of system clock adjustments.
                System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
                List<Task> taskList = new List<Task>(taskCount);
                for (int i = 1; i <= taskCount; i++)
                {
                    Task task = scheduler.Run((obj) =>
                    {
                        var k = (int)obj;
                        Thread.Sleep(delay); // Simulated latency or work.
                        if (k % logCount == 0)
                        {
                            Log("最大线程数:" + scheduler.MaxThreadCount
                                + " 核心线程数:" + scheduler.CoreThreadCount
                                + " 活跃线程数:" + scheduler.ActiveThreadCount.ToString().PadLeft(4, ' ')
                                + " 处理数/总数:" + k + " / " + taskCount);
                        }
                    }, i, (ex) =>
                    {
                        Log(ex.Message);
                    });
                    taskList.Add(task);
                }
                Task.WaitAll(taskList.ToArray());
                double d = stopwatch.Elapsed.TotalSeconds;
                Log("完成,耗时:" + d + "秒");
            });
        }

        /// <summary>
        /// Releases all three schedulers when the form closes; each one owns its own threads.
        /// </summary>
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            DisposeScheduler(ref _taskSchedulerEx);
            DisposeScheduler(ref _taskSchedulerExSmall);
            DisposeScheduler(ref _task);
        }

        /// <summary>
        /// Disposes the scheduler held by the field, if any, and clears the field.
        /// </summary>
        private static void DisposeScheduler(ref TaskSchedulerEx scheduler)
        {
            if (scheduler != null)
            {
                scheduler.Dispose();
                scheduler = null;
            }
        }
    }
}
测试截图: