AsyncLocal
的实现很简单,将AsyncLocal
实例和当前线程的值以键值对的形式保存在Thread.CurrentThread.ExecutionContext.m_localValues.
中。因为使用[ThreadStatic]
修饰了 Thread.CurrentThread
属性对应的字段,因此实现了多个线程之间各自维护不一样的一份数据。同时,在每一次修改AsyncLocal
的时候,都新建了ExecutionContext
和IAsyncLocalValueMap
对象并赋值给当前的线程。异步
AsyncLocal
的测试代码class AsyncLocalTests : Singleton<AsyncLocalTests>,ITestMethod { private readonly AsyncLocal<int> asyncLocalVariable = new AsyncLocal<int>(); public async Task MethodAsync() { asyncLocalVariable.Value = 88; await Task.Run(() => { Console.WriteLine($"进入 Task,值:{asyncLocalVariable.Value};线程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); asyncLocalVariable.Value = 888; }); Console.WriteLine($"await Task 后,值:{asyncLocalVariable.Value};线程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); } public void RunTest() { asyncLocalVariable.Value = 1; Console.WriteLine($"初始值:{asyncLocalVariable.Value};线程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); MethodAsync(); Thread.Sleep(1000); Console.WriteLine($"async方法后,值:{asyncLocalVariable.Value};线程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); Console.ReadKey(); } }
MethodAsync()
异步方法先后,AsyncLocal
.Value
的值相同await MethodAsync()
异步方法先后,AsyncLocal
.Value
的值相同await Task.Run()
先后代码块中,AsyncLocal
.Value
的值相同因为AsyncLocal
是从 Thread.CurrentThread.ExecutionContext
获取实际的值,那么理解ExecutionContext
在 async、Task、Thread
的中流动就十分重要。先说结论,并简单描述一下缘由:async
Task.Run()
先后,ExecutionContext
相同(处于不一样线程)缘由:这是由于 Task.Run()
和Thread.Start()
会捕获当前线程的 ExecutionContext
传递给工做线程,而且在工做线程修改 AsyncLocal
的值, 不会影响原线程的ExecutionContext
。由于每次修改 AsyncLocal
的值,都会新建 ExecutionContext
实例并保存到工做线程ide
MethodAsync()
先后代码块的 ExecutionContext
相同(不使用await
)缘由:在状态机第一次执行先后会备份、恢复ExecutionContext
(线程并无进行切换)函数
await Task.Run()
先后代码块的 ExecutionContext
相同(处于不一样线程)缘由:咱们知道await Task.Run()
确定位于一个异步方法中,该异步方法会被编译成一个状态机,经过状态的切换,将await
先后的代码分红了两步来执行。第一次执行由当前线程执行,在开启新 Task
后、当前线程返回以前,会保存当前线程的 ExecutionContext
,供状态机第二次执行使用(工做线程)。从第一点咱们知道新建Task
实例的时候会捕获一次ExecutionContext
给工做线程,碰到await
返回以前会捕获一次ExecutionContext
给状态机,这两次捕获的其实是同一个对象。oop
await MethodAsync()
先后代码块的 ExecutionContext
相同(处于不一样线程)缘由:在状态机第一次执行(当前线程)的时候,会捕获当前线程的 ExecutionContext
,供状态机第二次执行使用(工做线程)。测试
async
关键字对ExecutionContext
的影响async
关键字其实是编译器的语法糖,能够经过Dnspy
反编译查看去除语法糖的原始代码。
Dnspy
配置以下,去除勾选"反编译异步方法(async/await)"
ui
public async Task MethodAsyncWithAwait() { asyncLocalVariable.Value = 88; await Task.Run(() => { asyncLocalVariable.Value = 888; }); asyncLocalVariable.Value = 8888; }
async/await
语法糖代码:能够看到编译器生成了一个实现 IAsyncStateMachine
接口的异步状态机,并生成了一个私有方法,保存了Task.Run()
中的的代码块。异步方法中实际上作了如下四个步骤:this
AsyncTaskMethodBuilder
AsyncTaskMethodBuilder.Sratr(ref IAsyncStateMachine)
启动状态机Task
public Task MethodAsyncWithAwait() { AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = new AsyncLocalTests.<MethodAsyncWithAwait>d__1(); <MethodAsyncWithAwait>d__.<>4__this = this; <MethodAsyncWithAwait>d__.<>t__builder = AsyncTaskMethodBuilder.Create(); <MethodAsyncWithAwait>d__.<>1__state = -1; AsyncTaskMethodBuilder <>t__builder = <MethodAsyncWithAwait>d__.<>t__builder; <>t__builder.Start<AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref <MethodAsyncWithAwait>d__); return <MethodAsyncWithAwait>d__.<>t__builder.Task; }
能够看到在执行 stateMachine.MoveNext()
以前备份了当前线程的 _executionContext
和 _synchronizationContext
,而且在 finally
代码块中恢复了备份的数据。
这样也就解释了:在不使用 await
等待异步方法的状况下,虽然在原线程修改了AsyncLocal
的值,可是离开async
方法后,咱们获取的仍是原来的值。值得注意的是,这里的备份恢复针对的都是当前线程,而不涉及到工做线程。spa
public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { AsyncMethodBuilderCore.Start<TStateMachine>(ref stateMachine); } internal static class AsyncMethodBuilderCore { [DebuggerStepThrough] public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { if (stateMachine == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine); } Thread currentThread = Thread.CurrentThread; Thread thread = currentThread; ExecutionContext executionContext = currentThread._executionContext; ExecutionContext executionContext2 = executionContext; SynchronizationContext synchronizationContext = currentThread._synchronizationContext; try { stateMachine.MoveNext(); } finally { SynchronizationContext synchronizationContext2 = synchronizationContext; Thread thread2 = thread; if (synchronizationContext2 != thread2._synchronizationContext) { thread2._synchronizationContext = synchronizationContext2; } ExecutionContext executionContext3 = executionContext2; ExecutionContext executionContext4 = thread2._executionContext; if (executionContext3 != executionContext4) { ExecutionContext.RestoreChangedContextToThread(thread2, executionContext3, executionContext4); } } }
实际上编译器将标记为 async
的方法分红了两部分,一部分是 await
以前的代码(包括新建并启动启动Task
部分),另外一部分是 await
以后的代码。经过状态的改变,这两部分代码分两次执行。若是没有使用await
修饰异步方法,那么该状态机没有 else
代码块, 只会执行一次stateMachine.MoveNext()
。能够看到在第一次执行stateMachine.MoveNext()
以后,当前线程就直接返回了,而后一层层的返回到最外层。这也是为何说碰到await
以后,当前线程就直接返回,固然最内层的返回是在开启新Task
以后。线程
[CompilerGenerated] private void <MethodAsyncWithAwait>b__1_0() { this.asyncLocalVariable.Value = 888; } [CompilerGenerated] private sealed class <MethodAsyncWithAwait>d__1 : IAsyncStateMachine { void IAsyncStateMachine.MoveNext() { int num = this.<>1__state; try { TaskAwaiter awaiter; if (num != 0) { this.<>4__this.asyncLocalVariable.Value = 88; awaiter = Task.Run(new Action(this.<>4__this.<MethodAsyncWithAwait>b__1_0)).GetAwaiter(); if (!awaiter.IsCompleted) { this.<>1__state = 0; this.<>u__1 = awaiter; AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = this; this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref awaiter, ref <MethodAsyncWithAwait>d__); return; } } else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } awaiter.GetResult(); this.<>4__this.asyncLocalVariable.Value = 8888; } catch (Exception exception) { this.<>1__state = -2; this.<>t__builder.SetException(exception); return; } this.<>1__state = -2; this.<>t__builder.SetResult(); } [DebuggerHidden] void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { } public int <>1__state; public AsyncTaskMethodBuilder <>t__builder; public AsyncLocalTests <>4__this; private TaskAwaiter <>u__1; }
咱们能够看到在第一次执行stateMachine.MoveNext()
的时候,会经过当前线程执行 await
以前的代码块,并经过Task.Run()
启用工做线程去完成任务。IAsyncStateMachine.MoveNext()
里面有三句代码比较重要,这里先大概描述一下做用:
Task.Run()
:新建Task
实例、捕获当前线程的ExecutionContext
保存到Task
实例中、启动新任务
AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine)
:将状态机和当前线程的上下文包装成 AsyncStateMachineBox:Task
对象,保存到在 Task(第一步新建的
字段中,最后将其传递到 Task
实例).m_continuationObjectstateMachine.AsyncTaskMethodBuilder.Task
属性中, 这样外层状态机能够经过MethodAsync().GetAwaiter().m_task
获取到内层状态机的AsyncStateMachineBox:Task
对象,一样的外层状态机再次执行本步骤,将自身的 AsyncStateMachineBox:Task
对象赋值给内层 AsyncStateMachineBox:Task
对象的 m_continuationObject
字段,这样的话,就构建了一个单向链表,该链表保存了每一层异步方法的stateMachine
和 ExecutionContext
。
this.<>t__builder.SetResult()
:Task.m_continuationObject
是否为空,不为空的状况下,执行外层状态机的第二次 stateMachine.MoveNext()
。最内层 Task.m_continuationObject
的执行会在Task
完成以后调用,接下来经过SetResult()
一层层调用了外层状态机的第二次 stateMachine.MoveNext()
AwaitUnsafeOnCompleted()
代码分析如下只放出了简化的代码。这里首先构建 IAsyncStateMachineBox
实例,并将其赋值给 m_task
供外层状态机使用。IAsyncStateMachineBox
实例保存了本层的状态机,并捕获了当前线程的ExecutionContext
。awaiter.m_task
是调用内层异步方法返回的 Task
实例。最后在 TaskAwaiter.UnsafeOnCompletedInternal()
方法中,将构建的IAsyncStateMachineBox
实例保存到 awaiter(内层).m_task.m_continuationObject
字段中,使得内层状态机指向本层状态机。由于每一层状态机都会调用AwaitUnsafeOnCompleted
方法,因此一层层构建了 await
后的全部回调,而且每一层回调的 ExecutionContext
都不一样。
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { IAsyncStateMachineBox box = GetStateMachineBox(ref stateMachine); if ((null != (object)default(TAwaiter)!) && (awaiter is ITaskAwaiter)) { ref TaskAwaiter ta = ref Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter); TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true); } }
private IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ExecutionContext? currentContext = ExecutionContext.Capture(); AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ? CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() : new AsyncStateMachineBox<TStateMachine>(); m_task = box; box.StateMachine = stateMachine; box.Context = currentContext; return box; }
SetResult()
代码分析该方法最后调用了 Task
方法,读取了Task.m_continuationObject
来获取外一层的回调(包括状态机、ExecutionContext
),并经过FinishContinuations()
执行外层状态机的第二次执行。
internal bool TrySetResult([AllowNull] TResult result) { bool result2 = false; if (base.AtomicStateUpdate(67108864, 90177536)) { this.m_result = result; Interlocked.Exchange(ref this.m_stateFlags, this.m_stateFlags | 16777216); Task.ContingentProperties contingentProperties = this.m_contingentProperties; if (contingentProperties != null) { base.NotifyParentIfPotentiallyAttachedTask(); contingentProperties.SetCompleted(); } base.FinishContinuations(); result2 = true; } return result2; }
FinishContinuations()
方法接下来的调用在 Task.Run()
部分会有介绍。
internal void FinishContinuations() { object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel); if (obj != null) { this.RunContinuations(obj); } }
Task.Run()
代码分析实际上在Task.Run()
内部也是先新建一个Task
实例,而后经过Task.ScheduleAndStart()
方法来调度并启动任务。二者的区别在于传入的Options
和scheduler
是不相同的。
var task1 = Task.Run(() => { }); //默认配置没法更改: InternalTaskOptions.QueuedByRuntime, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default //t.ScheduleAndStart(false); var task2 = new Task(() => { }, TaskCreationOptions.LongRunning); //默认配置能够修改: InternalTaskOptions.None, TaskCreationOptions.None, scheduler:null task2.Start(); //能够传入scheduler //默认使用TaskScheduler.Current: 先取[ThreadStatic]Task.InternalCurrent,若是为空取 TaskScheduler.Default //t.ScheduleAndStart(true);
经过在Task
的构造函数中调用ExecutionContext.Capture()
方法来保存当前线程的ExecutionContext
到Task
实例中,这样的话,只要将到Task
实例做为参数传入到工做线程中,工做线程就能够获取到ExecutionContext
internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) { if (action == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.action); } if (parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None) { this.EnsureContingentPropertiesInitializedUnsafe().m_parent = parent; } this.TaskConstructorCore(action, state, cancellationToken, creationOptions, internalOptions, scheduler); this.CapturedContext = ExecutionContext.Capture(); }
Task.ScheduleAndStart()
方法:Task
的调度分两种状况:一、配置了TaskCreationOptions.LongRunning
的Task
实例直接新建一个后台 Thread
,并将Task
实例做为启动参数来启动工做线程;二、对于没有配置TaskCreationOptions.LongRunning
的Task
实例,将其加入ThreadPool
的线程池,由线程池来调度运行
internal sealed class ThreadPoolTaskScheduler : TaskScheduler { protected internal override void QueueTask(Task task) { TaskCreationOptions options = task.Options; if ((options & TaskCreationOptions.LongRunning) != 0) { // Run LongRunning tasks on their own dedicated thread. Thread thread = new Thread(s_longRunningThreadWork); thread.IsBackground = true; // Keep this thread from blocking process shutdown thread.Start(task); } else { // Normal handling for non-LongRunning tasks. bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0); ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal); } } }
虽然已经在Task的构造函数中,捕获了ExecutionContext
,可是对于直接新建的Thread实例,启动的时候一样也须要捕获当前线程的ExecutionContext
public void Start() { this.StartupSetApartmentStateInternal(); if (this._delegate != null) { ThreadHelper threadHelper = (ThreadHelper)this._delegate.Target; ExecutionContext executionContextHelper = ExecutionContext.Capture(); threadHelper.SetExecutionContextHelper(executionContextHelper); } this.StartInternal(); }
Task、IThreadPoolWorkItem
逻辑:从线程池取出工做线程,工做线程调用Dispatch()
方法,对于IThreadPoolWorkItem
,直接执行IThreadPoolWorkItem.Execute()
方法,因此线程池处理IThreadPoolWorkItem
是不涉及到上下文切换的。对于 Task
,将ExecutionContext
赋值给工做线程,调用委托,而后清除工做线程的上下文,最后调用Finish(true)
来执行任务完成的回调方法。调用链路很长,这里直接跳到RunContinuations()
方法。
调用链路:
// 最内层 async 状态机 执行await Task.Run()以后的代码块 //=>Task.Finish(true); //=>FinishStageTwo(); //===>FinishStageThree(); //=====>FinishContinuations(); //=======>RunContinuations(continuationObject); //=========>AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag); 状态机的回调执行这个 //==========>box.ExecuteFromThreadPool(threadPoolThread); 或者 box.MoveNext();
咱们知道,异步方法的最内层确定有一个 await Task
。 正是Task.Finish(true)
这个方法调用了最内层状态机,去执行第二次stateMachine.MoveNext()
方法,而且在MoveNext()
方法中都会调用SetResult()
方法,从而触发外层状态机的第二次stateMachine.MoveNext()
执行,就这样一层层的调用完成了全部的层次的回调。能够看到,工做线程在执行 await Task.Run()/MethodAsnc()
后代码块时,传入的是在 AwaitUnsafeOnCompleted()
方法中捕获的 ExecutionContext
。
s_callback
字段保存了状态机的MoveNext()
方法。
private class AsyncStateMachineBox<TStateMachine> :Task<TResult>,IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine { private static readonly ContextCallback s_callback = ExecutionContextCallback; private static void ExecutionContextCallback(object? s) { Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext(); } public TStateMachine StateMachine = default; public ExecutionContext? Context; internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread); internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread); public void MoveNext() => MoveNext(threadPoolThread: null); private void MoveNext(Thread? threadPoolThread) { bool loggingOn = AsyncCausalityTracer.LoggingOn; if (loggingOn) { AsyncCausalityTracer.TraceSynchronousWorkStart(this, CausalitySynchronousWork.Execution); } ExecutionContext? context = Context; if (context == null) { Debug.Assert(StateMachine != null); StateMachine.MoveNext(); } else { if (threadPoolThread is null) { ExecutionContext.RunInternal(context, s_callback, this); } else { ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this); } } if (IsCompleted) { if (System.Threading.Tasks.Task.s_asyncDebuggingEnabled) { System.Threading.Tasks.Task.RemoveFromActiveTasks(this); } StateMachine = default; Context = default; } if (loggingOn) { AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalitySynchronousWork.Execution); } } }
最后在 RunInternal
和 RunFromThreadPoolDispatchLoop
中,都会使用在AwaitUnsafeOnCompleted()
方法里面捕获的 ExecutionContext
,这也就解释了为何在 await Task.Run()/MethodAsync()
先后的代码块中,ExecutionContext
始终相同。须要注意的一点是,无论在Task
任务执行以后,仍是 await
回调执行以后,都会把工做线程的上下文清空。
internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state) { if (executionContext != null && !executionContext.m_isDefault) { ExecutionContext.RestoreChangedContextToThread(threadPoolThread, executionContext, null); } ExceptionDispatchInfo exceptionDispatchInfo = null; try { callback(state); } catch (Exception source) { exceptionDispatchInfo = ExceptionDispatchInfo.Capture(source); } ExecutionContext executionContext2 = threadPoolThread._executionContext; threadPoolThread._synchronizationContext = null; if (executionContext2 != null) { ExecutionContext.RestoreChangedContextToThread(threadPoolThread, null, executionContext2); } if (exceptionDispatchInfo != null) { exceptionDispatchInfo.Throw(); } }