参考连接 : 数组
http://esprog.hatenablog.com/entry/2018/05/19/150313安全
https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/多线程
Job系统做为一个多线程系统, 它由于跟ECS有天生的融合关系因此比较重要的样子, 我也按照使用类型的分类来看看Job System到底怎么样.函数
Job说实话就是一套封装的多线程系统, 我相信全部开发人员都能本身封装一套, 因此Unity推出这个的时候跟着ECS一块儿推出, 由于单独推出来的话确定推不动, 多线程, 线程安全, 线程锁, 线程共享资源, 这些都没什么区别, 我从一个简单列表的功能来讲吧.oop
先来一个普通的多线程 : 测试
using System.Collections; using System.Collections.Generic; using UnityEngine; using System; using System.Threading; public class NormalListAccessTest01 : MonoBehaviour { public class RunData { public List<int> datas = new List<int>(); public float speed; public float deltaTime; } public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null) { System.Threading.ThreadPool.QueueUserWorkItem((_obj) => { call.Invoke(obj); if(endCall != null) { ThreadMaster.Instance.CallFromMainThread(endCall); } }); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test")) { ThreadMaster.GetOrCreate(); var data = new RunData(); data.deltaTime = Time.deltaTime; data.speed = 100.0f; for(int i = 0; i < 10000; i++) { data.datas.Add(i); } RunOnThread<RunData>((_data) => { // 这是在工做线程里 Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff")); var move = _data.deltaTime * _data.speed; for(int i = 0; i < _data.datas.Count; i++) { var val = _data.datas[i] + 1; _data.datas[i] = val; } }, data, () => { // 这是在主线程里 Debug.Log(data.datas[0]); Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff")); }); } } }
线程转换的一个简单封装ThreadMaster : 优化
using System.Collections; using System.Collections.Generic; using UnityEngine; public class ThreadMaster : MonoBehaviour { private static ThreadMaster _instance; public static ThreadMaster Instance { get { return GetOrCreate(); } } private volatile List<System.Action> _calls = new List<System.Action>(); public static ThreadMaster GetOrCreate() { if(_instance == false) { _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>(); } return _instance; } public void CallFromMainThread(System.Action call) { _calls.Add(call); } void Update() { if(_calls.Count > 0) { for(int i = 0; i < _calls.Count; i++) { var call = _calls[i]; call.Invoke(); } _calls.Clear(); } } }
没有加什么锁, 简单运行没有问题, 下面来个Job的跑一下: this
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class JobSystemSample00 : MonoBehaviour { struct VelocityJob : IJob { public NativeArray<int> datas; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } } } public void Test() { var datas = new NativeArray<int>(100, Allocator.Persistent); var job = new VelocityJob() { datas = datas }; JobHandle jobHandle = job.Schedule(); JobHandle.ScheduleBatchedJobs(); //Debug.Log(datas[0]); // Error : You must call JobHandle.Complete() jobHandle.Complete(); Debug.Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
这里就有一个大问题了, 在有注释的地方 // Error : You must call JobHandle.Complete(), 是说在Job没有调用Complete()时, 去获取相关数组内容是非法的! 而这个jobHandle.Complete(); 没法经过工做线程去调用, 也就是说Job的运行它是没法自行结束的, 没法发出运行结束的通知的, 对比上面封装的普通多线程弱爆了. 而这个Complete()函数若是在工做线程执行完成前调用, 会强制当即执行(文档也是写 Wait for the job to complete), 也就是说它只能在主线程调用而且会阻塞主线程, 这样就能够定性了, 它的Job System不是为了提供通常使用的多线程封装给咱们用的, 但是它又是很强大的, 由于它能使用高效的内存结构, 能保证数据访问安全, 能在须要的时候调用Complete方法强制等待工做线程执行完毕(若是没猜错的话, 引擎对这个作了很大优化, 并非简单等待), 还有BurstCompile等, 若是咱们封装成功了的话, 就是很好的多线程库了.spa
PS : 打个比方一个mesh的渲染, 在渲染以前必须计算完全部坐标转换, Job的好处就是能够进行多线程并行的计算, 而后还能被主线程强制执行完毕, 比在主线程中单独计算强多了. 而这个强制执行才是核心逻辑.线程
通过几回测试, 几乎没有办法简单扩展Job系统来让它成为像上面同样拥有自动完成通知的系统, 以下 :
1. 添加JobHandle变量到IJob中, 在Execute结束时调用
struct VelocityJob : IJob { public NativeArray<int> datas; [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction] public JobHandle selfHandle; // 是这个IJob调用Schedule的句柄 public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } selfHandle.Complete(); } }
报错, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 没法解决, 直接就没法在IJob结构体中添加JobHandle变量. 而且没法在工做线程中调用Complete方法.
2. 添加回调函数进去
struct VelocityJob : IJob { public NativeArray<int> datas; public System.Action endCall; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } if(endCall != null) { endCall.Invoke(); } } }
报错, Job系统的struct里面只能存在值类型的变量 !!-_-
3. 使用全局的引用以及线程转换逻辑来作成自动回调的形式, 虽然可使用了但是很是浪费资源 :
using UnityEngine; using Unity.Collections; using Unity.Jobs; using System.Collections.Generic; public class JobSystemSample01 : MonoBehaviour { private static int _id = 0; public static int NewID => _id++; public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>(); public class IJobCall { public JobHandle jobHandle; public System.Action endCall; } struct VelocityJob : IJob { public NativeArray<int> datas; public int refID; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } var handle = ms_handleRef[refID]; ThreadMaster.Instance.CallFromMainThread(() => { handle.jobHandle.Complete(); if(handle.endCall != null) { handle.endCall.Invoke(); } }); } } public void Test() { ThreadMaster.GetOrCreate(); var datas = new NativeArray<int>(100, Allocator.Persistent); int id = NewID; var job = new VelocityJob() { refID = id, datas = datas }; ms_handleRef[id] = new IJobCall() { jobHandle = job.Schedule(), endCall = () => { Debug.Log(datas[0]); datas.Dispose(); } }; } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
经过上面封装就能够做为通常多线程使用了, 而且咱们得到了引擎提供的数据安全和高效逻辑性, 再加上利用BurstCpmpile和只读属性, 可以提高一些计算效率吧. ECS on Job已经在另一篇中说过了, 这里忽略了.
----------------------------------------------
当我测试到IJobParallelFor的时候, 发现并行并不像GPU那样的并行那么美好, 由于GPU它自己就是全并行的, 像卷积之类的, 它跟像素的处理顺序自己就没有关系, 但是咱们的逻辑有些会受顺序的影响. 先看看下面的代码 :
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class IJobParallelForSample01 : MonoBehaviour { struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { if(index == 0) { index = datas.Length - 1; } datas[index] = datas[index - 1] + 1; } } public void Test() { var datas = new NativeArray<int>(100, Allocator.Persistent); for(int i = 0; i < datas.Length; i++) { datas[i] = i; } var job = new VelocityJob() { datas = datas }; var jobHandle = job.Schedule(datas.Length, 20); JobHandle.ScheduleBatchedJobs(); jobHandle.Complete(); Debug.Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;
第二个参数innerloopBatchCount表示的是分块的大小, 好比咱们数组长度是100, 每20个元素分红一块, 一共能够分5块, 若是你的CPU核心数大于等于5它就能开5个线程来处理, 但是你不能去获取这个块以外的Index的数据:
显然这里数据每20个一组被分为了5组, 在5个线程里, 而后跨组获取数据就报错了.
测试一下线程数是否5个 :
struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { throw new System.Exception(index + " ERROR"); } }
5个线程报错, 应该每一个线程内的处理也是按照for的顺序来的.
把每一个块改为5的大小, 看看它能开几个线程:
var jobHandle = job.Schedule(datas.Length, 5);
恩开了8个, 个人机器确实是8核的, 不过它的分块不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道为何, 由于按照我设定每一个分组是5, 而总体平均100/8=12.5而不该该是整10的, 具体不详.
若是咱们要跟其它元素进行交互, 就只能把处理单元设置到跟数组同样大, 才能在一个块中处理:
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class IJobParallelForSample01 : MonoBehaviour { struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { if(index > 0 && index < datas.Length - 1) { datas[index] = datas[datas.Length - 1]; } } } public void Test() { var datas = new NativeArray<int>(10, Allocator.Persistent); for(int i = 0; i < datas.Length; i++) { datas[i] = i; } var job = new VelocityJob() { datas = datas }; var jobHandle = job.Schedule(datas.Length, datas.Length); JobHandle.ScheduleBatchedJobs(); jobHandle.Complete(); Debug .Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
顺便测试一下各个线程的分配状况:
private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>(); struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId); lock(ms_threads) { List<int> val = null; ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val); if(val == null) { val = new List<int>(); ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val; } val.Add(index); } } }
var jobHandle = job.Schedule(100, 5);
结果是分为8个线程, 4个线程的块为10, 4个为15
因此不能想固然的去获取其它Index的内容, 毕竟分块逻辑不必定.