static void Main(string[] args) { Thread t = new Thread(PrintNumbers); t.Start();//线程开始执行 PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } }
class Program { static void Main(string[] args) { Thread t = new Thread(PrintNumbersWithDelay); t.Start(); PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2));//暂停2S Console.WriteLine(i); } } }
当程序运行时,会建立一个线程,该线程会执行PrintNumbersWithDelay方法中的代码。而后会当即执行PrintNumbers方法。关键之处在于在PrintNumbersWithDelay方法中加入了Thread.Sleep方法调用。这将致使线程执行该代码时,在打印任何数字以前会等待指定的时间(本例中是2秒钟),当线程处于休眠状态时,它会占用尽量少的CPU时间。结果咱们4·会发现一般后运行的PrintNumbers方法中的代码会比独立线程中的PrintNumbersWithDelay方法中的代码先执行。html
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); t.Join(); Console.WriteLine("Thread completed"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当程序运行时,启动了一个耗时较长的线程来打印数字,打印每一个数字前要等待两秒。但咱们在主程序中调用了t.Join方法,该方法容许咱们等待直到线程t完成。当线程t完成 "时,主程序会继续运行。借助该技术能够实如今两个线程间同步执行步骤。第一个线程会等待另外一个线程完成后再继续执行。第一个线程等待时是处于阻塞状态(正如暂停线程中调用 Thread.Sleep方法同样),程序员
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当主程序和单独的数字打印线程运行时,咱们等待6秒后对线程调用了t.Abort方法。这给线程注入了ThreadAbortException方法,致使线程被终结。这很是危险,由于该异常能够在任什么时候刻发生并可能完全摧毁应用程序。另外,使用该技术也不必定总能终止线程。目-标线程能够经过处理该异常并调用Thread.ResetAbort方法来拒绝被终止。所以并不推荐使用,Abort方法来关闭线程。可优先使用一些其余方法,好比提供一个CancellationToken方法来,取消线程的执行。web
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithStatus); Thread t2 = new Thread(DoNothing); Console.WriteLine(t.ThreadState.ToString()); t2.Start(); t.Start(); for (int i = 1; i < 30; i++) { Console.WriteLine(t.ThreadState.ToString()); } Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); Console.WriteLine(t.ThreadState.ToString()); Console.WriteLine(t2.ThreadState.ToString()); Console.ReadKey(); } static void DoNothing() { Thread.Sleep(TimeSpan.FromSeconds(2)); } static void PrintNumbersWithStatus() { Console.WriteLine("Starting..."); Console.WriteLine(Thread.CurrentThread.ThreadState.ToString()); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当主程序启动时定义了两个不一样的线程。一个将被终止,另外一个则会成功完成运行。线,.程状态位于Thread对象的ThreadState属性中。ThreadState属性是一个C#枚举对象。刚开始线程状态为ThreadState.Unstarted,而后咱们启动线程,并估计在一个周期为30次迭代的,区间中,线程状态会从ThreadState.Running变为ThreadState. WaitSleepJoin。算法
请注意始终能够经过Thread.CurrentThread静态属性得到当前Thread对象。
若是实际状况与以上不符,请增长迭代次数。终止第一个线程后,会看到如今该线程状态为ThreadState.Aborted,程序也有可能会打印出ThreadState.AbortRequested状态。这充分说明了同步两个线程的复杂性。请记住不要在程序中使用线程终止。我在这里使用它只是为 ,了展现相应的线程状态。数据库
最后能够看到第二个线程t2成功完成而且状态为ThreadState.Stopped。另外还有一些其,他的线程状态,可是要么已经被弃用,要么没有咱们实验过的几种状态有用。编程
class Program { static void Main(string[] args) { Console.WriteLine("Current thread priority: {0}", Thread.CurrentThread.Priority); Console.WriteLine("Running on all cores available"); RunThreads(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Running on a single core"); Process.GetCurrentProcess().ProcessorAffinity = new IntPtr(1); RunThreads(); } static void RunThreads() { var sample = new ThreadSample(); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; var threadTwo = new Thread(sample.CountNumbers); threadTwo.Name = "ThreadTwo"; threadOne.Priority = ThreadPriority.Highest; threadTwo.Priority = ThreadPriority.Lowest; threadOne.Start(); threadTwo.Start(); Thread.Sleep(TimeSpan.FromSeconds(2)); sample.Stop(); Console.ReadKey(); } class ThreadSample { private bool _isStopped = false; public void Stop() { _isStopped = true; } public void CountNumbers() { long counter = 0; while (!_isStopped) { counter++; } Console.WriteLine("{0} with {1,11} priority " + "has a count = {2,13}", Thread.CurrentThread.Name, Thread.CurrentThread.Priority, counter.ToString("N0")); } } }
当主程序启动时定义了两个不一样的线程。第一个线程优先级为ThreadPriority.Highest,即具备最高优先级。第二个线程优先级为ThreadPriority.Lowest,即具备最低优先级。咱们先, ,打印出主线程的优先级值,而后在全部可用的CPU核心上启动这两个线程。若是拥有一个1以上的计算核心,将在两秒钟内获得初步结果。最高优先级的线程一般会计算更多的迭代.可是两个值应该很接近。然而,若是有其余程序占用了全部的CPU核心运行负载,结果则会大相径庭。数组
为了模拟该情形,咱们设置了ProcessorAffinity选项,让操做系统将全部的线程运,行在单个CPU核心(第一个核心)上。如今结果彻底不一样,而且计算耗时将超过2秒钟。 .这是由于CPU核心大部分时间在运行高优先级的线程,只留给剩下的线程不多的时间来,运行。浏览器
请注意这是操做系统使用线程优先级的一个演示。一般你无需使用这种行为编写程序。安全
class Program { static void Main(string[] args) { var sampleForeground = new ThreadSample(10); var sampleBackground = new ThreadSample(20); var threadOne = new Thread(sampleForeground.CountNumbers); threadOne.Name = "ForegroundThread"; var threadTwo = new Thread(sampleBackground.CountNumbers); threadTwo.Name = "BackgroundThread"; threadTwo.IsBackground = true; threadOne.Start(); threadTwo.Start(); Console.ReadKey(); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 0; i < _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
当主程序启动时定义了两个不一样的线程。默认状况下,显式建立的线程是前台线程。经过手动的设置threadTwo对象的IsBackground属性为ture来建立一个后台线程。经过配置来实现第一个线程会比第二个线程先完成。而后运行程序。服务器
第一个线程完成后,程序结束而且后台线程被终结。这是前台线程与后台线程的主要区,别:进程会等待全部的前台线程完成后再结束工做,可是若是只剩下后台线程,则会直接结束工做。
一个重要注意事项是若是程序定义了一个不会完成的前台线程,主程序并不会正常结束。
class Program { static void Main(string[] args) { var sample = new ThreadSample(10); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; threadOne.Start(); threadOne.Join(); Console.WriteLine("--------------------------"); var threadTwo = new Thread(Count); threadTwo.Name = "ThreadTwo"; threadTwo.Start(8); threadTwo.Join(); Console.WriteLine("--------------------------"); var threadThree = new Thread(() => CountNumbers(12)); threadThree.Name = "ThreadThree"; threadThree.Start(); threadThree.Join(); Console.WriteLine("--------------------------"); int i = 10; var threadFour = new Thread(() => PrintNumber(i)); i = 20; var threadFive = new Thread(() => PrintNumber(i)); threadFour.Start(); threadFive.Start(); } static void Count(object iterations) { CountNumbers((int)iterations); } static void CountNumbers(int iterations) { for (int i = 1; i <= iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } static void PrintNumber(int number) { Console.WriteLine(number); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 1; i <= _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
当主程序启动时,首先建立了ThreadSample类的一个对象,并提供了一个迭代次数。而后使用该对象的CountNumbers方法启动线程。该方法运行在另外一个线程中,可是使用数 ,字10,该数字是经过ThreadSample对象的构造函数传入的。所以,咱们只是使用相同的间接方式将该迭代次数传递给另外一个线程。
另外一种传递数据的方式是使用Thread.Start方法。该方法会接收一个对象,并将该对象,传递给线程。为了应用该方法,在线程中启动的方法必须接受object类型的单个参数。在建立threadTwo线程时演示了该方式。咱们将8做为一个对象传递给了Count方法,而后 Count方法被转换为整型。
接下来的方式是使用lambda表达式。lambda表达式定义了一个不属于任何类的方法。咱们建立了一个方法,该方法使用须要的参数调用了另外一个方法,并在另外一个线程中运行该 ,方法。当启动threadThree线程时,打印出了12个数字,这正是咱们经过lambda表达式传递,的数字。
使用lambda表达式引用另外一个C#对象的方式被称为闭包。当在lambda表达式中使用任何局部变量时, C#会生成一个类,并将该变量做为该类的一个属性。因此实际上该方式与 threadOne线程中使用的同样,可是咱们无须定义该类, C#编译器会自动帮咱们实现。
这可能会致使几个问题。例如,若是在多个lambda表达式中使用相同的变量,它们会共享该变量值。在前一个例子中演示了这种状况。当启动threadFour和threadFive线程时,.它们都会打印20,由于在这两个线程启动以前变量被修改成20。
class Program { static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}",c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterWithLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { public int Count { get; private set; } public override void Increment() { Count++; } public override void Decrement() { Count--; } } class CounterWithLock : CounterBase { private readonly object _syncRoot = new Object(); public int Count { get; private set; } public override void Increment() { lock (_syncRoot) { Count++; } } public override void Decrement() { lock (_syncRoot) { Count--; } } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
当主程序启动时,建立了一个Counter类的对象。该类定义了一个能够递增和递减的简,单的计数器。而后咱们启动了三个线程。这三个线程共享同一个counter实例,在一个周期中进行一次递增和一次递减。这将致使不肯定的结果。若是运行程序屡次,则会打印出多个不一样的计数器值。结果多是0,但大多数状况下则不是0.
这是由于Counter类并非线程安全的。当多个线程同时访问counter对象时,第一个线程获得的counter值10并增长为11,而后第二个线程获得的值是11并增长为12,第一个线程获得counter值12,可是递减操做发生前,第二个线程获得的counter值也是12,而后 , 第一个线程将12递减为11并保存回counter中,同时第二个线程进行了一样的操做。结果,咱们进行了两次递增操做可是只有一次递减操做,这显然不对。这种情形被称为竞争条件, (race condition),竞争条件是多线程环境中很是常见的致使错误的缘由。
为了确保不会发生以上情形,必须保证当有线程操做counter对象时,全部其余线程必须等待直到当前线程完成操做。咱们能够使用lock关键字来实现这种行为。若是锁定了一个对象,须要访问该对象的全部其余线程则会处于阻塞状态,并等待直到该对象解除锁定。这,可能会致使严重的性能问题,在第2章中将会进一步学习该知识点。
class Program { static void Main(string[] args) { object lock1 = new object(); object lock2 = new object(); new Thread(() => LockTooMuch(lock1, lock2)).Start(); lock (lock2) { Thread.Sleep(1000); Console.WriteLine("Monitor.TryEnter allows not to get stuck, returning false after a specified timeout is elapsed"); if (Monitor.TryEnter(lock1, TimeSpan.FromSeconds(5))) { Console.WriteLine("Acquired a protected resource succesfully"); } else { Console.WriteLine("Timeout acquiring a resource!"); } } new Thread(() => LockTooMuch(lock1, lock2)).Start(); Console.WriteLine("----------------------------------"); lock (lock2) { Console.WriteLine("This will be a deadlock!"); Thread.Sleep(1000); lock (lock1) { Console.WriteLine("Acquired a protected resource succesfully"); } } Console.ReadKey(); } static void LockTooMuch(object lock1, object lock2) { lock (lock1) { Thread.Sleep(1000); lock (lock2); } } }
先看看LockTooMuch方法。在该方法中咱们先锁定了第一个对象,等待一秒后锁定了 ,第二个对象。而后在另外一个线程中启动该方法。最后尝试在主线程中前后锁定第二个和第一个对象。
若是像该示例的第二部分同样使用lock关键字,将会形成死锁。第一个线程保持对, lock1对象的锁定,等待直到lock2对象被释放。主线程保持对lock2对象的锁定并等待直到。lock1对象被释放,但lock1对象永远不会被释放。
实际上lock关键字是Monitor类用例的一个语法糖。若是咱们分解使用了lock关键字的代码,将会看到它以下面代码片断所示:
bool acquiredLock = false; try { Monitor.Enter(lockObject, ref acquiredLock); } finally { if (acquiredLock) { Monitor.Exit(lockObject); } }
所以,咱们能够直接使用Monitor类。其拥有TryEnter方法,该方法接受一个超时, "参数。若是在咱们可以获取被lock保护的资源以前,超时参数过时,则该方法会返回 false.
class Program { static void Main(string[] args) { var t = new Thread(FaultyThread); t.Start(); t.Join(); try { t = new Thread(BadFaultyThread); t.Start(); } catch (Exception ex) { Console.WriteLine("We won't get here!"); } } static void BadFaultyThread() { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(2)); throw new Exception("Boom!"); } static void FaultyThread() { try { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(1)); throw new Exception("Boom!"); } catch (Exception ex) { Console.WriteLine("Exception handled: {0}", ex.Message); } } }
当主程序启动时,定义了两个将会抛出异常的线程。其中一个对异常进行了处理,另外一个则没有。能够看到第二个异常没有被包裹启动线程的try/catch代码块捕获到。因此若是直接使用线程,通常来讲不要在线程中抛出异常,而是在线程代码中使用try/catch代码块。
在较老版本的.NET Framework中(1.0和1.1),该行为是不同的,未被捕获的异常不会强制应用程序关闭。能够经过添加一个包含如下代码片断的应用程序配置文件(好比app config)来使用该策略。
<configuration> <runtime> <legacyUnhandledExceptionPolicy enable="1" /> </runtime> </configuration>
正如前面所看到的同样,多个线程同时使用共享对象会形成不少问题。同步这些线程使得对共享对象的操做可以以正确的顺序执行是很是重要的。在使用C#中的lock关键字,咱们遇到了一个叫做竞争条件的问题。致使这问题的缘由是多线程的执行并无正确同步。当一个线程执行递增和递减操做时,其余线程须要依次等待。这种常见问题一般被称为线程同步。
有多种方式来实现线程同步。首先,若是无须共享对象,那么就无须进行线程同步。令,人惊奇的是大多数时候能够经过从新设计程序来除移共享状态,从而去掉复杂的同步构造。请尽量避免在多个线程间使用单一对象。
若是必须使用共享的状态,第二种方式是只使用原子操做。这意味着一个操做只占用一个量子的时间,一次就能够完成。因此只有当前操做完成后,其余线程才能执行其余操做。所以,你无须实现其余线程等待当前操做完成,这就避免了使用锁,也排除了死锁的状况。
若是上面的方式不可行,而且程序的逻辑更加复杂,那么咱们不得不使用不一样的方式来,协调线程。方式之一是将等待的线程置于阻塞状态。当线程处于阻塞状态时,只会占用尽量少的CPU时间。然而,这意味着将引入至少一次所谓的上下文切换( context switch),上下文切换是指操做系统的线程调度器。该调度器会保存等待的线程的状态,并切换到另外一个.线程,依次恢复等待的线程的状态。这须要消耗至关多的资源。然而,若是线程要被挂起很,长时间,那么这样作是值得的。这种方式又被称为内核模式(kernel-mode),由于只有操做系,统的内核才能阻止线程使用CPU时间。
万一线程只须要等待一小段时间,最好只是简单的等待,而不用将线程切换到阻塞状,态。虽然线程等待时会浪费CPU时间,但咱们节省了上下文切换耗费的CPU时间。该方式又被称为用户模式(user-mode),该方式很是轻量,速度很快,但若是线程须要等待较长时间则会浪费大量的CPU时间。
为了利用好这两种方式,能够使用混合模式(hybrid),混合模式先尝试使用用户模式等,待,若是线程等待了足够长的时间,则会切换到阻塞状态以节省CPU资源。
本节将展现如何对对象执行基本的原子操做,从而不用阻塞线程就可避免竞争条件。
internal class Program { private static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterNoLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { _count++; } public override void Decrement() { _count--; } } class CounterNoLock : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { Interlocked.Increment(ref _count); } public override void Decrement() { Interlocked.Decrement(ref _count); } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
当程序运行时,会建立三个线程来运行TestCounter方法中的代码。该方法对一个对象,按序执行了递增或递减操做。起初的Counter对象不是线程安全的,咱们会遇到竞争条件。因此第一个例子中计数器的结果值是不肯定的。咱们可能会获得数字0,然而若是运行程序屡次,你将最终获得一些不正确的非零结果。在第1部分中,咱们经过锁定对象解决了这个问题。在一个线程获取旧的计数器值并计,算后赋予新的值以前,其余线程都被阻塞了。然而,若是咱们采用上述方式执行该操做中途不能中止。而借助于Interlocked类,咱们无需锁定任何对象便可获取到正确的结果。Interlocked提供了Increment, Decrement和Add等基本数学操做的原子方法,从而帮助咱们,在编写Counter类时无需使用锁。
本节将描述如何使用Mutex类来同步两个单独的程序。Mutex是一种原始的同步方式,其只对一个线程授予对共享资源的独占访问。
class Program { static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("Second instance is running!"); } else { Console.WriteLine("Running!"); Console.ReadLine(); m.ReleaseMutex(); } } } }
当主程序启动时,定义了一个指定名称的互斥量,设置initialOwner标志为false。这意.味着若是互斥量已经被建立,则容许程序获取该互斥量。若是没有获取到互斥量,程序则简单地显示Running,等待直到按下了任何键,而后释放该互斥量并退出。
若是再运行一样一个程序,则会在5秒钟内尝试获取互斥量。若是此时在第一个程序中,按下了任何键,第二个程序则会开始执行。然而,若是保持等待5秒钟,第二个程序将没法,获取到该瓦斥量。
本节将展现SemaphoreSlim类是如何做为Semaphore类的轻量级版本的。该类限制了同时访问同一个资源的线程数量。
class Program { static void Main(string[] args) { for (int i = 1; i <= 6; i++) { string threadName = "Thread " + i; int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } } static SemaphoreSlim _semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine("{0} waits to access a database", name); _semaphore.Wait(); Console.WriteLine("{0} was granted an access to a database", name); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} is completed", name); _semaphore.Release(); } }
当主程序启动时,建立了SemaphoreSlim的一个实例,并在其构造函数中指定容许的并发线程数量。而后启动了6个不一样名称和不一样初始运行时间的线程。
每一个线程都尝试获取数据库的访问,可是咱们借助于信号系统限制了访问数据库的并发,数为4个线程。当有4个线程获取了数据库的访问后,其余两个线程须要等待,直到以前线,程中的某一个完成工做并调用semaphore.Release方法来发出信号。
这里咱们使用了混合模式,其容许咱们在等待时间很短的状况下无需使用上下文切换。然而,有一个叫做Semaphore的SemaphoreSlim类的老版本。该版本使用纯粹的内核时间 ( kernel-time)方式。通常不必使用它,除非是很是重要的场景。咱们能够建立一个具名的semaphore,就像一个具名的mutex同样,从而在不一样的程序中同步线程。SemaphoreSlim并不使用Windows内核信号量,并且也不支持进程间同步。因此在跨程序同步的场景下能够使用Semaphore.
本示例借助于AutoResetEvent类来从一个线程向另外一个线程发送通知。AutoResetEvent类能够通知等待的线程有某事件发生。
class Program { static void Main(string[] args) { var t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("Waiting for another thread to complete work"); _workerEvent.WaitOne(); Console.WriteLine("First operation is completed!"); Console.WriteLine("Performing an operation on a main thread"); Thread.Sleep(TimeSpan.FromSeconds(5)); _mainEvent.Set(); Console.WriteLine("Now running the second operation on a second thread"); _workerEvent.WaitOne(); Console.WriteLine("Second operation is completed!"); Console.ReadKey(); } private static AutoResetEvent _workerEvent = new AutoResetEvent(false); private static AutoResetEvent _mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("Starting a long running work..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); Console.WriteLine("Waiting for a main thread to complete its work"); _mainEvent.WaitOne(); Console.WriteLine("Starting second operation..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); } }
当主程序启动时,定义了两个AutoResetEvent实例。其中一个是从子线程向主线程发信号,另外一个实例是从主线程向子线程发信号。咱们向AutoResetEvent构造方法传人false,定义了这两个实例的初始状态为unsignaled。这意味着任何线程调用这两个对象中的任何一个的WaitOne方法将会被阻塞,直到咱们调用了Set方法。若是初始事件状态为true,那么 AutoResetEvent实例的状态为signaled,若是线程调用WaitOne方法则会被当即处理。而后事件状态自动变为unsignaled,因此须要再对该实例调用一次Set方法,以便让其余的线程对,该实例调用WaitOne方法从而继续执行。
而后咱们建立了第二个线程,其会执行第一个操做10秒钟,而后等待从第二个线程发,出的信号。该信号意味着第一个操做已经完成。如今第二个线程在等待主线程的信号。咱们对主线程作了一些附加工做,并经过调用mainEvent.Set方法发送了一个信号。而后等待从第二个线程发出的另外一个信号。
AutoResetEvent类采用的是内核时间模式,因此等待时间不能太长。使用ManualResetEventslim类更好,由于它使用的是混合模式。
本节将描述如何使用ManualResetEventSlim类来在线程间以更灵活的方式传递信号。
class Program { static void Main(string[] args) { var t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); var t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); var t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine("The gates are now open!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); _mainEvent.Reset(); Console.WriteLine("The gates have been closed!"); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine("The gates are now open for the second time!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("The gates have been closed!"); _mainEvent.Reset(); Console.ReadKey(); } static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine("{0} falls to sleep", threadName); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} waits for the gates to open!", threadName); _mainEvent.Wait(); Console.WriteLine("{0} enters the gates!", threadName); } static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false); }
当主程序启动时,首先建立了ManualResetEventSlim类的一个实例。而后启动了三个线程,等待事件信号通知它们继续执行。
ManualResetEvnetSlim的整个工做方式有点像人群经过大门。而AutoResetEvent事件像一个旋转门,一次只容许一人经过。ManualResetEventSlim是ManualResetEvent的混合版本,一直保持大门敞开直到手动调用Reset方法。当调用mainEvent.Set时,至关于打开了大门从而容许准备好的线程接收信号并继续工做。然而线程3还处于睡眠 "状态,没有遇上时间。当调用mainEvent.Reset至关于关闭了大门。最后一个线程已经准备好执行,可是不得不等待下一个信号,即要等待好几秒钟。
本节将描述如何使用CountdownEvent信号类来等待直到必定数量的操做完成。
class Program { static void Main(string[] args) { Console.WriteLine("Starting two operations"); var t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4)); var t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8)); t1.Start(); t2.Start(); _countdown.Wait(); Console.WriteLine("Both operations have been completed."); _countdown.Dispose(); Console.ReadKey(); } static CountdownEvent _countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine(message); _countdown.Signal(); } }
当主程序启动时,建立了一个CountdownEvent实例,在其构造函数中指定了当两个操,做完成时会发出信号。而后咱们启动了两个线程,当它们执行完成后会发出信号。一旦第二个线程完成,主线程会从等待CountdownEvent的状态中返回并继续执行。针对须要等待多,个异步操做完成的情形,使用该方式是很是便利的。
然而这有一个重大的缺点。若是调用countdown.Signal()没达到指定的次数,那么-countdown. Wait()将一直等待。请确保使用CountdownEvent时,全部线程完成后都要调用,Signal方法。
本节将展现另外一种有意思的同步方式,被称为Barrier, Barrier类用于组织多个线程及时, 在某个时刻碰面。其提供了一个回调函数,每次线程调用了SignalAndWait方法后该回调函数会被执行。
class Program { static void Main(string[] args) { var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", 5)); var t2 = new Thread(() => PlayMusic("the singer", "sing his song", 2)); t1.Start(); t2.Start(); Console.ReadKey(); } static Barrier _barrier = new Barrier(2,b => Console.WriteLine("End of phase {0}", b.CurrentPhaseNumber + 1)); static void PlayMusic(string name, string message, int seconds) { for (int i = 1; i < 3; i++) { Console.WriteLine("----------------------------------------------"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} starts to {1}", name, message); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} finishes to {1}", name, message); _barrier.SignalAndWait(); } } }
咱们建立了Barrier类,指定了咱们想要同步两个线程。在两个线程中的任何一个调用了-barrier.SignalAndWait方法后,会执行一个回调函数来打印出阶段。
每一个线程将向Barrier发送两次信号,因此会有两个阶段。每次这两个线程调用Signal AndWait方法时, Barrier将执行回调函数。这在多线程迭代运算中很是有用,能够在每一个迭代,结束前执行一些计算。当最后一个线程调用SignalAndWait方法时能够在迭代结束时进行交互。
本节将描述如何使用ReaderWriterLockSlim来建立一个线程安全的机制,在多线程中对,一个集合进行读写操做。ReaderWriterLockSlim表明了一个管理资源访问的锁,容许多个线程同时读取,以及独占写。
class Program { static void Main(string[] args) { new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(() => Write("Thread 1")){ IsBackground = true }.Start(); new Thread(() => Write("Thread 2")){ IsBackground = true }.Start(); Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadKey(); } static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read() { Console.WriteLine("Reading contents of a dictionary"); while (true) { try { _rw.EnterReadLock(); foreach (var key in _items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newKey)) { try { _rw.EnterWriteLock(); _items[newKey] = 1; Console.WriteLine("New key {0} is added to a dictionary by a {1}", newKey, threadName); } finally { _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { _rw.ExitUpgradeableReadLock(); } } } }
当主程序启动时,同时运行了三个线程来从字典中读取数据,还有另外两个线程向该字典中写入数据。咱们使用ReaderWriterLockSlim类来实现线程安全,该类专为这样的场景而设计。
这里使用两种锁:读锁容许多线程读取数据,写锁在被释放前会阻塞了其余线程的所,有操做。获取读锁时还有一个有意思的场景,即从集合中读取数据时,根据当前数据而决,定是否获取一个写锁并修改该集合。一旦获得写锁,会阻止阅读者读取数据,从而浪费大量的时间,所以获取写锁后集合会处于阻塞状态。为了最小化阻塞浪费的时间,能够使用 EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先获取读锁后读取数据。若是发现必须修改底层集合,只需使用EnterWriteLock方法升级锁,而后快速执行一次写操做.最后使用ExitWriteLock释放写锁。
在本例中,咱们先生成一个随机数。而后获取读锁并检查该数是否存在于字典的键集合中。若是不存在,将读锁更新为写锁而后将该新键加入到字典中。始终使用tyr/finaly代码块来确保在捕获锁后必定会释放锁,这是一项好的实践。全部的线程都被建立为后台线程。
主线程在全部后台线程完成后会等待30秒。
本节将描述如何不使用内核模型的方式来使线程等待。另外,咱们介绍了SpinWait,它, ,是一个混合同步构造,被设计为使用用户模式等待一段时间,而后切换到内核模式以节省CPU时间。
class Program { static void Main(string[] args) { var t1 = new Thread(UserModeWait); var t2 = new Thread(HybridSpinWait); Console.WriteLine("Running user mode waiting"); t1.Start(); Thread.Sleep(20); _isCompleted = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isCompleted = false; Console.WriteLine("Running hybrid SpinWait construct waiting"); t2.Start(); Thread.Sleep(5); _isCompleted = true; Console.ReadKey(); } static volatile bool _isCompleted = false; static void UserModeWait() { while (!_isCompleted) { Console.Write("."); } Console.WriteLine(); Console.WriteLine("Waiting is complete"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isCompleted) { w.SpinOnce(); Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("Waiting is complete"); } }
当主程序启动时,定义了一个线程,将执行一个无止境的循环,直到20毫秒后主线程,设置_isCompleted变量为true,咱们能够试验运行该周期为20-30秒,经过Windows任务管理器测量CPU的负载状况。取决于CPU内核数量,任务管理器将显示一个显著的处理时间。
咱们使用volatile关键字来声明isCompleted静态字段。Volatile关键字指出一个字段可能会被同时执行的多个线程修改。声明为volatile的字段不会被编译器和处理器优化为只能被单个线程访问。这确保了该字段老是最新的值。
而后咱们使用了SpinWait版本,用于在每一个迭代打印一个特殊标志位来显示线程是否切换为阻塞状态。运行该线程5毫秒来查看结果。刚开始, SpinWait尝试使用用户模式,在9 个迭代后,开始切换线程为阻塞状态。若是尝试测量该版本的CPU负载,在Windows任务管理器将不会看到任何CPU的使用。
在以前的章节中咱们讨论了建立线程和线程协做的几种方式。如今考虑另外一种状况,即只花费极少的时间来完成建立不少异步操做。建立线程是昂贵的操做,因此为每一个短暂的异步操做建立线程会产生显著的开销。
为了解决该问题,有一个经常使用的方式叫作池( pooling),线程池能够成功地适应于任何须要大量短暂的开销大的资源的情形。咱们事先分配必定的资源,将这些资源放入到资源池。每次须要新的资源,只需从池中获取一个,而不用建立一个新的。当该资源再也不被使用,时,就将其返回到池中。
.NET线程池是该概念的一种实现。经过System.Threading.ThreadPool类型能够使用线程池。线程池是受,NET通用语言运行时( Common Language Runtime,简称CLR)管理的。这意味着每一个CLR都有一个线程池实例。ThreadPool类型拥有一个QueueUserWorkItem静态方法。该静态方法接受一个委托,表明用户自定义的一个异步操做。在该方法被调用后,委,托会进入到内部队列中。若是池中没有任何线程,将建立一个新的工做线程( worker thread) 并将队列中第一个委托放入到该工做线程中。若是想线程池中放入新的操做,当以前的全部操做完成后,极可能只需重用一个线程来执行这些新的操做。然而,若是放置新的操做过快,线程池将建立更多的线程来执行这些操,做。建立太多的线程是有限制的,在这种状况下新的操做将在队列中等待直到线程池中的工做线程有能力来执行它们。
当中止向线程池中放置新操做时,线程池最终会删除必定时间后过时的再也不使用的线程。这将释放全部那些再也不须要的系统资源。我想再次强调线程池的用途是执行运行时间短的操做。使用线程池能够减小并行度耗费,及节省操做系统资源。
咱们只使用较少的线程,可是以比日常更慢的速度来执行异步操做, ,使用必定数量的可用的工做线程批量处理这些操做。若是操做能快速地完成则比较适用线程!池,可是执行长时间运行的计算密集型操做则会下降性能。
另外一个重要事情是在ASPNET应用程序中使用线程池时要至关当心。ASPNET基础设施使用本身的线程池,若是在线程池中浪费全部的工做线程, Web服务器将不可以服务新的请求。在ASPNET中只推荐使用输入/输出密集型的异步操做,由于其使用了一个不一样的方式,叫作IO线程。
在本章中,咱们将学习使用线程池来执行异步操做。本章将覆盖将操做放入线程池的不,,同方式,以及如何取消一个操做,并防止其长时间运行。
保持线程中的操做都是短暂的是很是重要的。不要在线程池中放入长时间运行的操做,或者阻塞工做线程。这将致使全部工做线程变得繁忙,从而没法服务用户操做。这会致使性能问题和很是难以调试的错误。
请注意线程池中的工做线程都是后台线程。这意味着当全部的前台线程(包括主程序线程)完成后,全部的后台线程将中止工做。
本节将展现在线程池中如何异步的执行委托。另外,咱们将讨论一个叫作异步编程模型(Asynchronous Programming Model,简称APM)的方式,这是NET历史中第一个异步编程模式。
class Program { static void Main(string[] args) { int threadId = 0; RunOnThreadPool poolDelegate = Test; var t = new Thread(() => Test(out threadId)); t.Start(); t.Join(); Console.WriteLine("Thread id: {0}", threadId); IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); r.AsyncWaitHandle.WaitOne(); string result = poolDelegate.EndInvoke(out threadId, r); Console.WriteLine("Thread pool worker thread id: {0}", threadId); Console.WriteLine(result); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private delegate string RunOnThreadPool(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
当程序运行时,使用旧的方式建立了一个线程,而后启动它并等待完成。因为线程的构造函数只接受一个无任何返回结果的方法,咱们使用了lambda表达式来将对Test方法的调用包起来。咱们经过打印出Thread. CurrentThread.IsThreadPoolThread属性值来确,保该线程不是来自线程池。咱们也打印出了受管理的线程ID来识别代码是被哪一个线程执行的。
而后定义了一个委托并调用Beginlnvoke方法来运行该委托。BeginInvoke方法接受一个回调函数。该回调函数会在异步操做完成后会被调用,而且一个用户自定义的状态会传给该回调函数。该状态一般用于区分异步调用。结果,咱们获得了一个实现了IAsyncResult接口的result对象。BeginInvoke当即返回告终果,当线程池中的工做线程在执行异步操做时,仍容许咱们继续其余工做。当须要异步操做的结果时,能够使用BeginInvoke方法调用返回的result对象。咱们能够使用result对象的IsCompleted属性轮询结果。可是在本例子中,使用的是AsyncWaitHandle属性来等待直到操做完成。当操做完成后,会获得一个结果,能够经过委托调用EndInvoke方法,将IAsyncResult对象传递给委托参数。
事实上使用AsyncWaitHandle并非必要的。若是注释掉r.AsyncWaitHandle.WaitOne,代码照样能够成功运行, 由于EndInvoke方法事实上会等待异步操做完成。调用 "EndInvoke方法(或者针对其余异步API的EndOperationName方法)是很是重要的, '由于该方法会将任何未处理的异常抛回到调用线程中。当使用这种异步API时,请确保始终调用了Begin和End方法。
当操做完成后,传递给BeginInvoke方法的回调函数将被放置到线程池中,确切地说是,一个工做线程中。若是在Main方法定义的结尾注释掉Thread.Sleep方法调用,回调函数将不,会被执行。这是由于当主线程完成后,全部的后台线程会被中止,包括该回调函数。对委托和回调函数的异步调用极可能会被同一个工做线程执行。经过工做线程ID能够容易地看出。使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult对象等方 ,式被称为异步编程模型(或APM模式),这样的方法对被称为异步方法。该模式也被应用于多个,NET类库的API中,但在现代编程中,更推荐使用任务并行库( Task Parallel Library,简称TPL)来组织异步API
class Program { static void Main(string[] args) { const int x = 1; const int y = 2; const string lambdaState = "lambda state 2"; ThreadPool.QueueUserWorkItem(AsyncOperation); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem( state => { Console.WriteLine("Operation state: {0}", state); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); ThreadPool.QueueUserWorkItem( _ => { Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private static void AsyncOperation(object state) { Console.WriteLine("Operation state: {0}", state ?? "(null)"); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); } }
首先定义了AsyncOperation方法,其接受单个object类型的参数。而后使用QueueUser WorkItem方法将该方法放到线程池中。接着再次放入该方法,可是此次给方法调用传入了一个状态对象。该对象将做为状态参数传递给AsynchronousOperation方法。
在操做完成后让线程睡眠一秒钟,从而让线程池拥有为新操做重用线程的可能性。若是注释掉全部的Thread.Sleep调用,那么全部打印出的线程ID多半是不同的。若是ID是同样的,那极可能是前两个线程被重用来运行接下来的两个操做。
首先将一个lambda表达式放置到线程池中。这里没什么特别的。咱们使用了labmbda表达式语法,从而无须定义一个单独的方法。
而后,咱们使用闭包机制,从而无须传递lambda表达式的状态。闭包更灵活,容许我,们向异步操做传递一个以上的对象并且这些对象具备静态类型。因此以前介绍的传递对象给,方法回调的机制既冗余又过期。在C#中有了闭包后就再也不须要使用它了。
本节将展现线程池如何工做于大量的异步操做,以及它与建立大量单独的线程的方式有何不一样。
class Program { static void Main(string[] args) { const int numberOfOperations = 500; var sw = new Stopwatch(); sw.Start(); UseThreads(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); UseThreadPool(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); Console.ReadKey(); } static void UseThreads(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Scheduling work by creating threads"); for (int i = 0; i < numberOfOperations; i++) { var thread = new Thread(() => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); thread.Start(); } countdown.Wait(); Console.WriteLine(); } } static void UseThreadPool(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Starting work on a threadpool"); for (int i = 0; i < numberOfOperations; i++) { ThreadPool.QueueUserWorkItem( _ => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); } countdown.Wait(); Console.WriteLine(); } } }
当主程序启动时,建立了不少不一样的线程,每一个线程都运行一个操做。该操做打印出线,程ID并阻塞线程100毫秒。结果咱们建立了500个线程,所有并行运行这些操做。虽然在,个人机器上的总耗时是300毫秒,可是全部线程消耗了大量的操做系统资源。
而后咱们使用了执行一样的任务,只不过不为每一个操做建立一个线程,而将它们放入到线程池中。而后线程池开始执行这些操做。线程池在快结束时建立更多的线程,可是仍然花,费了更多的时间,在我机器上是12秒。咱们为操做系统节省了内存和线程数,可是为此付,出了更长的执行时间。
.本节将经过一个示例来展现如何在线程池中取消异步操做。
class Program { static void Main(string[] args) { using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } Thread.Sleep(TimeSpan.FromSeconds(2)); } static void AsyncOperation1(CancellationToken token) { Console.WriteLine("Starting the first task"); for (int i = 0; i < 5; i++) { if (token.IsCancellationRequested) { Console.WriteLine("The first task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The first task has completed succesfully"); } static void AsyncOperation2(CancellationToken token) { try { Console.WriteLine("Starting the second task"); for (int i = 0; i < 5; i++) { token.ThrowIfCancellationRequested(); Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The second task has completed succesfully"); } catch (OperationCanceledException) { Console.WriteLine("The second task has been canceled."); } } private static void AsyncOperation3(CancellationToken token) { bool cancellationFlag = false; token.Register(() => cancellationFlag = true); Console.WriteLine("Starting the third task"); for (int i = 0; i < 5; i++) { if (cancellationFlag) { Console.WriteLine("The third task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The third task has completed succesfully"); } }
本节中介绍了CancellationTokenSource和CancellationToken两个新类。它们在.NET4.0被引人, 目前是实现异步操做的取消操做的事实标准。因为线程池已经存在了很长时间,并,没有特殊的API来实现取消标记功能,可是仍然能够对线程池使用上述API。
在本程序中使用了三种方式来实现取消过程。第一个是轮询来检查CancellationToken.IsCancellationRequested属性。若是该属性为true,则说明操做须要被取消,咱们必须放弃该操做。
第二种方式是抛出一个OperationCancelledException异常。这容许在操做以外控制取消过程,即须要取消操做时,经过操做以外的代码来处理。
最后一种方式是注册一个回调函数。当操做被取消时,在线程池将调用该回调函数。这容许链式传递一个取消逻辑到另外一个异步操做中。
本节将描述如何在线程池中对操做实现超时,以及如何在线程池中正确地等待。
class Program { static void Main(string[] args) { RunOperations(TimeSpan.FromSeconds(5)); RunOperations(TimeSpan.FromSeconds(7)); } static void RunOperations(TimeSpan workerOperationTimeout) { using (var evt = new ManualResetEvent(false)) using (var cts = new CancellationTokenSource()) { Console.WriteLine("Registering timeout operations..."); var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), null, workerOperationTimeout, true); Console.WriteLine("Starting long running operation..."); ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt)); Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2))); worker.Unregister(evt); } } static void WorkerOperation(CancellationToken token, ManualResetEvent evt) { for(int i = 0; i < 6; i++) { if (token.IsCancellationRequested) { return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } evt.Set(); } static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut) { if (isTimedOut) { cts.Cancel(); Console.WriteLine("Worker operation timed out and was canceled."); } else { Console.WriteLine("Worker operation succeded."); } } }
线程池还有一个有用的方法: ThreadPool.RegisterWaitForSingleObject,该方法容许咱们将回调函数放入线程池中的队列中。当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用。这容许咱们为线程池中的操做实现超时功能。
首先按顺序向线程池中放入一个耗时长的操做。它运行6秒钟而后一旦成功完成,会设置一个ManualResetEvent信号类。其余的状况下,好比须要取消操做,则该操做会被丢弃。 .
而后咱们注册了第二个异步操做。当从ManualResetEvent对象接受到一个信号后,该异步操做会被调用。若是第一个操做顺利完成,会设置该信号量。另外一种状况是第一个操做还未完成就已经超时。若是发生了该状况,咱们会使用CancellationToken来取消第一个操做。
最后,为操做提供5秒的超时时间是不够的。这是由于操做会花费6秒来完成,只能取消该操做。因此若是提供7秒的超时时间是可行的,该操做会顺利完成。
当有大量的线程必须处于阻塞状态中等待一些多线程事件发信号时,以上方式很是有,用。借助于线程池的基础设施,咱们无需阻塞全部这样的线程。能够释放这些线程直到信号事件被设置。在服务器端应用程序中这是个很是重要的应用场景,由于服务器端应用程序要求高伸缩性及高性能。
本节将描述如何使用System.Threading. Timer对象来在线程池中建立周期性调用的异步
class Program { static void Main(string[] args) { Console.WriteLine("Press 'Enter' to stop the timer..."); DateTime start = DateTime.Now; _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)); Thread.Sleep(TimeSpan.FromSeconds(6)); _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4)); Console.ReadLine(); _timer.Dispose(); Console.ReadKey(); } static Timer _timer; static void TimerOperation(DateTime start) { TimeSpan elapsed = DateTime.Now - start; Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start,Thread.CurrentThread.ManagedThreadId); } }
咱们首先建立了一个Timer实例。第一个参数是一个1ambda表达式,将会在线程池中被执行。咱们调用TimerOperation方法并给其提供一个起始时间。因为无须使用用户状态对象,因此第二个参数为null,而后指定了何时会第一次运行TimerOperation,以及以后 "再次调用的间隔时间。因此第一个值实际上说明一秒后会启动第一次操做,而后每隔两秒再,次运行。
以后等待6秒后修改计时器。在调用timer.Change方法一秒后启动TimerOperation,而后每隔4秒再次运行。
计时器还能够更复杂:能够以更复杂的方式使用计时器。好比,能够经过Timeout.Infinet值提供给计时器个间隔参数来只容许计时器操做一次。而后在计时器异步操做内,可以设置下一次计,时器操做将被执行的时间。具体时间取决于自定义业务逻辑。
class Program { static void Main(string[] args) { var bw = new BackgroundWorker(); bw.WorkerReportsProgress = true; bw.WorkerSupportsCancellation = true; bw.DoWork += Worker_DoWork; bw.ProgressChanged += Worker_ProgressChanged; bw.RunWorkerCompleted += Worker_Completed; bw.RunWorkerAsync(); Console.WriteLine("Press C to cancel work"); do { if (Console.ReadKey(true).KeyChar == 'C') { bw.CancelAsync(); } } while(bw.IsBusy); } static void Worker_DoWork(object sender, DoWorkEventArgs e) { Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); var bw = (BackgroundWorker) sender; for (int i = 1; i <= 100; i++) { if (bw.CancellationPending) { e.Cancel = true; return; } if (i%10 == 0) { bw.ReportProgress(i); } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } e.Result = 42; } static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e) { Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage, Thread.CurrentThread.ManagedThreadId); } static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e) { Console.WriteLine("Completed thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); if (e.Error != null) { Console.WriteLine("Exception {0} has occured.", e.Error.Message); } else if (e.Cancelled) { Console.WriteLine("Operation has been canceled."); } else { Console.WriteLine("The answer is: {0}", e.Result); } } }
当程序启动时,建立了一个BackgroundWorker组件的实例。显式地指出该后台工做线,程支持取消操做及该操做进度的通知。
接下来是最有意思的部分。咱们没有使用线程池和委托,而是使用了另外一个C#语法,称为事件。事件表示了一些通知的源或当通知到达时会有所响应的一系列订阅者。在本例中,咱们将订阅三个事件,当这些事件发生时,将调用相应的事件处理器。当事件通知其订,阅者时,具备特殊的定义签名的方法将被调用。
所以,除了将异步API组织为Begin/End方法对,还能够只启动一个异步操做而后订阅给不一样的事件。这些事件在该操做执行时会被触发。这种方式被称为基于事件的异步模式, ( Event-based Asynchronous Pattern,简称EAP)。这是历史上第二种用来构造异步程序的方,式,如今更推荐使用TPL
咱们共定义了三个事件。第一个是oWork事件。当一个后台工做对象经过RunWorkerAsync方法启动一个异步操做时,该事件处理器将被调用。该事件处理器将会运行在线程池中。若是须要取消操做,则这里是主要的操做点来取消执行。同时也能够提供该操做的运行进程信,息。最后,获得结果后,将结果设置给事件参数,而后RunWorkerCompleted事件处理器将,被调用。在该方法中,能够知道操做是成功完成,仍是发生错误,抑或被取消。
基于此, BackgroundWorker组件实际上被使用于Windows窗体应用程序(Windows Forms Applications,简称WPF)中。该实现经过后台工做事件处理器的代码能够直接与UI控制器交互。与线程池中的线程与UI控制器交互的方式相比较,使用BackgroundWorker组件的方式更加天然和好用。
咱们在以前的章节中学习了什么是线程,如何使用线程,以及为何须要线程池。使用线程池能够使咱们在减小并行度花销时节省操做系统资源。咱们能够认为线程池是一个抽象层,其向程序员隐藏了使用线程的细节,使咱们专心处理程序逻辑,而不是各类线程,问题。
然而使用线程池也至关复杂。从线程池的工做线程中获取结果并不容易。咱们须要实现,自定义方式来获取结果,并且万一有异常发生,还需将异常正确地传播到初始线程中。除此,之外,建立一组相关的异步操做,以及实现当前操做执行完成后下一操做才会执行的逻辑也不容易。在尝试解决这些问题的过程当中,建立了异步编程模型及基于事件的异步模式。在第3章中提到过基于事件的异步模式。这些模式使得获取结果更容易,传播异常也更轻松,可是组,合多个异步操做仍需大量工做,须要编写大量的代码。
为了解决全部的问题, Net Framework4.0引入了一个新的关于异步操做的API,它叫作.任务并行库( Task Parallel Library,简称TPL), .Net Framework 4.5版对该API进行了轻微的改进,使用更简单。在本书的项目中将使用最新版的TPL,即.Net Framework 4.5版中的 API, TPL可被认为是线程池之上的又一个抽象层,其对程序员隐藏了与线程池交互的底层代码,并提供了更方便的细粒度的APL, TPL的核心概念是任务。一个任务表明了一个异步操做,该操做能够经过多种方式运行,能够使用或不使用独立线程运行。在本章中将探究任务的全部使用细节。
默认状况下,程序员无须知道任务其实是如何执行的。TPL经过向用户隐藏任务的实现细节从而建立一个抽象层。遗憾的是,有些状况下这会致使诡秘的错误,好比试图获取任务的结果时程序被挂起。本章有助于理解TPL底层的原理,以及如何避免不恰当的使用方式。
一个任务能够经过多种方式和其余任务组合起来。例如,能够同时启动多个任务,等待全部任务完成,而后运行一个任务对以前全部任务的结果进行一些计算。TPL与以前的模式相比,其中一个关键优点是其具备用于组合任务的便利的API,
处理任务中的异常结果有多种方式。因为一个任务可能会由多个其余任务组成,这些任,务也可能依次拥有各自的子任务,因此有一个AggregateException的概念。这种异常能够捕获底层任务内部的全部异常,并容许单独处理这些异常。
并且,最后但并非最不重要的, C# 5.0已经内置了对TPL的支持,容许咱们使用新的 await和async关键字以平滑的、舒服的方式操做任务。
在本章中咱们将学习使用TPL来执行异步操做。咱们将学习什么是任务,如何用不一样的,方式建立任务,以及如何将任务组合在一块儿。咱们会讨论如何将遗留的APM和EAP模式转换为使用任务,还有如何正确地处理异常,如何取消任务,以及如何使多个任务同时执行。另外,还将讲述如何在Windows GUI应用程序中正确地使用任务。
class Program { static void Main(string[] args) { var t1 = new Task(() => TaskMethod("Task 1")); var t2 = new Task(() => TaskMethod("Task 2")); t2.Start(); t1.Start(); Task.Run(() => TaskMethod("Task 3")); Task.Factory.StartNew(() => TaskMethod("Task 4")); Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static void TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时,咱们使用Task的构造函数建立了两个任务。咱们传入一个lambda表达式做为Action委托。这能够使咱们给TaskMethod提供一个string参数。而后使用Start方法运行这些任务。
请注意只有调用了这些任务的Start方法,才会执行任务。很容易忘记真正启动任务。
而后使用Task.Run和Task.Factory.StartNew方法来运行了另外两个任务。与使用Task构造函数的不一样之处在于这两个被建立的任务会当即开始工做,因此无需显式地调用这些任务的Start方法。从Task 1到Task 4的全部任务都被放置在线程池的工做线程中并以未指定,的顺序运行。若是屡次运行该程序,就会发现任务的执行顺序是不肯定的。
Task.Run方法只是Task.Factory.StartNew的一个快捷方式,可是后者有附加的选项。通!常若是无特殊需求,则可以使用前一个方法,如Task 5所示。咱们标记该任务为长时间运行,结果该任务将不会使用线程池,而在单独的线程中运行。然而,根据运行该任务的当前的任务调度程序( task scheduler)运行方式有可能不一样。
本节将描述如何从任务中获取结果值。咱们将经过几个场景来了解在线程池中和主线程中运行任务的不一样之处。
class Program { static void Main(string[] args) { TaskMethod("Main Thread Task"); Task<int> task = CreateTask("Task 1"); task.Start(); int result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 2"); task.RunSynchronously(); result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 3"); Console.WriteLine(task.Status); task.Start(); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); result = task.Result; Console.WriteLine("Result is: {0}", result); Console.ReadKey(); } static Task<int> CreateTask(string name) { return new Task<int>(() => TaskMethod(name)); } static int TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); return 42; } }
首先直接运行TaskMethod方法,这里并无把它封装到一个任务中。结果根据它提供给咱们的主线程的信息能够得知该方法是被同步执行的。很显然它不是线程池中的线程。
而后咱们运行了Task 1,使用Start方法启动该任务并等待结果。该任务会被放置在线程池中,而且主线程会等待,直到任务返回前一直处于阻塞状态。
Task 2和Task 1相似,除了Task 2是经过RunSynchronously()方法运行的。该任务会运行在主线程中,该任务的输出与第一个例子中直接同步调用TaskMethod的输出彻底同样。这是个很是好的优化,能够避免使用线程池来执行很是短暂的操做。
咱们用以运行Task 1相同的方式来运行Task 3,但此次没有阻塞主线程,只是在该任务完成前循环打印出任务状态。结果展现了多种任务状态,分别是Creatd, Running和 RanToCompletion.
本节将展现如何设置相互依赖的任务。咱们将学习如何建立一个任务,使其在父任务完成后才会被运行。另外,将探寻为很是短暂的任务节省线程开销的可能性。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); firstTask.ContinueWith( t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); Task continuation = secondTask.ContinueWith( t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); continuation.GetAwaiter().OnCompleted( () => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread)); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(); firstTask = new Task<int>(() => { var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", 2); }); firstTask.Start(); while (!firstTask.IsCompleted) { Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(10)); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
当主程序启动时,咱们建立了两个任务,并为第一个任务设置了一个后续操做( continuation,一个代码块,会在当前任务完成后运行),而后启动这两个任务并等待4秒,这个时间足够两个任务完成。而后给第二个任务运行另外一个后续操做,并经过指定TaskContinuationOptions."ExecuteSynchronously选项来尝试同步执行该后续操做。若是后续操做耗时很是短暂,使用以上方式是很是有用的,由于放置在主线程中运行比放置在线程池中运行要快。能够实现这一点是由于第二个任务刚好在那刻完成。若是注释掉4秒的Thread.Sleep方法,将会看到该代码被放置到线程池中,这是由于还未从以前的任务中获得结果。
最后咱们为以前的后续操做也定义了一个后续操做,但这里使用了一个稍微不一样的方式,即便用了新的GetAwaiter和OnCompleted方法。这些方法是C# 5.0语言中异步机制中的方法。
本节示例的最后部分与父子线程有关。咱们建立了一个新任务,当运行该任务时,经过提供一个TaskCreationOptions.AttachedToParent选项来运行一个所谓的子任务。
子任务必须在父任务运行时建立,并正确的附加给父任务!
这意味着只有全部子任务结束工做,父任务才会完成。经过提供一个TaskContinuation Options选项也能够给在子任务上运行后续操做。该后续操做也会影响父任务,而且直到最后一个子任务结束它才会运行完成。
本节将说明如何将过期的APM API转换为任务。多个示例覆盖了转换过程当中可能发生的不一样状况。
class Program { private static void Main(string[] args) { int threadId; AsynchronousTask d = Test; IncompatibleAsynchronousTask e = Test; Console.WriteLine("Option 1"); Task<string> task = Task<string>.Factory.FromAsync( d.BeginInvoke("AsyncTaskThread", Callback, "a delegate asynchronous call"), d.EndInvoke); task.ContinueWith(t => Console.WriteLine("Callback is finished, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 2"); task = Task<string>.Factory.FromAsync( d.BeginInvoke, d.EndInvoke, "AsyncTaskThread", "a delegate asynchronous call"); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 3"); IAsyncResult ar = e.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); task = Task<string>.Factory.FromAsync(ar, _ => e.EndInvoke(out threadId, ar)); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}, ThreadId: {1}", t.Result, threadId)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } private delegate string AsynchronousTask(string threadName); private delegate string IncompatibleAsynchronousTask(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(string threadName) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); Thread.CurrentThread.Name = threadName; return string.Format("Thread name: {0}", Thread.CurrentThread.Name); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
这里咱们定义了两种委托。其中一个使用了out参数,所以在将APM模式转换为任务,时,与标准的TPLAPI是不兼容的。这样的转换有三个示例。
将APM转换为TPL的关键点是Task<T>.Factory.FromAsync方法, T是异步操做结果的类型。该方法有数个重载。在第一个例子中传人了IAsyncResult和Func<lAsyncResult, string?,这是一个将IAsyncResult的实现做为参数并返回一个字符串的方法。因为第一个委托类型提供的EndMethod与该签名是兼容的,因此将该委托的异步调用转换为任务没有任何问题。
第二个例子作的事与第一个很是类似,可是使用了不一样的FromAsync方法重载,该重载 ,并不容许指定一个将会在异步委托调用完成后被调用的回调函数。但咱们能够使用后续操做,替代它。但若是回调函数很重要,能够使用第一个例子所示的方法。
最后一个例子展现了一个小技巧。此次IncompatibleAsynchronousTask委托的 EndMethod使用了out参数,与FromAsync方法重载并不兼容。然而,能够很容易地将 EndMethod调用封装到一个lambda表达式中,从而适合任务工厂方法。
能够在等待异步操做结果过程当中打印出任务状态,从而了解底层任务的运行状况。能够看到第一个任务的状态为WaitingForActivation,这意味着TPL基础设施实际上还未启动该任务。
本节将描述如何将基于事件的异步操做转换为任务。在本节中,你将发现有一个可靠的模式可适用于.Net Framework类库中的全部基于事件的异步API.
class Program { static void Main(string[] args) { var tcs = new TaskCompletionSource<int>(); var worker = new BackgroundWorker(); worker.DoWork += (sender, eventArgs) => { eventArgs.Result = TaskMethod("Background worker", 5); }; worker.RunWorkerCompleted += (sender, eventArgs) => { if (eventArgs.Error != null) { tcs.SetException(eventArgs.Error); } else if (eventArgs.Cancelled) { tcs.SetCanceled(); } else { tcs.SetResult((int)eventArgs.Result); } }; worker.RunWorkerAsync(); int result = tcs.Task.Result; Console.WriteLine("Result is: {0}", result); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
这是一个将EAP模式转换为任务的既简单又优美的示例。关键点在于使用TaskCompletionSource<T>类型, T是异步操做结果类型。
不要忘记将tcs.SetResult调用封装在try-catch代码块中,从而保证错误信息始终会设置给任务完成源对象。也能够使用TrySetResult方法来替代SetResult方法,以保证结果能被成功设置。
本节是关于如何给基于任务的异步操做实现取消流程。咱们将学习如何正确的使用取消标志,以及在任务真正运行前如何得知其是否被取消。
class Program { private static void Main(string[] args) { var cts = new CancellationTokenSource(); var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token); Console.WriteLine(longTask.Status); cts.Cancel(); Console.WriteLine(longTask.Status); Console.WriteLine("First task has been cancelled before execution"); cts = new CancellationTokenSource(); longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token); longTask.Start(); for (int i = 0; i < 5; i++ ) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } cts.Cancel(); for (int i = 0; i < 5; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } Console.WriteLine("A task has been completed with result {0}.", longTask.Result); Console.ReadKey(); } private static int TaskMethod(string name, int seconds, CancellationToken token) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); for (int i = 0; i < seconds; i ++) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (token.IsCancellationRequested) return -1; } return 42*seconds; } }
第3章中咱们已经讨论了取消标志概念,你已经至关熟悉了。而本节又是一个关于为TPL任务实现取消选项的简单例子。
首先仔细看看longTask的建立代码。咱们将给底层任务传递一次取消标志,而后给任务构造函数再传递一次。为何须要提供取消标志两次呢?
答案是若是在任务实际启动前取消它,该任务的TPL基础设施有责任处理该取消操做,由于这些代码根本不会执行。经过获得的第一个任务的状态能够知道它被取消了。若是尝试对该任务调用Start方法,将会获得InvalidOperationException异常。
而后须要本身写代码来处理取消过程。这意味着咱们对取消过程全权负责,而且在取消,任务后,任务的状态仍然是RanToCompletion,由于从TPL的视角来看,该任务正常完成了它的工做。辨别这两种状况是很是重要的,而且须要理解每种状况下职责的不一样。
本节将描述异步任务中处理异常这一重要的主题。咱们将讨论任务中抛出异常的不一样状况及如何获取这些异常信息
class Program { static void Main(string[] args) { Task<int> task; try { task = Task.Run(() => TaskMethod("Task 1", 2)); int result = task.Result; Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); try { task = Task.Run(() => TaskMethod("Task 2", 2)); int result = task.GetAwaiter().GetResult(); Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); var t1 = new Task<int>(() => TaskMethod("Task 3", 3)); var t2 = new Task<int>(() => TaskMethod("Task 4", 2)); var complexTask = Task.WhenAll(t1, t2); var exceptionHandler = complexTask.ContinueWith(t => Console.WriteLine("Exception caught: {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted ); t1.Start(); t2.Start(); Thread.Sleep(TimeSpan.FromSeconds(5)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); throw new Exception("Boom!"); return 42 * seconds; } }
当程序启动时,建立了一个任务并尝试同步获取任务结果。Result属性的Get部分会使,当前线程等待直到该任务完成,并将异常传播给当前线程。在这种状况下,经过catch代码块能够很容易地捕获异常,可是该异常是一个被封装的异常,叫作AggregateException。在本例中,它里面包含一个异常,由于只有一个任务抛出了异常。能够访问InnerException属性来获得底层异常。
第二个例子与第一个很是类似,不一样之处是使用GetAwaiter和GetResult方法来访问任务结果。这种状况下,无需封装异常,由于TPL基础设施会提取该异常。若是只有一个底层,任务,那么一次只能获取一个原始异常,这种设计很是合适。
最后一个例子展现了两个任务抛出异常的情形。如今使用后续操做来处理异常。只有以前,的任务完成前有异常时,该后续操做才会被执行。经过给后续操做传递TaskContinuationOptions.OnlyOnFaulted选项能够实现该行为。结果打印出了AggregateException,其内部封装了两个任,务抛出的异常。
本节展现了如何同时运行多个异步任务。咱们将学习当全部任务都完成或任意一个任务,完成了工做时,如何高效地获得通知。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); var whenAllTask = Task.WhenAll(firstTask, secondTask); whenAllTask.ContinueWith(t => Console.WriteLine("The first answer is {0}, the second is {1}", t.Result[0], t.Result[1]), TaskContinuationOptions.OnlyOnRanToCompletion ); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); var tasks = new List<Task<int>>(); for (int i = 1; i < 4; i++) { int counter = i; var task = new Task<int>(() => TaskMethod(string.Format("Task {0}", counter), counter)); tasks.Add(task); task.Start(); } while (tasks.Count > 0) { var completedTask = Task.WhenAny(tasks).Result; tasks.Remove(completedTask); Console.WriteLine("A task has been completed with result {0}.", completedTask.Result); } Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
当程序启动时,建立了两个任务。而后借助于Task.WhenAll方法,建立了第三个任务,该任务将会在全部任务完成后运行。该任务的结果提供了一个结果数组,第一个元素是第.个任务的结果,第二个元素是第二个任务的结果,以此类推。
而后咱们建立了另一系列任务,并使用Task.WhenAny方法等待这些任务中的任何一 ,个完成。当有一个完成任务后,从列表中移除该任务并继续等待其余任务完成,直到列表为, 4空。获取任务的完成进展状况或在运行任务时使用超时,均可以使用Task.WhenAny方法。例如,咱们等待一组任务运行,而且使用其中一个任务用来记录是否超时。若是该任务先完,成,则只需取消掉其余还未完成的任务。
一、新建一个C# WPF应用程序项目
二、在MainWindow.xaml文件中,将下面的标记代码加入到一个网格元素中(即<Grid和<Grid>标签间):
<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top" Width="425" Height="40"/> <Button Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonSync_Click"/> <Button Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsync_Click"/> <Button Content="Async OK" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsyncOK_Click"/>
三、在MainWindow.xaml.cs文件中使用如下using指令;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Input;
四、在MainWindow构造函数下面加入如下代码片断:
void ButtonSync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; try { //string result = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()).Result; string result = TaskMethod().Result; ContentTextBlock.Text = result; } catch (Exception ex) { ContentTextBlock.Text = ex.InnerException.Message; } } void ButtonAsync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(); task.ContinueWith(t => { ContentTextBlock.Text = t.Exception.InnerException.Message; Mouse.OverrideCursor = null; }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.FromCurrentSynchronizationContext()); } void ButtonAsyncOK_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()); task.ContinueWith(t => Mouse.OverrideCursor = null, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.FromCurrentSynchronizationContext()); } Task<string> TaskMethod() { return TaskMethod(TaskScheduler.Default); } Task<string> TaskMethod(TaskScheduler scheduler) { Task delay = Task.Delay(TimeSpan.FromSeconds(5)); return delay.ContinueWith(t => { string str = string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); ContentTextBlock.Text = str; return str; }, scheduler); }
本例中引人了不少新鲜的东西。首先,建立了一个WPF应用程序,而不是一个命令行,程序。这是颇有必要的,由于咱们须要一个拥有消息循环的用户界面线程来演示异步运行任,务的不一样情形。
TaskScheduler是一个很是重要的抽象。该组件实际上负责如何执行任务。默认的任务调度程序将任务放置到线程池的工做线程中。这是很是常见的场景,因此TPL将其做为默认选项并不用奇怪。咱们已经知道了如何同步运行任务,以及如何将任务附加到父任务上从而一块儿运行。如今让咱们看看使用任务的其余方式。
当程序启动时,建立了一个包含三个按钮的窗口。第一个按钮调用了一个同步任务的执行。该代码被放置在ButtonSync Click方法中。当任务运行时,咱们甚至没法移动应用程序,窗口。当用户界面线程忙于运行任务时,整个用户界面被彻底冻结,在任务完成前没法响应任何消息循环。对于GUI窗口程序来讲这是一个至关很差的实践,咱们须要找到一个方式来,解决该问题 ,
第二个问题是咱们尝试从其余线程访问UI控制器。图形用户界面控制器从没有被设计,为可被多线程使用,而且为了不可能的错误,不容许从建立UI的线程以外的线程中访问, U1组件。当咱们尝试这样作时,获得了一个异常,该异常信息5秒后打印到了主窗口中。
为了解决第一个问题,咱们尝试异步运行任务。第二个按钮就是这样作的。该代码被,.放置在ButtonAsync Click方法中。当使用调试模式运行该任务时,将会看到该任务被放置,在线程池中,最后将获得一样的异常。然而,当任务运行时用户界面一直保持响应。这是好事,可是咱们仍须要除掉异常。
其实咱们已经解决了该问题。给TaskScheduler.FromCurrentSynchronizationContext选项提供一个后续操做用于输出错误信息。若是不这样作,咱们将没法看到错误信息,由于可能会获得在任务中产生的相同异常。该选项驱使TPL基础设施给U1线程的后续操做中放入代码,并借助UI线程消息循环来异步运行该代码。这解决了从其余线程访问UI控制器并仍保持U1处于响应状态的问题。
为了检查是否真的是这样,能够按下最后一个按钮来运行ButtonAsyncOK-Click方法中的代码。与其他例子不一样之处在于咱们将UI线程任务调度程序提供给了该任务。你将看到 ,任务以异步的方式运行在UI线程中。U1依然保持响应。甚至尽管等待光标处于激活状态,你仍能够按下另外一个按钮,
然而使用U1线程运行任务有一些技巧。若是回到同步任务代码,取消对使用UI线程任务调度程序获取结果的代码行的注释,咱们将永远得不到任何结果。这是一个经典的死锁情,况:咱们在UI线程队列中调度了一个操做, U1线程等待该操做完成,但当等待时,它又没法运行该操做,这将永不会结束(甚至永不会开始),若是在任务中调用Wait方法也会发生死锁。为了不死锁,绝对不要经过任务调度程序在U1线程中使用同步操做,请使用C# 5.0中的ContinueWith或async/await方法。
到如今为止,咱们学习了任务并行库,这是微软提供的最新的异步编程基础设施。它容许咱们以模块化的方式设计程序,来组合不一样的异步操做。
遗憾的是,当阅读此类程序时仍然很是难理解程序的实际执行顺序。在大型程序中将会,.有许多相互依赖的任务和后续操做,用于运行其余后续操做的后续操做,处理异常的后续操,做,而且它们都出如今程序代码中不一样的地方。所以了解程序的前后执行次序变成了一个极具挑战性的问题。
另外一个须要关注的问题是,可以接触用户界面控制器的每一个异步任务是否获得了正确的,同步上下文。程序只容许经过UI线程使用这些控制器,不然将会获得多线程访问异常。
说到异常,咱们不得不使用单独的后续操做任务来处理在以前的异步操做中发生的错误。这又致使了分散在代码的不一样部分的复杂的处理错误的代码,逻辑上没法相互关联。
为了解决这些问题, C#5.0的做者引入了新的语言特性,称为异步函数(asynchronous function),它是TPL之上的更高级别的抽象,真正简化了异步编程。正如在第4章提到的,抽象隐藏了主要的实现细节,使得程序员无须考虑许多重要的事情,从而使异步编程更容易。了解异步函数背后的概念是很是重要的,有助于咱们编写健壮的高扩展性的应用程序。
要建立一个异步函数,首先须要用async关键字标注一个方法。若是不先作这个,就不可能拥有async属性或事件访问方法和构造函数。代码以下所示:
另外一个重要的事实是,异步函数必须返回Task或Task<T>类型。能够使用async void方法,可是更推荐使用async Task方法。使用async void方法惟一合理的地方是在程序中使,用顶层UI控制器事件处理器的时候。
使用async关键字标注的方法内部,能够使用await操做符。该操做符可与TPL的任务,一块儿工做,并获取该任务中异步操做的结果。在本章中稍后会讲述细节。在async方法外不能使用await关键字,不然会有编译错误。另外,异步函数在其代码中至少要拥有一个await操做符。然而,若是没有只会致使编译警告,而不是编译错误。
须要注意的是,在执行完await调用的代码行后该方法会当即返回。若是是同步执行,执行线程将会阻塞两秒而后返回结果。这里当执行完await操做后,当即将工做线程,放回线程池的过程当中,咱们会异步等待。2秒后,咱们又一次从线程池中获得工做线程并继续运行其中剩余的异步方法。这容许咱们在等待2秒时重用工做线程作些其余事,这对提升应用程序的可伸缩性很是重要。借助于异步函数咱们拥有了线性的程序控制流,但它,的执行依然是异步的。这虽然好用,可是难以理解。本章将帮助你学习异步函数全部重要的方面。
以个人自身经验而言,若是程序中有两个连续的await操做符,此时程序如何工做有一个常见的误解。不少人认为若是在另外一个异步操做以后使用await函数,它们将会并行运行。然而,事实上它们是顺序运行的,即第一个完成后第二个才会开始运行。记住这一点很重要,在本章中稍后会覆盖该细节。
在C# 5.0中关联async和await有必定的限制。例如,不能把控制台程序的Main方法标,记为async,不能在catch, finally, lock或unsafe代码块中使用await操做符。不容许对任何异步函数使用ref或out参数。还有其余微妙的地方,可是以上已经包括了主要的须要注意的,地方。
异步函数会被C#编译器在后台编译成复杂的程序结构。这里我不会说明该细节。生,成的代码与另外一个C#构造很相似,称为迭代器。生成的代码被实现为一种状态机。尽管不少程序员几乎开始为每一个方法使用async修饰符,我仍是想强调若是方法原本无需异步 ,或并行运行,那么将该方法标注为async是没有道理的。调用async方法会有显著的性能。损失,一般的方法调用比使用async关键字的一样的方法调用要快上40~50倍。请注意这一点。
在本章中咱们将学习如何使用C# 5.0中的async和await关键字实现异步操做。本章将讲述如何使用await按顺序或并行地执行异步操做,还将讨论如何在lambda表达式中使,用await,如何处理异常,以及在使用async void方法时如何避免陷阱。在本章结束前,咱们会深刻探究同步上下文传播机制并学习如何建立自定义的awaitable对象,从而无需使用任务。
.本节将讲述使用异步函数的基本场景。咱们将比较使用TPL和使用await操做符获取异步操做结果的不一样之处。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { Task<string> t = GetInfoAsync("Task 1"); Task t2 = t.ContinueWith(task => Console.WriteLine(t.Result), TaskContinuationOptions.NotOnFaulted); Task t3 = t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted); return Task.WhenAny(t2, t3); } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Task 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { await Task.Delay(TimeSpan.FromSeconds(2)); //throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时运行了两个异步操做。其中一个是标准的TPL模式的代码,第二个使用了 C#的新特性async和awaito。AsynchronyWithTPL方法启动了一个任务,运行两秒后返回关于工做线程信息的字符串。而后咱们定义了一个后续操做,用于在异步操做完成后打印出该 "操做结果,还有另外一个后续操做,用于万一有错误发生时打印出异常的细节。最终,返回了一个表明其中一个后续操做任务的任务,并等待其在Main函数中完成。
在AsynchronyWithAwait方法中,咱们对任务使用await并获得了相同的结果。这和编写一般的同步代码的风格同样,即咱们获取任务的结果,打印出结果,若是任务完成时带有 "错误则捕获异常。关键不一样的是这其实是一个异步程序。使用await后, C#当即建立了一 1个任务,其有一个后续操做任务,包含了await操做符后面的全部剩余代码。这个新任务也处理了异常传播。而后,将该任务返回到主方法中并等待其完成
请注意根据底层异步操做的性质和当前异步的上下文,执行异步代码的具体方式可能会不一样。稍后在本章中会解释这一点。
所以能够看到程序的第一部分和第二部分在概念上是等同的,可是在第二部分中C# ,编译器隐式地处理了异步代码。事实上,第二部分比第一部分更复杂,接下来咱们将讲述,细节。
请记住在Windows GUI或ASPNET之类的环境中不推荐使用Task.Wait和Task.Result方法。若是程序员不是百分百地清楚代码在作什么,极可能会致使死锁。在第4章的4.10节中,在WPF应用程序中使用Task.Result时已经演示了该一点。
请取消对GetInfoAsync方法的throw new Exception代码行的注释来测试异常处理是否工做。
本节将展现如何在lambda表达式中使用await,咱们将编写一个使用了await的匿名方法,而且获取异步执行该方法的结果。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { Func<string, Task<string>> asyncLambda = async name => { await Task.Delay(TimeSpan.FromSeconds(2)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); }; string result = await asyncLambda("async lambda"); Console.WriteLine(result); } }
首先,因为不能在Main方法中使用async,咱们将异步函数移到了Asynchronous Processing方法中。而后使用async关键字声明了一个lambda表达式。因为任何lambda表达式的类型都不能经过lambda自身来推断,因此不得不显式向C#编译器指定它的类型。在本例中,该类型说明该lambda表达式接受一个字符串参数,并返回一个Task<string>对象。
接着,咱们定义了lambda表达式体。有个问题是该方法被定义为返回一个Task<string>对象,但实际上返回的是字符串,却没有编译错误!这是由于C#编译器自动产生一个任务,并返回给咱们。
最后一步是等待异步lambda表达式执行并打印出结果。
本节将展现当代码中有多个连续的await方法时程序的实际流程是怎样的。咱们将学习如何阅读有await方法的代码,以及理解为何await调用是异步操做。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { var containerTask = new Task(() => { Task<string> t = GetInfoAsync("TPL 1"); t.ContinueWith(task => { Console.WriteLine(t.Result); Task<string> t2 = GetInfoAsync("TPL 2"); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Result), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }); containerTask.Start(); return containerTask; } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Async 1"); Console.WriteLine(result); result = await GetInfoAsync("Async 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { Console.WriteLine("Task {0} started!", name); await Task.Delay(TimeSpan.FromSeconds(2)); if(name == "TPL 2") throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时,与上节同样运行了两个异步操做。然而此次从AsynchronyWithAwait方法讲起。它看起来仍然像日常的同步代码,惟一不一样之处是使用了两个await声明。最重要的一点是该代码依然是顺序执行的, Async2任务只有等以前的任务完成后才会开始执行。当阅读该代码时,程序流很清晰,能够看到什么先运行,什么后运行。但该程序如何是异步程序呢?首先,它不老是异步的。当使用await时若是一个任务已经完成,咱们会异步地获得该任务结果。不然,当在代码中看到await声明时,一般的行为是方法执行到该await代码行时将当即返回,而且剩下的代码将会在一个后续操做任务中运行。所以等待操做结果时并无阻塞程序执行,这是一个异步调用。当AsynchronyWithAwait方法中的代码在执行时,除了在Main方法中调用t.Wait外,咱们能够执行任何其余任务。然而, "主线程必须等待直到全部异步操做完成,不然主线程完成后全部运行异步操做的后台线程! ",会中止运行。
AsynchronyWithTPL方法模仿了AsynchronyWithAwait的程序流。咱们须要一个容器任务来处理全部相互依赖的任务。而后启动主任务,给其加了一组后续操做。当该任务完成后,会打印出其结果。而后又启动了一个任务,在该任务完成后会依次运行更多的后续操"做。为了测试对异常的处理,当运行第二个任务时故意抛出一个异常,并打印出异常信息。这组后续操做建立了与第一个方法中同样的程序流。若是用它与await方法比较,能够看到它更容易阅读和理解。惟一的技巧是请记住异步并不老是意味着并行执行。
本节将学习如何使用await来并行地运行异步任务,而不是采用经常使用的顺序执行。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 5); string[] results = await Task.WhenAll(t1, t2); foreach (string result in results) { Console.WriteLine(result); } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); //await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds))); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
这里定义了两个异步任务,分别运行3秒和5秒。而后使用Task.WhenAll辅助方法创!建了另外一个任务,该任务只有在全部底层任务完成后才会运行。以后咱们等待该组合任务的,结果。5秒后,咱们获取了全部结果,说明了这些任务是同时运行的。
然而这里观察到一个有意思的现象。当运行该程序时,你可能注意到这两个任务似平是,被线程池中的同一个工做线程执行的。当咱们并行运行任务时怎么可能发生这样的事情呢?为了让事情更有趣,咱们来注释掉GetIntroAsync方法中的await Task.Delay代码行,并解除,对await Task.Run代码行的注释,而后再次运行程序。
咱们会看到该状况下两个任务会被不一样的工做线程执行。不一样之处是Task.Delay在幕后使用了一个计时器,过程以下:从线程池中获取工做线程,它将等待Task.Delay方法返回结,果。而后, Task.Delay方法启动计时器并指定一块代码,该代码会在计时器时间到了Task.Delay方法中指定的秒数后被调用。以后当即将工做线程返回到线程池中。当计时器事件运,行时,咱们又从线程池中任意获取一个可用的工做线程(可能就是运行一个任务时使用的线,程)并运行计时器提供给它的代码。
当使用Task.Run方法时,从线程池中获取了一个工做线程并将其阻塞几秒,具体秒数,由Thread.Sleep方法提供。而后获取了第二个工做线程而且也将其阻塞。在这种场景下.咱们消费了两个工做线程,而它们绝对什么事没作,由于在它们等待时不能执行任何其余,操做。
咱们将在第9章中讨论第一个场景的细节。在第9章咱们将讨论用大量的异步操做进行,数据输入和输出。尽量地使用第一种方式是建立高伸缩性的服务器程序的关键。
本节将描述在C#中使用异步函数时如何处理异常。咱们将学习对多个并行的异步操做,使用await时如何聚合异常。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Console.WriteLine("1. Single exception"); try { string result = await GetInfoAsync("Task 1", 2); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions"); Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 2); try { string[] results = await Task.WhenAll(t1, t2); Console.WriteLine(results.Length); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions with AggregateException"); t1 = GetInfoAsync("Task 1", 3); t2 = GetInfoAsync("Task 2", 2); Task<string[]> t3 = Task.WhenAll(t1, t2); try { string[] results = await t3; Console.WriteLine(results.Length); } catch { var ae = t3.Exception.Flatten(); var exceptions = ae.InnerExceptions; Console.WriteLine("Exceptions caught: {0}", exceptions.Count); foreach (var e in exceptions) { Console.WriteLine("Exception details: {0}", e); Console.WriteLine(); } } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); throw new Exception(string.Format("Boom from {0}!", name)); } }
咱们运行了三个场景来展现在C#中使用async和await时关于错误处理的最多见状况。第一种状况是最简单的,而且与常见的同步代码几乎彻底同样。咱们只使用try/catch声明即 ,可获取异常细节。
一个很常见的错误是对一个以上的异步操做使用await时还使用以上方式。若是仍像第一种状况同样使用catch代码块,则只能从底层的AggregateException对象中获得第一个异常。
为了收集全部异常信息,能够使用await任务的Exception属性。在第三种状况中,咱们使用AggregateException的Flatten方法将层级异常放入一个列表,而且从中提取出全部的底层异常。
本节描述了当使用await来获取异步操做结果时,同步上下文行为的细节。咱们将学习,如何以及什么时候关闭同步上下文流。
加入对Windows Presentation Foundation库的引用。
(1)右键点击项目中的引用文件夹,选择添加引用菜单选项。
(2)添加对PresentationCore, PresentationFramework, System.Xaml及Windows.Base库的引用。
class Program { [STAThread] static void Main(string[] args) { var app = new Application(); var win = new Window(); var panel = new StackPanel(); var button = new Button(); _label = new Label(); _label.FontSize = 32; _label.Height = 200; button.Height = 100; button.FontSize = 32; button.Content = new TextBlock { Text = "Start asynchronous operations" }; button.Click += Click; panel.Children.Add(_label); panel.Children.Add(button); win.Content = panel; app.Run(win); Console.ReadLine(); } async static void Click(object sender, EventArgs e) { _label.Content = new TextBlock { Text = "Calculating..." }; TimeSpan resultWithContext = await Test(); TimeSpan resultNoContext = await TestNoContext(); //TimeSpan resultNoContext = await TestNoContext().ConfigureAwait(false); var sb = new StringBuilder(); sb.AppendLine(string.Format("With the context: {0}", resultWithContext)); sb.AppendLine(string.Format("Without the context: {0}", resultNoContext)); sb.AppendLine(string.Format("Ratio: {0:0.00}", resultWithContext.TotalMilliseconds / resultNoContext.TotalMilliseconds)); _label.Content = new TextBlock { Text = sb.ToString() }; } async static Task<TimeSpan> Test() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t; } sw.Stop(); return sw.Elapsed; } async static Task<TimeSpan> TestNoContext() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t.ConfigureAwait( continueOnCapturedContext: false); } sw.Stop(); return sw.Elapsed; } private static Label _label; }
在本例中,咱们将学习异步函数默认行为的最重要的方面之一。咱们已经从第4章中了解了任务调度程序和同步上下文。默认状况下, await操做符会尝试捕获同步上下文,并在其中执行代码。咱们已经知道这有助于咱们编写与用户界面控制器协做的异步代码。另外,使用await不会发生在以前章节中描述过的死锁状况,由于当等待结果时并不会阻塞UI线程。
这是合理的,可是让咱们看看潜在会发生什么事。在本例中,咱们使用编程方式建立了·一个Windows Presentation Foundation应用程序并订阅了它的按钮点击事件。当点击该按钮!时,运行了两个异步操做。其中一个使用了一个常规的await操做符,另外一个使用了带false参数值的ConfigureAwait方法。false参数明确指出咱们不能对其使用捕获的同步上下文来运行后续操做代码。在每一个操做中,咱们测量了执行完成花费的时间,而后将各自的时间和比例显示在主屏幕上。
结果看到常规的await操做符花费了更多的时间来完成。这是由于咱们向UI线程中放,入了成百上千个后续操做任务,这会使用它的消息循环来异步地执行这些任务。在本例中,咱们无需在UI线程中运行该代码,由于异步操做并未访问UI组件。使用带false参数值的, ConfigureAwait方法是一个更高效的方案。
还有一件事值得一提。尝试运行程序并只点击按钮而后等待结果,而后再这样作一次,可是此次点击按钮后尝试随机地拖拽应用程序窗口从一侧到另外一侧。你将注意到在捕获的同步上下文中的代码执行速度变慢了!这个有趣的反作用完美演示了异步编程是多么危险。经历相似的状况是很是容易的,并且若是你以前从未经历过这样的状况,那么几乎不可能经过,调试来找出问题所在。
公平起见,让咱们来看看相反的状况。在前面的代码片断中,在Click方法中,取消注,释的代码行,并注释掉紧挨着它的前一行代码。当运行程序时,咱们将获得多线程控制访问异常,由于设置Label控制器文本的代码不会放置到捕捉的上下文中,而是在线程池的工做,线程中执行。
本节描述了为何使用async void方法很是危险。咱们将学习以及如何尽量地替代该方法。在哪一种状况下可以使用该方,
class Program { static void Main(string[] args) { Task t = AsyncTask(); t.Wait(); AsyncVoid(); Thread.Sleep(TimeSpan.FromSeconds(3)); t = AsyncTaskWithErrors(); while(!t.IsFaulted) { Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(t.Exception); //try //{ // AsyncVoidWithErrors(); // Thread.Sleep(TimeSpan.FromSeconds(3)); //} //catch (Exception ex) //{ // Console.WriteLine(ex); //} int[] numbers = new[] {1, 2, 3, 4, 5}; Array.ForEach(numbers, async number => { await Task.Delay(TimeSpan.FromSeconds(1)); if (number == 3) throw new Exception("Boom!"); Console.WriteLine(number); }); Console.ReadLine(); } async static Task AsyncTaskWithErrors() { string result = await GetInfoAsync("AsyncTaskException", 2); Console.WriteLine(result); } async static void AsyncVoidWithErrors() { string result = await GetInfoAsync("AsyncVoidException", 2); Console.WriteLine(result); } async static Task AsyncTask() { string result = await GetInfoAsync("AsyncTask", 2); Console.WriteLine(result); } private static async void AsyncVoid() { string result = await GetInfoAsync("AsyncVoid", 2); Console.WriteLine(result); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); if(name.Contains("Exception")) throw new Exception(string.Format("Boom from {0}!", name)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序启动时,咱们经过调用AsyncTask和AsyncVoid这两个方法启动了两个异步操做。第一个方法返回一个Task对象,而另外一个因为被声明为async void因此没有返回值。因为它们都是异步的因此都会当即返回。可是第一个方法经过返回的任务状态或对其调用, Wait方法从而很容易实现监控。等待第二个方法完成的惟一方式是确切地等待多长时间,由于咱们没有声明任何对象能够监控该异步操做的状态。固然能够使用某种共享的状态变量,将其设置到async void方法中,并从调用方法中检查其值,但返回一个Task对象的方式更好些。
最危险的部分是异常处理。使用async void方法,异常处理方法将被放置到当前的同步上下文中,在本例中即线程池中。线程池中未被处理的异常会终结整个进程。使用 AppDomain.UnhandledException事件能够拦截未被处理的异常,但不能从拦截的地方恢复进程。为了重现该场景,能够取消Main方法中对try/catch代码块的注释,而后运行,程序,
关于使用async void lambda表达式的另外一个事实是:它们与Action类型是兼容的,而 Action类型在标准.NET Framework类库中的使用很是普遍。在lambda表达式中很容易忘记对异常的处理,这将再次致使程序崩溃。能够取消在Main方法中第二个被注释的代码块的,注释来重现该场景。
强烈建议只在UI事件处理器中使用async void方法。在其余全部的状况下,请使用返,回Task的方法。
本节将展现如何设计一个与await操做符兼容的很是基础的awaitable类型。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { var sync = new CustomAwaitable(true); string result = await sync; Console.WriteLine(result); var async = new CustomAwaitable(false); result = await async; Console.WriteLine(result); } class CustomAwaitable { public CustomAwaitable(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public CustomAwaiter GetAwaiter() { return new CustomAwaiter(_completeSynchronously); } private readonly bool _completeSynchronously; } class CustomAwaiter : INotifyCompletion { private string _result = "Completed synchronously"; private readonly bool _completeSynchronously; public bool IsCompleted { get { return _completeSynchronously; } } public CustomAwaiter(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public string GetResult() { return _result; } public void OnCompleted(Action continuation) { ThreadPool.QueueUserWorkItem( state => { Thread.Sleep(TimeSpan.FromSeconds(1)); _result = GetInfo(); if (continuation != null) continuation(); }); } private string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } }
为了与await操做符保持兼容,类型应当遵照在C#5.0规格说明中的规定的一些要,求。若是你安装了Visual Studio 2012,那么能够在C:Program FilesMicrosoft Visual Studio11.0VC#Specifications\1033 (假设你使用的是默认安装路径)目录中找到该规格说明文档。
在规格说明文档的7.7.7.1节,咱们发现了awaitable表达式的定义:
Await表达式的任务被要求是awaitable,若是一个表达式t知足下面任意一条则认为是, awaitable的:
这些信息足够咱们开始了。首先咱们定义一个awaitable类型CustomAwaitable,并实现GetAwaiter方法,该方法返回一个CustomAwaiter类型的实例。CustomAwaiter实现了 .INotifyCompletion接口,拥有类型为bool的IsCompleted属性,而且有GetResult方法,该方法返回一个字符串类型。最后,咱们写了一些代码来建立两个CustomAwaitable对象并对,其使用await关键字。
如今咱们应该理解await表达式执行的方式了。这里并无引用规格说明文档,以避免陷入没必要要的细节。基本上,若是IsCompleted属性返回true,则只需同步调用GetResult方法。这种作法防止了该操做已经完成后咱们仍然为执行异步任务而分配资源。经过给 CustomAwaitable对象的构造函数传递completeSynchronously参数来展现该场景。
另外,咱们给CustomAwaiter的OnCompleted方法注册了一个回调函数并启动该异步操做。当操做完成时,就会调用提供的回调函数,该回调函数将会经过调用CustomAwaiter对象的GetResult方法来获取结果。
本节展现了如何设计一个很是基本的类型,该类型可以与await操做符和动态C#类型兼容。
请执行如下步骤来添加对Impromptulnterface NuGet包的引用:
(1)右键点击项目中的引用文件夹,并选择管理NuGet包 菜单选项。
(2)添加对你喜欢的Impromptulnterface NuGet包的引用。能够使用管理NuGet包对话框的搜索功能
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { string result = await GetDynamicAwaitableObject(true); Console.WriteLine(result); result = await GetDynamicAwaitableObject(false); Console.WriteLine(result); } static dynamic GetDynamicAwaitableObject(bool completeSynchronously) { dynamic result = new ExpandoObject(); dynamic awaiter = new ExpandoObject(); awaiter.Message = "Completed synchronously"; awaiter.IsCompleted = completeSynchronously; awaiter.GetResult = (Func<string>)(() => awaiter.Message); awaiter.OnCompleted = (Action<Action>) ( callback => ThreadPool.QueueUserWorkItem(state => { Thread.Sleep(TimeSpan.FromSeconds(1)); awaiter.Message = GetInfo(); if (callback != null) callback(); }) ); IAwaiter<string> proxy = Impromptu.ActLike(awaiter); result.GetAwaiter = (Func<dynamic>) ( () => proxy ); return result; } static string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } public interface IAwaiter<T> : INotifyCompletion { bool IsCompleted { get; } T GetResult(); }
这里咱们重复了5.9节的技巧,可是此次借助于动态表达式,能够使用NuGet来实现该目标。NuGet是一个包含了不少有用的库的包管理器。此次咱们将使用一个库来动态地建立,封装对象,实现咱们须要的接口。
首先咱们建立了ExpandoObject类型的两个实例,并把它们分配给动态的局部变量。这些变量将成为awaitable和awaiter对象。因为一个awaitable对象只须要拥有GetAwaiter方,法,提供该方法没有问题。使用dynamic关键字组合ExpandoOibect容许咱们自定义该对象,并经过分配相应的值来添加属性和方法。事实上它是一个字典类型的集合,键类型是string,值类型是object,若是你很熟悉JavaScript编程语言,你可能会注意到它与JavaScript对象很类似。
因为dynamic关键字容许咱们跳过C#的编译时检查。ExpandObject是以这样的方式编,写的:当你给属性分配值时, ExpandObject建立了一个字典条目,键是属性名,值是赋予的任何值。当尝试获取属性值时,会在字典中查找并提供存储在相应的字典条目中的值。若是该值是Action或Func类型,咱们实际上存储了一个委托,它能够当作方法使用。所以, ExpandoObject与dynamic类型的组合容许咱们建立一个对象并动态地赋予其属性和方法。
如今咱们须要构造自定义的awaiter和awaitable对象。先从awaiter开始。首先提供一个名为Message的属性并赋予初始值,而后使用Func<string>类型定义了GetResult方法.并分配一个lambda表达式,该表达式返回Message属性值。接下来实现IsCompleted属性。若是其值为true,则跳过剩下的工做并处理存储在result局部变量中的awaitable对象。咱们只须要添加一个方法用于返回该dynamic对象并从该对象返回awaiter对象。咱们能够使用 result做为await表达式。然而,它将会同步运行。
主要的挑战是在动态对象中实现异步处理。C#语言规格说明规定awaiter必须实现, INotifyCompletion或ICriticalNotifyCompletion接口,可是ExpandoObject却没有。甚至当咱们动态地实现OnCompleted方法并添加到awaiter对象时,这仍然行不通,由于该对象没有,实现上面提到的任何一个接口。
为了解决该问题,咱们使用了NuGet提供的Impromptulnterface库。它容许咱们使用 Impromptu.ActLike方法来动态地建立代理对象,该对象将实现任何须要的接口。若是咱们尝试建立一个实现了INotifyCompletion接口的代理,仍然行不通,由于该代理对象再也不是动态的,而且该接口只有OnCompleted方法,但没有IsCompleted属性或GetResult方法。做为最后的解决办法,咱们定义了一个泛型接口, IAwaiter<T>,它实现了INotifyCompletion并添加了全部须要的属性和方法。如今,咱们使用它生成代理并修改result对象来从GetAwaiter方法返回一个代理,而不是返回awaiter对象。如今程序能够工做了,咱们构造了一个在运行时彻底动态的awaitable对象。
编程须要对基本的数据结构和算法有所了解。程序员为并发状况选择最合适的数据结构,那就须要知道不少事情,例如算法运行时间、空间复杂度,以及大写0标记法等。在不一样的广为人知的场景中,咱们总知道哪一种数据结构更高效。
对于并行计算,咱们须要使用适当的数据结构。这些数据结构具有可伸缩性,尽量地, "避免锁,同时还能提供线程安全的访问。.NET framework版本4引入了System.Collections.Concurrent命名空间,其中包含了一些数据结构。在本章中,咱们将展现这些数据结构并经过简单的例子来讲明如何使用它们。
先从ConcurrentQueue开始。该集合使用了原子的比较和交换(Compare and Swap,简称CAS)操做,以及SpinWait来保证线程安全。它实现了一个先进先出( First In FirstOut,简称FIFO)的集合,这意味着元素出队列的顺序与加入队列的顺序是一致的。能够调用Enqueue方法向队列中加入元素。TryDequeue方法试图取出队列中的第一个元素,而 TryPeek方法则试图获得第一个元素但并不从队列中删除该元素。
ConcurrentStack的实现也没有使用任何锁,只采用了CAS操做。它是一个后进先出, (Last In First Out,简称LIFO)的集合,这意味着最近添加的元素会先返回。能够使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法获取元素,以及使用TryPeek方法检查元素。
ConcurrentBag是一个支持重复元素的无序集合。它针对这样如下状况进行了优化,即多个线程以这样的方式工做:每一个线程产生和消费本身的任务,极少与其余线程的任务交互 (若是要交互则使用锁),添加元素使用Add方法,检查元素使用TryPeek方法,获取元素使,用TryTake方法。
请避免使用上面说起的集合的Count属性。实现这些集合使用的是链表, Count操做的时间复杂度为0(N)。若是想检查集合是否为空,请使用IsEmpty属性,其时间复杂度为0(1),
ConcurrentDictionary是一个线程安全的字典集合的实现。对于读操做无需使用锁。可是对于写操做则须要锁。该并发字典使用多个锁,在字典桶之上实现了一个细粒度的锁模型。使用参数concurrencyLevel能够在构造函数中定义锁的数量,这意味着预估的线程数量将并发地更新该字典。
因为并发字典使用锁,因此一些操做须要获取该字典中的全部锁。若是不必请避免使用如下操做: Count, IsEmpty, Keys, Values, CopyTo及ToArray。
BlockingCollection是对IProducerConsumerCollection泛型接口的实现的一个高级封装。它有不少先进的功能来实现管道场景,即当你有一些步骤须要使用以前步骤运行的结果时。BlockingCollectione类支持以下功能:分块、调整内部集合容量、取消集合操做、从多个块集合中获取元素。
本节展现了一个很是简单的场景,比较在单线程环境中使用一般的字典集合与使用并发字典的性能。
class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); Console.ReadKey(); } const string Item = "Dictionary item"; public static string CurrentItem; }
当程序启动时咱们建立了两个集合,其中一个是标准的字典集合,另外一个是新的并发字典集合。而后采用锁的机制向标准的字典中添加元素,并测量完成100万次迭代的时间。一样也采用一样的场景来测量ConcurrentDictionary的性能,最后比较从两个集合中获取值的性能。
经过这个很是简单的场景,咱们发现ConcurrentDictionary写操做比使用锁的一般的字典要慢得多,而读操做则要快些。所以若是对字典须要大量的线程安全的读操做, ConcurrentDictionary是最好的选择。
若是你对字典只须要多线程访问只读元素,则不必执行线程安全的读操做。在此场景中最好只使用一般的字典或ReadOnlyDictionary集合。
ConcurrentDictionary的实现使用了细粒度锁( fine-grained locking)技术,这在多线程写入方面比使用锁的一般的字典(也被称为粗粒度锁)的可伸缩性更好。正如本例中所示,当只用一个线程时,并发字典很是慢,可是扩展到5到6个线程(若是有足够的CPU核心来同时运行它们),并发字典的性能会更好。
本节将展现建立能被多个工做者异步处理的一组任务的例子
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i-1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask {Id = i}; queue.Enqueue(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
当程序运行时,咱们使用ConcurrentQueue集合实例建立了一个任务队列。而后建立了一个取消标志,它是用来在咱们将任务放入队列后中止工做的。接下来启动了一个单独的工,做线程来将任务放入任务队列中。该部分为异步处理产生了工做量。
如今定义该程序中消费任务的部分。咱们建立了四个工做者,它们会随机等待一段时,间,而后从任务队列中获取一个任务,处理该任务,一直重复整个过程直到咱们发出取消标志信号。最后,咱们启动产生任务的线程,等待该线程完成。而后使用取消标志给消费者发信号咱们完成了工做。最后一步将等待全部的消费者完成。
咱们看到队列中的任务按从前到后的顺序被处理,但一个后面的任务是有可能会比前面的任务先处理的,由于咱们有四个工做者独立地运行,并且任务处理时间并非恒定的。我,们看到访问该队列是线程安全的,没有一个元素会被提取两次。
.本节是前一小节的细微修改版。咱们又一次建立了被多个工做者异步处理的一组任务,可是此次使用ConcurrentStack来实现并看看有什么不一样。
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
当程序运行时,咱们建立了一个ConcurrentStack集合的实侈e其他的代码与前一小节中几乎同样,惟一不一样之处是咱们对并发堆栈使用Push和TryPop方法,而对并发队列使用Enqueue和TryDequeue方法。
如今能够看到任务处理的顺序被改变了。堆栈是一个LIFO集合,工做者先处理最近的,任务。在并发队列中,任务被处理的顺序与被添加的顺序几乎一致。这意味着根据工做者的!数量,咱们必将在必定时间窗内处理先被建立的任务。而在堆栈中,早先建立的任务具备较低的优先级,并且直到生产者中止向堆栈中放入更多任务后,该任务才有可能被处理。这种行为是肯定的,最好在该场景下使用队列。
本节展现了在多个独立的既可生产工做又可消费工做的工做者间如何扩展工做量。
class Program { static void Main(string[] args) { CreateLinks(); Task t = RunProgram(); t.Wait(); } static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"}; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"}); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); Console.ReadKey(); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (_contentEmulation.ContainsKey(task.UrlToCrawl)) return _contentEmulation[task.UrlToCrawl]; return null; } static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new [] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new [] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }
该程序模拟了使用多个网络爬虫进行网页索引的场景。网络爬虫是这样一个程序:它使用网页地址打开一个网页,索引该网页内容,尝试访问该页面包含的全部连接,而且也索引这些连接页面。刚开始,咱们定义了一个包含不一样网页URL的字典。该字典模拟了包含其,他页面连接的网页。该实现很是简单,并不关心索引已经访问过的页面,但正由于它如此简单咱们才能够关注并行工做负载。
接着建立了一个并发包,其中包含爬虫任务。咱们建立了四个爬虫,而且给每一个爬虫都提供了一个不一样的网站根URL,而后等待全部爬虫完成工做。如今每一个爬虫开始检索提供给,它的网站URL,咱们经过等待一个随机事件来模拟网络10处理。若是页面包含的URL越多,爬虫向包中放入的任务也会越多。而后检查包中是否还有任何须要爬虫处理的任务,若是没有说明爬虫完成了工做。
若是检查前四个根URL后的第一行输出内容,咱们将看到被爬虫N放置的任务一般会,被同一个爬虫处理。然而,接下来的行则会不一样。这是由于ConcurrentBag内部针对多个线程既能够添加元素又能够删除元素的场景进行了优化。实现方式是每一个线程使用本身的本地,队列的元素,因此使用该队列时无需任何锁。只有当本地队列中没有任何元素时,咱们才执,行一些锁定操做并尝试从其余线程的本地队列中“偷取”工做。这种行为有助于在全部工做,者间分发工做并避免使用锁。
本节将描述如何使用BlockingCollection来简化实现异步处理的工做负载。
class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); Console.WriteLine(); Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run( () => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); } static async Task TaskProcessor( BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
先说第一个场景,这里咱们使用了BlockingCollection类,它带来了不少优点。首先,咱们可以改变任务存储在阻塞集合中的方式。默认状况下它使用的是ConcurrentQueue容器,可是咱们可以使用任何实现了IProducerConsumerCollection泛型接口的集合。为了演示该点,咱们运行了该程序两次,第二次时使用ConcurrentStack做为底层集合。
工做者经过对阻塞集合迭代调用GetConsumingEnumerable方法来获取工做项。若是在该集合中没有任何元素,迭代器会阻塞工做线程直到有元素被放置到集合中。当生产者调用集合的CompleteAdding时该迭代周期会结束。这标志着工做完成了。
这里很容易犯一个错误,即对BlockingCollection进行迭代,由于它自身实现了IEnumerable接口。不要忘记使用GetConsumingEnumerable,不然你迭代的只是集合的“快照”,这并非指望的程序行为。
工做量生产者将任务插入到BlockingCollection而后调用CompleteAdding方法,这会使全部工做者完成工做。如今在程序输出中咱们看到两个结果序列,演示了并发队列和堆栈集合的不一样之处。
NET Framework库中有个子集叫作并行库,一般被称为并行框架扩展( Parallel Framework Extensions,简称PFX),这是这些库很是早期的版本的名称。并行库随着.NET Framework 4.0一块儿发布,包含三大主要部分:
事实上咱们将 "程序分割成一组任务并使用不一样的线程来运行不一样的任务。这种方式被称为任务并行( task parallelism), 目前咱们只学习了任务并行。.
想象一下咱们有一个程序针对一组大数据进行重量级运算。并行运行该程最容易的方式,是将该组数据分割成较小的数据块,对这些数据块进行并行计算,而后聚合这些计算结果。这种编程模型称为数据并行(data parallelism)。
任务并行是最底层的抽象层。咱们将程序定义为任务的组合,显式地定义这些任务如何组合。由此方式组成的程序会很是复杂和细节化。并行操做被定义在该程序的不一样位置,随着并行操做的增加,程序变得愈来愈难理解和维护。采用这种方式来并行程序被称为无结构的并行(unstructured parallelism),这就是咱们为复杂的并行逻辑付出的代价。
然而,当咱们有较简单的程序逻辑时,咱们能够将更多的并行细节推给PFX库和C#编译器。例如,咱们能够说, “我想以并行方式运行这三个方法,但我不关心是如何实现并行的,让NET基础设施决定细节。”这产生了一个抽象层使得咱们不用提供一个关于如何实现并行的细节描述。这种方式被称为结构并行( structured parallelism),由于并行一般是一组声明,而且在程序中每一个并行状况并定义在确切的地方。
这可能致使一种印象,即无结构并行是一种很差的实践,应该始终使用结构并行替代它。我想强调这一点是不对的。结构并行确实更易维护,应该尽量地使用,可是它并非万能的。一般有不少状况咱们不能简单地使用结构并行,那么以非结构化的方式使用TPL任务并行也是彻底能够的。
任务并行库中有一个名为Parallel的类,其提供了一组API用来实现结构并行。它仍然是TPL的一部分,咱们在本章介绍它的缘由是它是从较低的抽象层向较高的抽象层过渡的完美例子。当使用Parallel类的API时,咱们无需提供分割工做的细节。可是咱们仍要显式定义如何从分割的结果中获得单个结果。
PLINQ具备最高级抽象。它自动将数据分割为数据块,而且决定是否真的须要并行化查询,或者使用一般的顺序查询处理更高效。PLINO基础设施会将分割任务的执行结果组合到一块儿。有不少选项可供程序员来优化查询,使用尽量高的性能获取结果。
在本章中咱们将涵盖Parallel类的用法以及不少不一样的PLINQ选项,例如让LINQ查询并行化,设置异常模型及设置PLINQ查询的并行等级,处理查询项的顺序,以及处理, PLINQ异常。咱们也会学习如何管理PLINO查询的数据分割。
本节展现了如何使用Parallel类的API,咱们将学习如何并行地调用方法,如何执行并, "行的循环,以及调整并行机制。
class Program { static void Main(string[] args) { Parallel.Invoke( () => EmulateProcessing("Task1"), () => EmulateProcessing("Task2"), () => EmulateProcessing("Task3") ); var cts = new CancellationTokenSource(); var result = Parallel.ForEach( Enumerable.Range(1, 30), new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = Environment.ProcessorCount, TaskScheduler = TaskScheduler.Default }, (i, state) => { Console.WriteLine(i); if (i == 20) { state.Break(); Console.WriteLine("Loop is stopped: {0}", state.IsStopped); } }); Console.WriteLine("---"); Console.WriteLine("IsCompleted: {0}", result.IsCompleted); Console.WriteLine("Lowest break iteration: {0}", result.LowestBreakIteration); Console.ReadKey(); } static string EmulateProcessing(string taskName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250, 350))); Console.WriteLine("{0} task was processed on a thread id {1}", taskName, Thread.CurrentThread.ManagedThreadId); return taskName; } }
该程序演示了Parallel类的不一样功能。与在任务并行库中定义任务的方式相比,调用 "Invoke方法能够免去不少麻烦就可实现并行地运行多个任务。Invoke方法会阻塞其余线程直到全部的任务都被完成,这是一个很是常见的方面使用Invoke方法的场景。
下一个功能是并行循环,使用For和ForEach方法来定义循环。由ForEach方法与For方法很是类似,咱们将仔细讲解ForEach方法。并行ForEach循环能够经过给每一个集合项应用一个action委托的方式,实现并行地处理任何IEnumerable集合。咱们能够提供几种选项,自定义并行行为,并获得一个结果来讲明循环是否成功完成。
能够给ForEach方法提供一个ParallelOptions类的实例来控制并行循环。其容许咱们使用CancellationToken取消循环,限制最大并行度(并行运行的最大操做数),还能够提供一个自定义的TaskScheduler类来调度任务。Action能够接受一个附加的ParallelLoopState参数.可用于从循环中跳出或者检查当前循环的状态。
使用ParallelLoopState有两种方式中止并行循环。既能够使用Break方法,也能够使用Stop方法。Stop方法告诉循环中止处理任何工做,并设置并行循环状态属性, IsStopped值为true, Break方法中止其以后的迭代,但以前的迭代还要继续工做。在那,种状况下,循环结果的LowestBreaklteration属性将会包含当Break方法被调用时的最低,循环次数。
本节将描述如何使用PLINQ来并行化查询,以及如何将并行查询改成顺序处理。
class Program { static void Main(string[] args) { var parallelQuery = from t in GetTypes().AsParallel() select EmulateProcessing(t); var cts = new CancellationTokenSource(); cts.CancelAfter(TimeSpan.FromSeconds(3)); try { parallelQuery .WithDegreeOfParallelism(Environment.ProcessorCount) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) .WithCancellation(cts.Token) .ForAll(Console.WriteLine); } catch (OperationCanceledException) { Console.WriteLine("---"); Console.WriteLine("Operation has been canceled!"); } Console.WriteLine("---"); Console.WriteLine("Unordered PLINQ query execution"); var unorderedQuery = from i in ParallelEnumerable.Range(1, 30) select i; foreach (var i in unorderedQuery) { Console.WriteLine(i); } Console.WriteLine("---"); Console.WriteLine("Ordered PLINQ query execution"); var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered() select i; foreach (var i in orderedQuery) { Console.WriteLine(i); } Console.ReadKey(); } static string EmulateProcessing(string typeName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250,350))); Console.WriteLine("{0} type was processed on a thread id {1}", typeName, Thread.CurrentThread.ManagedThreadId); return typeName; } static IEnumerable<string> GetTypes() { return from assembly in AppDomain.CurrentDomain.GetAssemblies() from type in assembly.GetExportedTypes() where type.Name.StartsWith("Web") orderby type.Name.Length select type.Name; } }
当程序运行时,咱们建立了一个LINQ查询,其使用反射API来查询加载到当前应用程,序域中的全部组件中名称以“Web"开头的类型。咱们使用EmulateProcessing方法模拟处理每一个项时间的延迟,并使用PrintInfo方法打印结果。咱们也使用了Stopwatch类来测量每一个查询的执行时间。
首先咱们运行了一个一般的顺序LINQ查询。此时并无并行化,全部任何操做都运,行在当前线程。该查询的第二版显式地使用了ParallelEnumerable类。ParallelEnumerable包含了PLINO的逻辑实现,而且做为IEnumerable集合功能的一组扩展方法。一般无需显式,地使用该类,在这里是为了演示PLINQ的实际工做方式。第二个版本以并行的方式运行, "EmulateProcessing操做。然而,默认状况下结果会被合并到单个线程中,因此查询的执行时,间应该比第一个版本少几秒。
第三个版本展现了如何使用AsParallel方法来将LINO查询按声明的方式并行化运行。这里咱们并不关心实现细节,只是为了说明咱们想以并行的方式运行。然而,该版本的关键不一样处是咱们使用了ForAll方法来打印查询结果。打印结果操做与任务被处理的线程是同一个线程,跳过告终果合并步骤。它容许咱们也能以并行的方式运行PrintInfo方法,甚至该版本运行速度比以前的版本更快。
最后一个例子展现了如何使用AsSequential方法将PLINQ查询以顺序方式运行。能够看到该查询运行方式与第一个示例彻底同样。
若是在客户端运行程序,最重要的事情之一是有一个响应的用户界面。这意味着不管应用程序发生什么,全部的用户界面元素(好比按钮和进度条)都要保持快速运行,用户可以从应用程序获得快速响应。达到该点并不容易!若是你尝试在Windows系统中打开记事本编辑器并加载一个有几个兆字节大小的文档,应用程序窗口将冻结一段显著的时间,由于整个文档要先从硬盘中加载,而后程序才能开始处理用户输入。
这是一个很是重要的问题,在该状况下,惟一方案是不管如何都要避免阻塞UI线程。这反过来意味着为了防止阻塞UI线程,每一个与UI有关的API必须只被容许异步调用。这是Window 8操做系统从新升级API的关键缘由,其几乎把每一个方法替换为异步方式。可是若是应用程序使用多线程来达到此目的会影响性能吗?固然会!然而考虑到只有一个用户,那么这是划算的。若是应用程序能够使用电脑的全部能力从而变得更加高效,并且该能力只为运行程序的惟一用户服务,这是好事。
接下来看看第二种状况。若是程序运行在服务器端,则是彻底不一样的情形。可伸缩性是最高优先级,这意味着单个用户消耗越少的资源越好。若是为每一个用户建立多个线程,则!可伸缩性并很差。以高效的方式来平衡应用程序资源的消耗是个很是复杂的问题。例如,在ASPNET (其是微软提供的web应用程序平台)中,咱们使用工做线程池来服务客户端请求。该池的工做线程数是有限的,因此不得不最小化每一个工做线程的使用时间以便达到高伸缩性。这意味着须要把工做线程越快越好地放回到池中,从而能够服务下一个请求。若是咱们启动了一个须要计算的异步操做,则整个工做流程会很低效。首先从线程池中取出一个工做线程用以服务客户端请求。而后取出另外一个工做线程并开始处理异步操做。如今有两个工做线程都在处理请求,若是第一个线程能作些有用的事则很是好!遗憾的是,一般状况是咱们简单等待异步操做完成,可是咱们却消费了两个工做线程,而不是一个。在该场景中,异步比同步执行实际上更糟糕!咱们不须要使用全部CPU核心,由于咱们已经在服务不少客户端,它们已经使用了CP的全部计算能力。咱们无须保持第一个线程响应,由于这没有用户界面。那么为何咱们应该在服务器端使用异步呢?
答案是只有异步输人/输出操做才应该使用异步。目前,现代计算机一般有一个磁盘驱动器来存储文件,一块网卡来经过网络发送与接收数据。全部这些设备都有本身的微型计算机,以很是底层的方式来管理输入/输出操做并发信号给操做系统结果。这又是一个很是复杂的主题。但为了让概念清楚,咱们能够这样说,有一种方式让程序员开始一个输人/输出,操做,并提供给操做系统一段代码,当操做完成后被该代码会被调用。在启动I/O任务与完我之间,并不须要CPU工做。这是由相应的磁盘和网络控制器的微型计算机完成的。这种执行I/O任务的方式被称为I/O线程。实现时使用的是,NET线程池,而且使用了一个来自操做系统的基础设施,叫作I/O完成端口。
在APSNET中,一旦有一个异步的I/O操做在工做线程中开始时,它会被当即返回到线程池中。当该操做继续运行时,该线程能够服务其余的客户端。最终,当操做发出信号完成时, ASPNET基础设施从线程池中获取一个空闲的工做线程(该线程可能与操做开始时的!线程不一样),而后会完成该操做。
好的,咱们如今了解了I/O线程对服务器应用程序的重要性。遗憾的是,很难看出,哪些API在底层使用了I/O线程。除了学习源代码外,惟一的方式是简单知道哪一个NET , Framework类库对I/O线程进行了优化。在本章中,咱们将学习如何使用一些这样的API,咱们将学习如何异步操做文件,如何使用网络I/O来建立一个HTTP服务器并调用Windows Communication Foundation服务,以及如何使用异步API来查询数据库。
另外一个须要考虑的重要问题是并行。因为一些缘由,集中地并行磁盘操做可能致使很低的性能。请记住并行I/O操做常常很是低效,顺序执行I/O要好一些,可是要以异步的方式执行。
本节讲述了如何建立一个文件,而且以异步的方式读写数据。
internal class Program { static void Main(string[] args) { var t = ProcessAsynchronousIO(); t.GetAwaiter().GetResult(); Console.ReadKey(); } const int BUFFER_SIZE = 4096; async static Task ProcessAsynchronousIO() { using (var stream = new FileStream("test1.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE)) { Console.WriteLine("1. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = new FileStream("test2.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) { Console.WriteLine("2. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous)) using (var sw = new StreamWriter(stream)) { Console.WriteLine("3. Uses I/O Threads: {0}", stream.IsAsync); await sw.WriteAsync(CreateFileContent()); } using (var sw = new StreamWriter("test4.txt", true)) { Console.WriteLine("4. Uses I/O Threads: {0}", ((FileStream)sw.BaseStream).IsAsync); await sw.WriteAsync(CreateFileContent()); } Console.WriteLine("Starting parsing files in parallel"); Task<long>[] readTasks = new Task<long>[4]; for (int i = 0; i < 4; i++) { readTasks[i] = SumFileContent(string.Format("test{0}.txt", i + 1)); } long[] sums = await Task.WhenAll(readTasks); Console.WriteLine("Sum in all files: {0}", sums.Sum()); Console.WriteLine("Deleting files..."); Task[] deleteTasks = new Task[4]; for (int i = 0; i < 4; i++) { string fileName = string.Format("test{0}.txt", i + 1); deleteTasks[i] = SimulateAsynchronousDelete(fileName); } await Task.WhenAll(deleteTasks); Console.WriteLine("Deleting complete."); } static string CreateFileContent() { var sb = new StringBuilder(); for (int i = 0; i < 100000; i++) { sb.AppendFormat("{0}", new Random(i).Next(0, 99999)); sb.AppendLine(); } return sb.ToString(); } async static Task<long> SumFileContent(string fileName) { using (var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) using (var sr = new StreamReader(stream)) { long sum = 0; while (sr.Peek() > -1) { string line = await sr.ReadLineAsync(); sum += long.Parse(line); } return sum; } } static Task SimulateAsynchronousDelete(string fileName) { return Task.Run(() => File.Delete(fileName)); } }
当程序运行时,咱们以不一样的方式建立了4个文件,而且填充了随机数据。在第一个例 子中,使用的是FileStream类以及其方法,将异步编程模型API转换成任务。第二个例子中也同样,可是给FileStream构造函数提供了FileStrearn.Asynchronous参数。
使用FileOptions.Asynchronous选项是很是重要的。若是忽略该选项,咱们依然能够以异步的方式使用该文件,但这只是在线程池中的异步委托调用。只有提供了该选项(或者在另外一个构造函数重载中使用bool useAsync),才能对FileStream类使用异步1O,
第三个例子使用了一些简化的API,好比File.Create方法和StreamWrite类。它也使用 1/0线程,咱们能够使用Stream.IsAsync属性来检查。最后一个例子说明了过度简化也很差。这里咱们借助于异步委托调用来模拟异步1O,其实并无使用异步1O。
接着并行地异步地从全部文件中读取数据,统计每一个文件内容,而后求总和。最后,删除全部文件。因为在任何非Windows商店应用程序中并无异步删除文件的API,咱们使用 Task.Run工厂方法来模拟异步删除文件。
本节展现了如何编写一个简单的异步HTTP服务器。
class Program { static void Main(string[] args) { var server = new AsyncHttpServer(portNumber: 1234); var t = Task.Run(() => server.Start()); Console.WriteLine("Listening on port 1234. Open http://localhost:1234 in your browser."); Console.WriteLine("Trying to connect:"); Console.WriteLine(); GetResponseAsync("http://localhost:1234").GetAwaiter().GetResult(); Console.WriteLine(); Console.WriteLine("Press Enter to stop the server."); Console.ReadLine(); server.Stop().GetAwaiter().GetResult(); Console.ReadKey(); } static async Task GetResponseAsync(string url) { using (var client = new HttpClient()) { HttpResponseMessage responseMessage = await client.GetAsync(url); string responseHeaders = responseMessage.Headers.ToString(); string response = await responseMessage.Content.ReadAsStringAsync(); Console.WriteLine("Response headers:"); Console.WriteLine(responseHeaders); Console.WriteLine("Response body:"); Console.WriteLine(response); } } class AsyncHttpServer { readonly HttpListener _listener; const string RESPONSE_TEMPLATE = "<html><head><title>Test</title></head><body><h2>Test page</h2><h4>Today is: {0}</h4></body></html>"; public AsyncHttpServer(int portNumber) { _listener = new HttpListener(); _listener.Prefixes.Add(string.Format("http://+:{0}/", portNumber)); } public async Task Start() { _listener.Start(); while (true) { var ctx = await _listener.GetContextAsync(); Console.WriteLine("Client connected..."); var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now); using (var sw = new StreamWriter(ctx.Response.OutputStream)) { await sw.WriteAsync(response); await sw.FlushAsync(); } } } public async Task Stop() { _listener.Abort(); } } }
这里咱们经过HttpListener类实现了一个很是简单的web服务器。也使用了TcpListener类进行TCP套接字10操做。咱们配置该监听器接收任何主机到本地机器1234端口的链接。而后在单独的工做线程中启动该监听器,从而在主线程中能够控制该监听器。
当使用GetContextAsync方法时会发生异步I/O操做。遗憾的是,其并不接收, CancellationToken从而实现取消功能。因此若是想关闭该服务器,只需调用listener.Abort.方法,这将丢弃全部链接并关闭该服务器。
为了对该服务器执行一个异步请求,咱们使用了统一命名空间下的System.Net.Http集合中的HttpClient类。咱们使用Get.Async方法来发起一个异步的HTTP GET请求。还有其余的方法用于发起其余HTTP请求,好比POST, DELETE以及PUT, HttpClient还有不少其余,的选项,好比使用不一样的格式(好比XML和JSON)来序列化和反序列化对象,指定代理服,务器地址,认证以及其余配置。
当运行该程序时,能够看到该服务器被启动起来。在服务器端代码中,咱们使用, GetContextAsync方法来接收新的客户端链接。当有新的客户端链接时该方法就会返回,我,们简单的输出一个包含当前日期和时间的很是基础的HTML做为响应。而后咱们请求服务器,并打印出响应头和内容。你也能够打开浏览器访问http://localhost:1234/地址。你将看到相同的响应结果显示在浏览器窗口。
本节演示了建立数据库,以及异步地操做数据、读取数据的过程。
class Program { static void Main(string[] args) { const string dataBaseName = "CustomDatabase"; var t = ProcessAsynchronousIO(dataBaseName); t.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } async static Task ProcessAsynchronousIO(string dbName) { try { const string connectionString = @"Data Source=(LocalDB)\v11.0;Initial Catalog=master;Integrated Security=True"; string outputFolder = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); string dbFileName = Path.Combine(outputFolder, string.Format(@".\{0}.mdf", dbName)); string dbLogFileName = Path.Combine(outputFolder, string.Format(@".\{0}_log.ldf", dbName)); string dbConnectionString = string.Format(@"Data Source=(LocalDB)\v11.0;AttachDBFileName={1};Initial Catalog={0};Integrated Security=True;", dbName, dbFileName); using (var connection = new SqlConnection(connectionString)) { await connection.OpenAsync(); if (File.Exists(dbFileName)) { Console.WriteLine("Detaching the database..."); var detachCommand = new SqlCommand("sp_detach_db", connection); detachCommand.CommandType = CommandType.StoredProcedure; detachCommand.Parameters.AddWithValue("@dbname", dbName); await detachCommand.ExecuteNonQueryAsync(); Console.WriteLine("The database was detached succesfully."); Console.WriteLine("Deleteing the database..."); if(File.Exists(dbLogFileName)) File.Delete(dbLogFileName); File.Delete(dbFileName); Console.WriteLine("The database was deleted succesfully."); } Console.WriteLine("Creating the database..."); string createCommand = String.Format("CREATE DATABASE {0} ON (NAME = N'{0}', FILENAME = '{1}')", dbName, dbFileName); var cmd = new SqlCommand(createCommand, connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("The database was created succesfully"); } using (var connection = new SqlConnection(dbConnectionString)) { await connection.OpenAsync(); var cmd = new SqlCommand("SELECT newid()", connection); var result = await cmd.ExecuteScalarAsync(); Console.WriteLine("New GUID from DataBase: {0}", result); cmd = new SqlCommand(@"CREATE TABLE [dbo].[CustomTable]( [ID] [int] IDENTITY(1,1) NOT NULL, [Name] [nvarchar](50) NOT NULL, CONSTRAINT [PK_ID] PRIMARY KEY CLUSTERED ([ID] ASC) ON [PRIMARY]) ON [PRIMARY]", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Table was created succesfully."); cmd = new SqlCommand(@"INSERT INTO [dbo].[CustomTable] (Name) VALUES ('John'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Peter'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('James'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Eugene');", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Inserted data succesfully"); Console.WriteLine("Reading data from table..."); cmd = new SqlCommand(@"SELECT * FROM [dbo].[CustomTable]", connection); using (SqlDataReader reader = await cmd.ExecuteReaderAsync()) { while (await reader.ReadAsync()) { var id = reader.GetFieldValue<int>(0); var name = reader.GetFieldValue<string>(1); Console.WriteLine("Table row: Id {0}, Name {1}", id, name); } } } } catch(Exception ex) { Console.WriteLine("Error: {0}", ex.Message); } } }
该程序使用了一个软件,叫作SOL Server 2012 LocalDb,安装Visual Studio 2012时会附带安装它,应该能正常使用。可是若是有什么错误,你能够经过安装向导来修复该组件。
先要配置数据库文件的存放路径。咱们将数据库文件放置在应用程序执行目录中。有两个文件,一个是数据库自己,另外一个是事务日志文件。咱们也配置了两个链接字符串来定义如何链接数据库。第一个字符串是链接到LocalDb引擎来分离数据库。若是数据库已经存在、则删除并重建。当打开链接以及单独使用OpenAsync和ExecuteNonQueryAsync方法执,行SQL命令时、咱们使用了10异步操做。
在该任务完成后,咱们附加了一个最新建立的数据库。咱们建立了一张新的表并插入了一些数据。除了以前提到的方法,咱们还使用了ExecuteScalarAsync来异步地从数据库引擎中获得一个标量值,而且使用SqIDataReaderReadAsync方法来从数据库表中异步地读取数据行。
若是在数据库有一个大数据量的表,里面数据行中包含大数据量的二进制值,能够使用CommandBehavior.SequentialAcess枚举来建立数据阅读器异步地经过数据阅读器获取大字段值。,并使用GetFieldValueAsync方法
本节描述了如何建立一个WCF服务,并宿主在命令行应用程序中。客户端能够访问服务元数据,并以异步的方式消费它
请执行如下步骤来了解如何使用WCF服务:
using System; using System.ServiceModel; using System.ServiceModel.Description; using System.Threading.Tasks;
const string SERVICE_URL = "http://localhost:1234/HelloWorld"; static async Task RunServiceClient() { var endpoint = new EndpointAddress(SERVICE_URL); var channel = ChannelFactory<IHelloWorldServiceClient>.CreateChannel(new BasicHttpBinding(), endpoint); var greeting = await channel.GreetAsync("Eugene"); Console.WriteLine(greeting); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldService { [OperationContract] string Greet(string name); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldServiceClient { [OperationContract] string Greet(string name); [OperationContract] Task<string> GreetAsync(string name); } public class HelloWorldService : IHelloWorldService { public string Greet(string name) { return string.Format("Greetings, {0}!", name); } }
ServiceHost host = null; try { host = new ServiceHost(typeof (HelloWorldService), new Uri(SERVICE_URL)); var metadata = host.Description.Behaviors.Find<ServiceMetadataBehavior>(); if (null == metadata) { metadata = new ServiceMetadataBehavior(); } metadata.HttpGetEnabled = true; metadata.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; host.Description.Behaviors.Add(metadata); host.AddServiceEndpoint(ServiceMetadataBehavior.MexContractName, MetadataExchangeBindings.CreateMexHttpBinding(),"mex"); var endpoint = host.AddServiceEndpoint(typeof (IHelloWorldService), new BasicHttpBinding(), SERVICE_URL); host.Faulted += (sender, e) => Console.WriteLine("Error!"); host.Open(); Console.WriteLine("Greeting service is running and listening on:"); Console.WriteLine("{0} ({1})", endpoint.Address, endpoint.Binding.Name); var client = RunServiceClient(); client.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine("Error in catch block: {0}", ex); } finally { if (null != host) { if (host.State == CommunicationState.Faulted) { host.Abort(); } else { host.Close(); } } }
Windows Communication Foundation (简称WCF)是一个框架,用于以不一样的方式调用,远程服务。其中一个有一段时间很是流行,用于经过HTTP使用基于XML的协议来调用远,程服务,它叫作简单对象访问协议(Simple Object Access Protocol,简称SOAP)。
Visual Studio 2012对WCF服务有着很是丰富的支持。例如,你能够使用添加服务引用,菜单项给这样的服务添加引用。你也可对本节中的服务使用此功能,由于咱们提供了服务元数据。
为了建立这样的服务,咱们须要使用ServiceHost类来宿主咱们的服务。咱们经过提供,一个服务实现类型和服务地址URL来描述如何宿主服务。而后配置了元数据终端和服务终,端。最后,使用Faulted事件来处理错误,并运行该宿主服务。
为了消费该服务,咱们建立了一个客户端,这是主要的技巧所在。在服务器端,咱们有,.一个服务,是一个普通的同步方法,叫作Greet,服务契约1HelloWorldService定义了该方,法。然而,若是想使用异步网络1O,咱们须要异步地调用该方法。能够经过使用匹配的命名空间和服务名来建立一个新的服务契约,而后同时定义同步方法和基于任务的异步方法。尽管事实上在服务器端咱们没有异步方法,可是若是咱们遵循命名约定, WCF基础设施明白,咱们想建立一个异步的代理方法。
所以,当咱们建立一个1HelloworldServiceClient代理渠道, WCF会正确地路由一个异步调用到该服务器端同步方法。若是你运行程序,而后打开浏览器并使用该服务的URL http://localhost: 1234/Helloworld来访问该服务。你会看到该服务的描述,还能够浏览XML元数据,该元数据可用于从Visual Studio 2012添加服务引用。若是你尝试生成引用,将看到稍,微有点复杂的代码,但它是自动建立的,而且易于使用。