在使用异步方法中最好不要使用void当作返回值,无返回值也应使用Task做为返回值,由于使用void做为返回值具备如下缺点ios
- 没法得知异步函数的状态机在何时执行完毕
- 若是异步函数中出现异常,则会致使进程崩溃
❌异步函数不该该返回voidgit
static void Main(string[] args) { try { // 若是Run方法无异常正常执行,那么程序没法得知其状态机何时执行完毕 Run(); } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.Read(); } static async void Run() { // 因为方法返回的为void,因此在调用此方法时没法捕捉异常,使得进程崩溃 throw new Exception("异常了"); await Task.Run(() => { }); }
☑️应该将异步函数返回Taskgithub
static async Task Main(string[] args) { try { // 由于在此进行await,因此主程序知道何时状态机执行完成 await RunAsync(); Console.Read(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } static async Task RunAsync() { // 由于此异步方法返回的为Task,因此此异常能够被捕捉 throw new Exception("异常了"); await Task.Run(() => { }); }
注:事件是一个例外,异步事件也是返回void编程
对于一些预先知道的结果或者只是一个简单的计算函数,使用Task,FromResult要比Task.Run性能要好,由于Task.FromResult只是建立了一个包装已计算任务的任务,而Task.Run会将一个工做项在线程池进行排队,计算,返回.而且使用Task.FromResult在具备SynchronizationContext 程序中(例如WinForm)调用Result或wait()并不会死锁(虽然并不建议这么干)c#
❌对于预计算或普通计算的函数不该该这么写api
public async Task<int> RunAsync() { return await Task.Run(()=>1+1); }
☑️而应该使用Task.FromResult代替缓存
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
还有另一种代替方法,那就是使用ValueTask
☑️使用ValueTask
static async Task Main(string[] args) { await AddAsync(1, 1); } static ValueTask<int> AddAsync(int a, int b) { // 返回一个可被等待的ValueTask类型 return new ValueTask<int>(a + b); }
注: ValueTask
结构是C#7.0加入的,存在于Sysntem,Threading.Task.Extensions包中async
长时间运行的工做是指在应用程序生命周期执行后台工做的线程,如:执行processing queue items,执行sleeping,执行waiting或者处理某些数据,此类线程不建议使用Task.Run方法执行,由于Task.Run方法是将任务在线程池内进行排队执行,若是线程池线程进行长时间堵塞,会致使线程池增加,进而浪费性能,因此若是想要运行长时间的工做建议直接建立一个新线程进行工做
❌下面这个例子就利用了线程池执行长时间的阻塞工做
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { Task.Run(ProcessQueue); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }
☑️因此应该改为这样
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { var thread = new Thread(ProcessQueue) { // 设置线程为背后线程,使得在主线程结束时此线程也会自动结束 IsBackground = true }; thread.Start(); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }
🔔线程池内线程增长会致使在执行时大量的进行上下文切换,从而浪费程序的总体性能, 线程池详细信息请参考CLR第27章
🔔Task.Factory.StartNew方法中有一个TaskCreationOptions参数重载,若是设置为LongRunning,则会建立一个新线程执行
// 此方法会建立一个新线程进行执行 Task.Factory.StartNew(() => { }, TaskCreationOptions.LongRunning);
使用Task.Result和Task.Wait()两个方法进行阻塞异步同步化比直接同步方法阻塞还要MUCH worse(更糟),这种方式被称为Sync over async 此方式操做步骤以下
1.异步线程启动
2.调用线程调用Result或者Wait()进行阻塞
3.异步完成时,将一个延续代码调度到线程池,恢复等待该操做的代码
虽然看起来并无什么关系,可是其实这里倒是使用了两个线程来完成同步操做,这样一般会致使线程饥饿和死锁
🔔线程饥饿(starvation):指等待时间已经影响到进程运行,若是等待时间过长,致使进程使命没有意义时,称之为饿死
🔔死锁(deadlock):指两个或两个以上的线程相互争夺资源,致使进程永久堵塞,
🔔使用Task.Result和Task.Wait()会在winform和ASP.NET中会死锁,由于它们SynchronizationContext
具备对象,两个线程在SynchronizationContext
争夺致使死锁,而ASP.NET Core则不会产生死锁,由于ASP.NET Core本质是一个控制台应用程序,并无上下文
❌下面的例子,虽然都不会产生死锁,可是依然具备不少问题
async Task<string> RunAsync() { // 此线程ID输出与UI线程ID不一致 Debug.WriteLine("UI线程:"+Thread.CurrentThread.ManagedThreadId); return await Task.Run(() => "Run"); } string DoOperationBlocking() { // 这种方法虽然摆脱了死锁的问题,可是也致使了上下文问题,RunAsync不在以UI线程调用 // Result和Wait()方法若是出现异常,异常将被包装为AggregateException进行抛出, return Task.Run(() => RunAsync()).Result; } } private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("RunAsync:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine(DoOperationBlocking()); }
public string DoOperationBlocking2() { // 此方法也是会致使上下文问题, // GetAwaiter()方法对异常不会包装 return Task.Run(() => RunAsync()).GetAwaiter().GetResult(); }
在async和await,当时可使用continueWith来延迟执行一些方法,可是continueWith并不会捕捉`SynchronizationContext
`,因此建议使用await代替continueWith
❌下面例子就是使用continueWith
private void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); RunAsync().ContinueWith(task => { Console.WriteLine("RunAsync returned:"+task.Result); // 由于是使用的continueWith,因此线程ID与UI线程并不一致 Debug.WriteLine("ContinueWith:" + Thread.CurrentThread.ManagedThreadId); }); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
☑️应该使用await来代替continueWith
private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine("RunAsync returned:"+ await RunAsync()); Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
对于编写类库的人来讲TaskCompletionSource<T>
是一个具备很是重要的做用,默认状况下任务延续可能会在调用try/set(Result/Exception/Cancel)的线程上进行运行,这也就是说做为编写类库的人来讲必须须要考虑上下文,这一般是很是危险,可能就会致使死锁' 线程池饥饿 *数据结构损坏(若是代码异常运行)
因此在建立TaskCompletionSourece<T>
时,应该使用TaskCreationOption.RunContinuationAsyncchronously
参数将后续任务交给线程池进行处理
❌下面例子就没有使用TaskCreationOptions.RunComtinuationsAsynchronously
,
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(); // 使用TaskContinuationOptions.ExecuteSynchronously来测试延续任务 ContinueWith(1, tcs.Task); // 测试await延续任务 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId ); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }
☑️因此应该改成使用TaskCreationOptions.RunComtinuationsAsynchronously
参数进行设置TaskCompletionSoure
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); // 使用TaskContinuationOptions.ExecuteSynchronously来测试延续任务 ContinueWith(1, tcs.Task); // 测试await延续任务 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }
🔔TaskCreationOptions.RunContinuationsAsynchronously
属性和TaskContinuationOptions.RunContinuationsAsynchronously
很类似,但请注意它们的使用方式
用于进行超时的CancellationTokenSources,若是不释放,则会增长timer queue(计时器队列)
的压力
❌下面例子由于没有释放,因此在每次请求发出以后,计时器在队列中停留10秒钟
public async Task<Stream> HttpClientAsyncWithCancellationBad() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } }
☑️因此应该及时的释放CancellationSoure,使得正确的从队列中删除计时器
public async Task<Stream> HttpClientAsyncWithCancellationGood() { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))) { using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } } }
🔔设置延迟时间具备两种方式
1.构造器参数
public CancellationTokenSource(TimeSpan delay); public CancellationTokenSource(int millisecondsDelay);2.调用实例对象CancelAfter()
public void CancelAfter(TimeSpan delay);
public void CancelAfter(int millisecondsDelay);
因为在.NET中取消操做必须显示的传递CancellationToken
,因此若是想取消全部调用的异步函数,那么应该将CancllationToken
传递给此调用链中的全部函数
❌下面例子在调用ReadAsync时并无传递CancellationToken
,因此不能有效的取消
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous参数指定异步通讯 using(Stream stream = new FileStream( @"d:\资料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 因为并无将cancellationToken传递给ReadAsync,因此没法进行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length); return Encoding.UTF8.GetString(buffer, 0, read); } }
☑️因此应该将CancellationToken
传递给ReadAsync(),以达到有效的取消
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous参数指定异步通讯 using(Stream stream = new FileStream( @"d:\资料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 因为并无将cancellationToken传递给ReadAsync,因此没法进行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length,cancellationToken); return Encoding.UTF8.GetString(buffer, 0, read); } }
🔔在使用异步IO时,应该将options参数设置为FileOptions.Asynchronous,不然会产生额外的线程浪费,详细信息请参考CLR中28.12节
在异步编程时出现了一种模式cancelling an uncancellable operation,这个用于取消像CancellationTokenRegistry
和timer
这样的东西,一般是在被取消或超时时建立另一个线程进行操做,而后使用Task.WhenAny进行判断是完成仍是被取消了
:x: 下面例子使用了Task.delay(-1,token)建立在触发CancellationToken时触发的任务,可是若是CancellationToken不触发,则没有办法释放CancellationTokenRegistry,就有可能会致使内存泄露
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { // 没有方法释放cancellationToken注册 var delayTask = Task.Delay(-1, cancellationToken); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操做 throw new OperationCanceledException(); } return await task; }
:ballot_box_with_check:因此应该改为下面这样,在任务一完成,就释放CancellationTokenRegistry
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); using (cancellationToken.Register(state => { // 这样将在其中一个任务触发时当即释放CancellationTokenRegistry ((TaskCompletionSource<object>)state).TrySetResult(null); }, tcs)) { var resultTask = await Task.WhenAny(task, tcs.Task); if (resultTask == tcs.Task) { // 取消异步操做 throw new OperationCanceledException(cancellationToken); } return await task; } }
:x:下面这个例子即便在操做完成以后,也不会取消定时器,这也就是说最终会在计时器队列中产生大量的计时器,从而浪费性能
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { var delayTask = Task.Delay(timeout); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操做 throw new OperationCanceledException(); } return await task; }
:ballot_box_with_check:应改为下面这样,这样将在任务完成以后,取消计时器的操做
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { using (var cts = new CancellationTokenSource()) { var delayTask = Task.Delay(timeout, cts.Token); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操做 throw new OperationCanceledException(); } else { // 取消计时器任务 cts.Cancel(); } return await task; } }
当使用Stream和StreamWriter进行异步写入时,底层数据也有可能被缓冲,当数据被缓冲时,Stream和StreamWriter将使用同步的方式进行write/flush
,这将会致使线程阻塞,而且有可能致使线程池内线程不足(线程池饥饿)
❌下面例子因为没有调用FlushAsync(),因此最后是以同步方式进行write/flush的
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\资料\Blogs\Task")) { // 因为没有调用FlushAsync,因此最后是以同步方式进行write/flush的 await streamWriter.WriteAsync("Hello World"); } }
☑️因此应该改成下面这样,在Dispose以前调用FlushAsync()
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\资料\Blogs\Task")) { await streamWriter.WriteAsync("Hello World"); // 调用FlushAsync() 使其使用异步write/flush await streamWriter.FlushAsync(); } }
使用async/await 代替直接返回Task具备以上好处
using
)❌下面这个错误的例子是将Task直接返回给了调用者
public Task<int> RunAsync() { return Task.FromResult(1 + 1); }
☑️因此应该使用async/await来代替返回Task
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
🔔使用async/await来代替返回Task时,还有性能上的考虑,虽然直接Task会更快,可是最终却改变了异步的行为,失去了异步状态机的一些好处
❌下面例子使用一个返回值为void的异步,将其传递给Timer进行,所以,若是其中任务抛出异常,则整个进程将退出
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public async void Heartbeat(object state) { await httpClient.GetAsync("http://mybackend/api/ping"); } }
❌下面例子将阻止计时器回调,这有可能致使线程池中线程耗尽,这也是一个异步差于同步的例子
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { httpClient.GetAsync("http://mybackend/api/ping").GetAwaiter().GetResult(); } }
☑️下面例子是使用基于的异步的方法,并在定时器回调函数中丢弃该任务,而且若是此方法抛出异常,则也不会关闭进程,而是会触发TaskScheduler.UnobservedTaskException
事件
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { _ = DoAsyncPing(); } private async Task DoAsyncPing() { // 异步等待 await _client.GetAsync("http://mybackend/api/ping"); }
假若有BackgroudQueue
类中有一个接收回调函数的FireAndForget
方法,该方法在某个时候执行调用
❌下面这个错误例子将强制调用者要么阻塞要么使用async void异步方法
public class BackgroundQueue { public static void FireAndForget(Action action) { } }
static async Task Main(string[] args) { var httpClient = new HttpClient(); // 由于方法类型是Action,因此只能使用async void BackgroundQueue.FireAndForget(async () => { await httpClient.GetAsync("http://pinger/api/1"); }); }
☑️因此应该构建一个回调异步方法的重载
public class BackgroundQueue { public static void FireAndForget(Action action) { } public static void FireAndForget(Func<Task> action) { } }
缓存异步结果是一种很常见的作法,ConcurrentDictionary是一个很好的集合,而GetOrAdd也是一个很方便的方法,它用于尝试获取已经存在的项,若是没有则添加项.由于回调是同步的,因此很容易编写Task.Result
的代码,从而生成异步的结果值,可是这样很容易致使线程池饥饿
❌下面这个例子就有可能致使线程池饥饿,由于当若是没有缓存人员数据时,将阻塞请求线程
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Person> _cache = new ConcurrentDictionary<int, Person>(); public PersonController(AppDbContext db) { _db = db; } public IActionResult Get(int id) { // 若是不存在缓存数据,则会进入堵塞状态 var person = _cache.GetOrAdd(id, (key) => db.People.FindAsync(key).Result); return Ok(person); } }
☑️能够改为缓存线程自己,而不是结果,这样将不会致使线程池饥饿
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Task<Person>> _cache = new ConcurrentDictionary<int, Task<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 由于缓存的是线程自己,因此没有进行堵塞,也就不会产生线程池饥饿 var person = await _cache.GetOrAdd(id, (key) => db.People.FindAsync(key)); return Ok(person); } }
🔔这种方法,在最后,GetOrAdd()可能并行屡次来执行缓存回调,这可能致使启动屡次昂贵的计算
☑️可使用async lazy
模式来取代屡次执行回调问题
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, AsyncLazy<Person>> _cache = new ConcurrentDictionary<int, AsyncLazy<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 使用Lazy进行了延迟加载(使用时调用),解决了屡次执行回调问题 var person = await _cache.GetOrAdd(id, (key) => new AsyncLazy<Person>(() => db.People.FindAsync(key))); return Ok(person); } private class AsyncLazy<T> : Lazy<Task<T>> { public AsyncLazy(Func<Task<T>> valueFactory) : base(valueFactory) { } }
构造函数是同步,下面看看在构造函数中处理异步状况
下面是使用客户端API的例子,固然,在使用API以前须要异步进行链接
public interface IRemoteConnectionFactory { Task<IRemoteConnection> ConnectAsync(); } public interface IRemoteConnection { Task PublishAsync(string channel, string message); Task DisposeAsync(); }
❌下面例子使用Task.Result在构造函数中进行链接,这有可能致使线程池饥饿和死锁现象
public class Service : IService { private readonly IRemoteConnection _connection; public Service(IRemoteConnectionFactory connectionFactory) { _connection = connectionFactory.ConnectAsync().Result; } }
☑️正确的方式应该使用静态工厂模式进行异步链接
public class Service : IService { private readonly IRemoteConnection _connection; private Service(IRemoteConnection connection) { _connection = connection; } public static async Task<Service> CreateAsync(IRemoteConnectionFactory connectionFactory) { return new Service(await connectionFactory.ConnectAsync()); } }