Unity 已可以使用 Thread、Task 等处理多线程任务,但缺乏成熟的多线程任务队列工具,因此在此实现一个,代码已上传 Git 项目 GRUnityTools,可直接下载源码或经过 UPM 使用git
本文原地址:Unity实践—多线程任务队列实现github
串行与并发队列c#
队列是首要实现目标,且须要串行与并发两种队列,以覆盖不一样需求api
同步与异步执行网络
因任务队列过多可能阻塞主线程,因此除同步执行外还须要多线程异步操做多线程
主线程同步并发
由于有多线程,但 Unity 部分操做只能在主线程执行,因此还须要线程同步到主线程异步
Taskasync
Task 为当前 .Net 提供的实用性最高的多线程接口,可实现任务的监控与操纵ide
TaskScheduler
Task 专用调度器,可更便捷地实现 Task 队列调度
Loom
Loom 为网络上广为流传的 Unity 中调用主线程的工具类,目前找不到源码最原始地址,代码拷贝自知乎
最初即决定使用 Task 做为队列基本单位,但彻底没有考虑 TaskScheduler
。原计划手动实现一个调度器,负责保存传入的 Task 放入队列,可设置同步异步,根据设置实现对队列的不一样操做。后来再研究微软官方文档时发如今其 Task 文档的示例中有一个 LimitedConcurrencyLevelTaskScheduler
的演示代码,直接经过 TaskScheduler
实现了可控并发数量的调度器,且当设置并发数为1时队列中的任务会逐一按顺序执行即产生了串行队列效果
TaskScheduler
有两种使用方式
方式一:为 TaskFactory
配置 TaskScheduler,经过 TaksFactory
使用配置的调度器启动 Task
//建立并发数32的调度器
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(32);
//方式1
TaskFactory factory = new TaskFactory(scheduler);
factory.StartNew(()=>{
//执行任务
});
复制代码
方式二:直接使用 Task.Start(TaskFactory)
方法
//建立并发数1的调度器(此时即为串行队列效果)
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
//声明一个 Task 对象
Task task = new Task(()=>{
//任务
});
//启动 Task 指定调度器
task.Start(scheduler);
复制代码
建立名为 TaskQueue
的类,添加变量
//根据需求设置默认并发数
private const int DefaultConcurrentCount = 32;
//线程锁
private static object _lock = new object();
//默认静态串行队列对象
private static TaskQueue _defaultSerial;
//默认静态并发队列对象
private static TaskQueue _defaultConcurrent;
//持有的调度器
private LimitedConcurrencyLevelTaskScheduler _scheduler;
//提供默认串行队列
public static TaskQueue DefaultSerailQueue
{
get
{
if (_defaultSerial == null)
{
lock (_lock)
{
if (_defaultSerial == null)
{
_defaultSerial = new TaskQueue(1);
}
}
}
return _defaultSerial;
}
}
//提供默认并发队列
public static TaskQueue DefaultConcurrentQueue
{
get
{
if (_defaultConcurrent == null)
{
lock (_lock)
{
if (_defaultConcurrent == null)
{
_defaultConcurrent = new TaskQueue(DefaultConcurrentCount);
}
}
}
return _defaultConcurrent;
}
}
复制代码
提供快捷构造方法
//默认构造方法,因 Loom 为 UnityEngine.Monobehaviour对象,因此必须执行初始化方法将其加入场景中
public TaskQueue(int concurrentCount)
{
_scheduler = new LimitedConcurrencyLevelTaskScheduler(concurrentCount);
Loom.Initialize();
}
//建立串行队列
public static TaskQueue CreateSerialQueue()
{
return new TaskQueue(1);
}
//建立并发队列
public static TaskQueue CreateConcurrentQueue()
{
return new TaskQueue(DefaultConcurrentCount);
}
复制代码
下面是各类同步、异步、主线程执行方法,方法会将执行的 Task 返回,以便在实际使用中须要对 Task 有其余操做
需注意 RunSyncOnMainThread
和 RunAsyncOnMainThread
为独立的执行系统与队列无关,若在主线程执行方法直接在主线程同步执行便可
//异步执行,因Task自己会开启新线程因此直接调用便可
public Task RunAsync(Action action)
{
Task t = new Task(action);
t.Start(_scheduler);
return t;
}
//同步执行,使用 Task 提供的 RunSynchronously 方法
public Task RunSync(Action action)
{
Task t = new Task(action);
t.RunSynchronously(_scheduler);
return t;
}
//同步执行主线程方法
//为避免主线程调用该方法因此须要判断当前线程,若为主线程则直接执行,防止死锁
//为保证线程同步,此处使用信号量,仅在主线程方法执行完成后才会释放信号
public static void RunSyncOnMainThread(Action action)
{
if (Thread.CurrentThread.ManagedThreadId == 1)
{
action();
}
else
{
Semaphore sem = new Semaphore(0, 1);
Loom.QueueOnMainThread((o => {
action();
sem.Release();
}), null);
sem.WaitOne();
}
}
//因 Loom 自己即为不会当即执行方法,因此直接调用便可
public static void RunAsyncOnMainThread(Action action)
{
Loom.QueueOnMainThread((o => { action(); }), null);
}
复制代码
扩展延迟执行方法,因延迟自己为异步操做,因此只提供异步执行方式
// 此处使用async、await 关键字实现延迟操做, delay 为秒,Task.Delay 参数为毫秒
public Task RunAsync(Action action, float delay)
{
Task t = Task.Run(async () =>
{
await Task.Delay((int) (delay * 1000));
return RunAsync(action);
});
return t;
}
复制代码
并发队列异步执行
并发队列同步执行
串行队列异步执行
串行队列同步执行
并发队列延迟执行
子线程异步执行主线程
子线程同步执行主线程
到此一个多线程任务队列工具就完成了,通常的需求基本能够知足,后续还可提供更多扩展功能,如传参、取消任务等
另外我我的想尽力将这套工具脱离 UnityEngine.Monobehaviour,但目前还没找到除 Loom 外其余 Unity 获取主线程的方法,固然 Loom 自己仍然是一个很巧妙的工具
若想了解 LimitedConcurrencyLevelTaskScheduler
和 Loom
可继续想下看
TaskScheduler 为抽象类,想自定义任务调度需继承该类,并复写部份内部调度方法
LimitedConcurrencyLevelTaskScheduler ,如下简称为 LCLTS,为微软官方文档提供的示例代码,用于调度任务,控制并发数
ThreadPool
分配工做线程执行,并计数+1public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
//ThreadStatic 线程变量特性,代表是当前线程是否正在处理任务
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// 任务队列,使用链表比 List 和 Array 更方便执行插队出队操做(队列中不会出现空位)
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // 该队列由 lock(_tasks) 锁定
// 最大并发数
private readonly int _maxDegreeOfParallelism;
// 当前已分配入队的任务数量
private int _delegatesQueuedOrRunning = 0;
// 带并发数的构造方法
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
// 将 Task 放入调度队列
protected sealed override void QueueTask(Task task)
{
//将任务放入列表,检查当前执行数是否达到最大值,若未达到则分配线程执行,并计数+1
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
// 使用 ThreadPool 将 Task 分配到工做线程
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
//标记当前线程正在执行任务,当有 Task 想插入此线程执行时会检查该状态
_currentThreadIsProcessingItems = true;
try
{
// 死循环处理全部队列中 Task
while (true)
{
Task item;
lock (_tasks)
{
// 任务队列执行完后退出循环,并将占用标记置为 false
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// 若还有 Task 则获取第一个,并出队
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// 执行 Task
base.TryExecuteTask(item);
}
}
// 线程占用标记置为 false
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
// 尝试在当前线程执行指定任务
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 若当前线程没有在执行任务则没法执行插队操做
if (!_currentThreadIsProcessingItems) return false;
// 若该任务已在队列中,则出队
if (taskWasPreviouslyQueued)
// 尝试执行 Task
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
}
// 尝试将已调度的 Task 移出调度队列
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
// 获取最大并发数
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
// 获取已调度任务队列迭代器
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
// Monitor.TryEnter 做用为线程锁,其语法糖为 lock (_tasks)
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
复制代码
Loom 经过继承 UnityEngine.MonoBehaviour,使用 Unity 主线程生命周期 Update
在主线程执行方法,同时 Loom 也支持简单的多线程异步执行
Loom 包含两个队列有延迟方法队列和无延迟方法队列,两条队列方法均可执行传参方法
Action
和 param
以及延迟时间打包入结构体放入延迟或无延迟队列Time.time
获取添加任务的时间加上延迟时间获得预约执行时间打包入延迟任务结构体并入队Update
周期执行,清空执行队列旧任务,取出无延迟队列全部对象,放入执行队列,清空无延迟队列,遍历执行执行队列任务Update
周期,清空延迟执行队列旧任务,取出预计执行时间小于等于当前时间的任务,放入延迟执行队列,将取出的任务移出延迟队列,遍历执行延迟执行队列任务用户可将 Loom 脚本挂载在已有对象上,也可直接代码调用方法,Loom 会自动在场景中添加一个不会销毁的 Loom 单例对象
代码中使用 QueueOnMainThread
将延迟和无延迟方法加入主线程队列, RunAsync
执行异步方法
public class Loom :MonoBehaviour
{
public static int maxThreads = 8;
static int numThreads;
private static Loom _current;
//private int _count;
public static Loom Current
{
get
{
Initialize();
return _current;
}
}
void Awake()
{
_current = this;
initialized = true;
}
static bool initialized;
[RuntimeInitializeOnLoadMethod]
public static void Initialize()
{
if (!initialized)
{
if (!Application.isPlaying)
return;
initialized = true;
var g = new GameObject("Loom");
_current = g.AddComponent<Loom>();
#if !ARTIST_BUILD
UnityEngine.Object.DontDestroyOnLoad(g);
#endif
}
}
public struct NoDelayedQueueItem
{
public Action<object> action;
public object param;
}
private List<NoDelayedQueueItem> _actions = new List<NoDelayedQueueItem>();
public struct DelayedQueueItem
{
public float time;
public Action<object> action;
public object param;
}
private List<DelayedQueueItem> _delayed = new List<DelayedQueueItem>();
List<DelayedQueueItem> _currentDelayed = new List<DelayedQueueItem>();
public static void QueueOnMainThread(Action<object> taction, object tparam)
{
QueueOnMainThread(taction, tparam, 0f);
}
public static void QueueOnMainThread(Action<object> taction, object tparam, float time)
{
if (time != 0)
{
lock (Current._delayed)
{
Current._delayed.Add(new DelayedQueueItem { time = Time.time + time, action = taction, param = tparam });
}
}
else
{
lock (Current._actions)
{
Current._actions.Add(new NoDelayedQueueItem { action = taction, param = tparam });
}
}
}
public static Thread RunAsync(Action a)
{
Initialize();
while (numThreads >= maxThreads)
{
Thread.Sleep(100);
}
Interlocked.Increment(ref numThreads);
ThreadPool.QueueUserWorkItem(RunAction, a);
return null;
}
private static void RunAction(object action)
{
try
{
((Action)action)();
}
catch
{
}
finally
{
Interlocked.Decrement(ref numThreads);
}
}
void OnDisable()
{
if (_current == this)
{
_current = null;
}
}
// Use this for initialization
void Start()
{
}
List<NoDelayedQueueItem> _currentActions = new List<NoDelayedQueueItem>();
// Update is called once per frame
void Update()
{
if (_actions.Count > 0)
{
lock (_actions)
{
_currentActions.Clear();
_currentActions.AddRange(_actions);
_actions.Clear();
}
for (int i = 0; i < _currentActions.Count; i++)
{
_currentActions[i].action(_currentActions[i].param);
}
}
if (_delayed.Count > 0)
{
lock (_delayed)
{
_currentDelayed.Clear();
_currentDelayed.AddRange(_delayed.Where(d => d.time <= Time.time));
for (int i = 0; i < _currentDelayed.Count; i++)
{
_delayed.Remove(_currentDelayed[i]);
}
}
for (int i = 0; i < _currentDelayed.Count; i++)
{
_currentDelayed[i].action(_currentDelayed[i].param);
}
}
}
}
复制代码