在前面的《基于任务的异步编程模式(TAP)》文章中讲述了.net 4.5框架下的异步操做自我实现方式,实际上,在.net 4.5中部分类已实现了异步封装。如在.net 4.5中,Stream类加入了Async方法,因此基于流的通讯方式均可以实现异步操做。html
public static void TaskFromIOStreamAsync(string fileName) { int chunkSize = 4096; byte[] buffer = new byte[chunkSize]; FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true); Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length); task.ContinueWith((readTask) => { int amountRead = readTask.Result; //必须在ContinueWith中释放文件流 fileStream.Dispose(); Console.WriteLine($"Async(Simple) Read {amountRead} bytes"); }); }
上述代码中,异步读取数据只读取了一次,完成读取后就将执行权交还主线程了。但在真实场景中,须要从流中读取屡次才能得到所有的数据(如文件数据大于给定缓冲区大小,或处理来自网络流的数据(数据还没所有到达机器))。所以,为了完成异步读取操做,须要连续从流中读取数据,直到获取所需所有数据。编程
上述问题致使须要两级Task来处理。外层的Task用于所有的读取工做,供调用程序使用。内层的Task用于每次的读取操做。缓存
第一次异步读取会返回一个Task。若是直接返回调用Wait或者ContinueWith的地方,会在第一次读取结束后继续向下执行。其实是但愿调用者在完成所有读取操做后才执行。所以,不能把第一个Task发布会给调用者,须要一个“伪Task”在完成所有读取操做后再返回。网络
上述问题须要使用到TaskCompletionSource<T>类解决,该类能够生成一个用于返回的“伪Task”。当异步读取操做所有完成后,调用其对象的TrySetResult,让Wait或ContinueWith的调用者继续执行。框架
public static Task<long> AsynchronousRead(string fileName) { int chunkSize = 4096; byte[] buffer = new byte[chunkSize]; //建立一个返回的伪Task对象 TaskCompletionSource<long> tcs = new TaskCompletionSource<long>(); MemoryStream fileContents = new MemoryStream();//用于保存读取的内容 FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true); fileContents.Capacity += chunkSize;//指定缓冲区大小。好像Capacity会自动增加,设置与否不要紧,后续写入多少数据,就增加多少 Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length); task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs)); //在ContinueWith中循环读取,读取完成后,再返回tcs的Task return tcs.Task; } /// <summary> /// 继续读取数据 /// </summary> /// <param name="task">读取数据的线程</param> /// <param name="fileStream">文件流</param> /// <param name="fileContents">文件存放位置</param> /// <param name="buffer">读取数据缓存</param> /// <param name="tcs">伪Task对象</param> private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs) { if (task.IsCompleted) { int bytesRead = task.Result; fileContents.Write(buffer, 0, bytesRead);//写入内存区域。彷佛Capacity会自动增加 if (bytesRead > 0) { //虽然看似是一个新的任务,可是使用了ContinueWith,因此使用的是同一个线程。 //没有读取完,开启另外一个异步继续读取 Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length); //此处作了一个循环 newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs)); } else { //已经所有读取完,因此须要返回数据 tcs.TrySetResult(fileContents.Length); fileStream.Dispose(); fileContents.Dispose();//应该是在使用了数据以后才释放数据缓冲区的数据 } } }
.NET Framework中的旧版异步方法都带有“Begin-”和“End-”前缀。这些方法仍然有效,为了接口的一致性,它们能够被封装到Task中。异步
FromAsyn方法把流的BeginRead和EndRead方法做为参数,再加上存放数据的缓冲区。BeginRead和EndRead方法会执行,并在EndRead完成后调用Continuation Task,把控制权交回主代码。上述例子会关闭流并返回转换的数据async
const int ReadSize = 256;//16k /// <summary> /// 从文件中获取字符串 /// </summary> /// <param name="path">文件路径</param> /// <returns>字符串</returns> public static Task<string> GetStringFromFile(string path) { FileInfo file = new FileInfo(path); byte[] buffer = new byte[file.Length];//存放数据的缓冲区 FileStream fileStream = new FileStream( path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length, FileOptions.DeleteOnClose | FileOptions.Asynchronous); Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, ReadSize, null);//此参数为BeginRead须要的参数 TaskCompletionSource<string> tcs = new TaskCompletionSource<string>(); task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs)); return tcs.Task; } /// <summary> /// 读取数据 /// </summary> /// <param name="taskRead">读取任务</param> /// <param name="fileStream">文件流</param> /// <param name="buffer">读取数据存放位置</param> /// <param name="offset">读取偏移量</param> /// <param name="tcs">伪Task</param> private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs) { int readLength = taskRead.Result; if (readLength > 0) { int newOffset = offset + readLength; Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null); task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs)); } else { tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length)); fileStream.Dispose(); } }
下面的示例中,使用了async和await关键字实现异步读取一个文件的同时进行压缩并写入另外一个文件。全部位于await关键字以前的操做都运行于调用者线程,从await开始的操做都是在Continuation Task中运行。但有没法使用这两个关键字的场合:①Task的结束时机不明确时;②必须用到多级Task和TaskCompletionSource时异步编程
/// <summary> /// 同步方法的压缩 /// </summary> /// <param name="lstFiles">文件清单</param> public static void SyncCompress(IEnumerable<string> lstFiles) { byte[] buffer = new byte[16384]; foreach(string file in lstFiles) { using (FileStream inputStream = File.OpenRead(file)) { using (FileStream outputStream = File.OpenWrite(file + ".compressed")) { using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress)) { int read = 0; while((read=inputStream.Read(buffer,0,buffer.Length))>0) { compressStream.Write(buffer, 0,read); } } } } } } /// <summary> /// 异步方法的文件压缩 /// </summary> /// <param name="lstFiles">须要压缩的文件</param> /// <returns></returns> public static async Task AsyncCompress(IEnumerable<string> lstFiles) { byte[] buffer = new byte[16384]; foreach(string file in lstFiles) { using (FileStream inputStream = File.OpenRead(file)) { using (FileStream outputStream = File.OpenWrite(file + ".compressed")) { using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress)) { int read = 0; while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0) { await compressStream.WriteAsync(buffer, 0, read); } } } } } }