C#细说多线程

引言html

本文主要从线程的基础用法,CLR线程池当中工做者线程与I/O线程的开发,并行操做PLINQ等多个方面介绍多线程的开发。
其中委托的BeginInvoke方法以及回调函数最为经常使用。
而 I/O线程可能容易遭到你们的忽略,其实在开发多线程系统,更应该多留意I/O线程的操做。特别是在ASP.NET开发当中,可能更多人只会留意在客户端使用Ajax或者在服务器端使用UpdatePanel。其实合理使用I/O线程在通信项目或文件下载时,能尽量地减小IIS的压力。
并行编程是Framework4.0中极力推广的异步操做方式,更值得更深刻地学习。
但愿本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。web

 

 

目录数据库

1、线程的定义编程

2、线程的基础知识数组

3、以ThreadStart方式实现多线程浏览器

4、CLR线程池的工做者线程安全

5、CLR线程池的I/O线程服务器

6、异步 SqlCommand网络

7、并行编程与PLINQ多线程

8、计时器与锁

 

 

 

 

1、线程的定义

 1. 1 进程、应用程序域与线程的关系

进程(Process)是Windows系统中的一个基本概念,它包含着一个运行程序所须要的资源。进程之间是相对独立的,一个进程没法访问另外一个进程的数据(除非利用分布式计算方式),一个进程运行的失败也不会影响其余进程的运行,Windows系统就是利用进程把工做划分为多个独立的区域的。进程能够理解为一个程序的基本边界。

应用程序域(AppDomain)是一个程序运行的逻辑区域,它能够视为一个轻量级的进程,.NET的程序集正是在应用程序域中运行的,一个进程能够包含有多个应用程序域,一个应用程序域也能够包含多个程序集。在一个应用程序域中包含了一个或多个上下文context,使用上下文CLR就可以把某些特殊对象的状态放置在不一样容器当中。

线程(Thread)是进程中的基本执行单元,在进程入口执行的第一个线程被视为这个进程的主线程。在.NET应用程序中,都是以Main()方法做为入口的,当调用此方法时系统就会自动建立一个主线程。线程主要是由CPU寄存器、调用栈和线程本地存储器(Thread Local Storage,TLS)组成的。CPU寄存器主要记录当前所执行线程的状态,调用栈主要用于维护线程所调用到的内存与数据,TLS主要用于存放线程的状态信息。

进程、应用程序域、线程的关系以下图,一个进程内能够包括多个应用程序域,也有包括多个线程,线程也能够穿梭于多个应用程序域当中。但在同一个时刻,线程只会处于一个应用程序域内。

 

 
因为本文是以介绍多线程技术为主题,对进程、应用程序域的介绍就到此为止。关于进程、线程、应用程序域的技术,在“C#综合揭秘——细说进程、应用程序域与上下文”会有详细介绍。

 

1.2 多线程

在单CPU系统的一个单位时间(time slice)内,CPU只能运行单个线程,运行顺序取决于线程的优先级别。若是在单位时间内线程未能完成执行,系统就会把线程的状态信息保存到线程的本地存储器(TLS) 中,以便下次执行时恢复执行。而多线程只是系统带来的一个假像,它在多个单位时间内进行多个线程的切换。由于切换频密并且单位时间很是短暂,因此多线程可被视做同时运行。

适当使用多线程能提升系统的性能,好比:在系统请求大容量的数据时使用多线程,把数据输出工做交给异步线程,使主线程保持其稳定性去处理其余问题。但须要注意一点,由于CPU须要花费很多的时间在线程的切换上,因此过多地使用多线程反而会致使性能的降低。

 

返回目录

2、线程的基础知识

2.1 System.Threading.Thread类

System.Threading.Thread是用于控制线程的基础类,经过Thread能够控制当前应用程序域中线程的建立、挂起、中止、销毁。

它包括如下经常使用公共属性:

属性名称 说明
CurrentContext 获取线程正在其中执行的当前上下文。
CurrentThread 获取当前正在运行的线程。
ExecutionContext 获取一个 ExecutionContext 对象,该对象包含有关当前线程的各类上下文的信息。
IsAlive 获取一个值,该值指示当前线程的执行状态。
IsBackground 获取或设置一个值,该值指示某个线程是否为后台线程。
IsThreadPoolThread 获取一个值,该值指示线程是否属于托管线程池。
ManagedThreadId 获取当前托管线程的惟一标识符。
Name 获取或设置线程的名称。
Priority 获取或设置一个值,该值指示线程的调度优先级。
ThreadState 获取一个值,该值包含当前线程的状态。

 

2.1.1 线程的标识符

ManagedThreadId是确认线程的惟一标识符,程序在大部分状况下都是经过Thread.ManagedThreadId来辨别线程的。而Name是一个可变值,在默认时候,Name为一个空值 Null,开发人员能够经过程序设置线程的名称,但这只是一个辅助功能。

 

2.1.2 线程的优先级别

.NET为线程设置了Priority属性来定义线程执行的优先级别,里面包含5个选项,其中Normal是默认值。除非系统有特殊要求,不然不该该随便设置线程的优先级别。

成员名称 说明
Lowest 能够将 Thread 安排在具备任何其余优先级的线程以后。
BelowNormal 能够将 Thread 安排在具备 Normal 优先级的线程以后,在具备 Lowest 优先级的线程以前。
Normal 默认选择。能够将 Thread 安排在具备 AboveNormal 优先级的线程以后,在具备BelowNormal 优先级的线程以前
AboveNormal 能够将 Thread 安排在具备 Highest 优先级的线程以后,在具备 Normal 优先级的线程以前。
Highest 能够将 Thread 安排在具备任何其余优先级的线程以前。

 

2.1.3 线程的状态

经过ThreadState能够检测线程是处于Unstarted、Sleeping、Running 等等状态,它比 IsAlive 属性能提供更多的特定信息。

前面说过,一个应用程序域中可能包括多个上下文,而经过CurrentContext能够获取线程当前的上下文。

CurrentThread是最经常使用的一个属性,它是用于获取当前运行的线程。

 

2.1.4 System.Threading.Thread的方法

Thread 中包括了多个方法来控制线程的建立、挂起、中止、销毁,之后来的例子中会常用。

方法名称 说明
Abort()     终止本线程。
GetDomain() 返回当前线程正在其中运行的当前域。
GetDomainId() 返回当前线程正在其中运行的当前域Id。
Interrupt() 中断处于 WaitSleepJoin 线程状态的线程。
Join() 已重载。 阻塞调用线程,直到某个线程终止时为止。
Resume() 继续运行已挂起的线程。
Start()   执行本线程。
Suspend() 挂起当前线程,若是当前线程已属于挂起状态则此不起做用
Sleep()   把正在运行的线程挂起一段时间。

 

2.1.5 开发实例

如下这个例子,就是经过Thread显示当前线程信息

按 Ctrl+C 复制代码
按 Ctrl+C 复制代码

 

运行结果

 

2.2  System.Threading 命名空间

在System.Threading命名空间内提供多个方法来构建多线程应用程序,其中ThreadPool与Thread是多线程开发中最经常使用到的,在.NET中专门设定了一个CLR线程池专门用于管理线程的运行,这个CLR线程池正是经过ThreadPool类来管理。而Thread是管理线程的最直接方式,下面几节将详细介绍有关内容。

类     说明
AutoResetEvent 通知正在等待的线程已发生事件。没法继承此类。
ExecutionContext 管理当前线程的执行上下文。没法继承此类。
Interlocked 为多个线程共享的变量提供原子操做。
Monitor 提供同步对对象的访问的机制。
Mutex 一个同步基元,也可用于进程间同步。
Thread 建立并控制线程,设置其优先级并获取其状态。
ThreadAbortException 在对 Abort 方法进行调用时引起的异常。没法继承此类。
ThreadPool 提供一个线程池,该线程池可用于发送工做项、处理异步 I/O、表明其余线程等待以及处理计时器。
Timeout 包含用于指定无限长的时间的常数。没法继承此类。
Timer 提供以指定的时间间隔执行方法的机制。没法继承此类。
WaitHandle 封装等待对共享资源的独占访问的操做系统特定的对象。


在System.Threading中的包含了下表中的多个经常使用委托,其中ThreadStart、ParameterizedThreadStart是最经常使用到的委托。
由ThreadStart生成的线程是最直接的方式,但由ThreadStart所生成并不受线程池管理。
而ParameterizedThreadStart是为异步触发带参数的方法而设的,在下一节将为你们逐一细说。

委托 说明
ContextCallback 表示要在新上下文中调用的方法。
ParameterizedThreadStart 表示在 Thread 上执行的方法。
ThreadExceptionEventHandler 表示将要处理 Application 的 ThreadException 事件的方法。
ThreadStart 表示在 Thread 上执行的方法。
TimerCallback 表示处理来自 Timer 的调用的方法。
WaitCallback 表示线程池线程要执行的回调方法。
WaitOrTimerCallback 表示当 WaitHandle 超时或终止时要调用的方法。

 

2.3 线程的管理方式

经过ThreadStart来建立一个新线程是最直接的方法,但这样建立出来的线程比较难管理,若是建立过多的线程反而会让系统的性能下载。有见及此,.NET为线程管理专门设置了一个CLR线程池,使用CLR线程池系统能够更合理地管理线程的使用。全部请求的服务都能运行于线程池中,当运行结束时线程便会回归到线程池。经过设置,能控制线程池的最大线程数量,在请求超出线程最大值时,线程池能按照操做的优先级别来执行,让部分操做处于等待状态,待有线程回归时再执行操做。

基础知识就为你们介绍到这里,下面将详细介绍多线程的开发。

 

 

返回目录

3、以ThreadStart方式实现多线程

3.1 使用ThreadStart委托

这里先以一个例子体现一下多线程带来的好处,首先在Message类中创建一个方法ShowMessage(),里面显示了当前运行线程的Id,并使用Thread.Sleep(int ) 方法模拟部分工做。在main()中经过ThreadStart委托绑定Message对象的ShowMessage()方法,而后经过Thread.Start()执行异步方法。

复制代码
 1       public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format("Async threadId is :{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8
9 for (int n = 0; n < 10; n++)
10 {
11 Thread.Sleep(300);
12 Console.WriteLine("The number is:" + n.ToString());
13 }
14 }
15 }
16
17 class Program
18 {
19 static void Main(string[] args)
20 {
21 Console.WriteLine("Main threadId is:"+
22 Thread.CurrentThread.ManagedThreadId);
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.Start();
26 Console.WriteLine("Do something ..........!");
27 Console.WriteLine("Main thread working is complete!");
28
29 }
30 }
复制代码


请注意运行结果,在调用Thread.Start()方法后,系统以异步方式运行Message.ShowMessage(),而主线程的操做是继续执行的,在Message.ShowMessage()完成前,主线程已完成全部的操做。

 

3.2 使用ParameterizedThreadStart委托

ParameterizedThreadStart委托与ThreadStart委托很是类似,但ParameterizedThreadStart委托是面向带参数方法的。注意ParameterizedThreadStart 对应方法的参数为object,此参数能够为一个值对象,也能够为一个自定义对象。

复制代码
 1     public class Person
2 {
3 public string Name
4 {
5 get;
6 set;
7 }
8 public int Age
9 {
10 get;
11 set;
12 }
13 }
14
15 public class Message
16 {
17 public void ShowMessage(object person)
18 {
19 if (person != null)
20 {
21 Person _person = (Person)person;
22 string message = string.Format("\n{0}'s age is {1}!\nAsync threadId is:{2}",
23 _person.Name,_person.Age,Thread.CurrentThread.ManagedThreadId);
24 Console.WriteLine(message);
25 }
26 for (int n = 0; n < 10; n++)
27 {
28 Thread.Sleep(300);
29 Console.WriteLine("The number is:" + n.ToString());
30 }
31 }
32 }
33
34 class Program
35 {
36 static void Main(string[] args)
37 {
38 Console.WriteLine("Main threadId is:"+Thread.CurrentThread.ManagedThreadId);
39
40 Message message=new Message();
41 //绑定带参数的异步方法
42 Thread thread = new Thread(new ParameterizedThreadStart(message.ShowMessage));
43 Person person = new Person();
44 person.Name = "Jack";
45 person.Age = 21;
46 thread.Start(person); //启动异步线程
47
48 Console.WriteLine("Do something ..........!");
49 Console.WriteLine("Main thread working is complete!");
50
51 }
52 }
复制代码


运行结果:

 

3.3 前台线程与后台线程

注意以上两个例子都没有使用Console.ReadKey(),但系统依然会等待异步线程完成后才会结束。这是由于使用Thread.Start()启动的线程默认为前台线程,而系统必须等待全部前台线程运行结束后,应用程序域才会自动卸载。

在第二节曾经介绍过线程Thread有一个属性IsBackground,经过把此属性设置为true,就能够把线程设置为后台线程!这时应用程序域将在主线程完成时就被卸载,而不会等待异步线程的运行。

 

3.4 挂起线程

为了等待其余后台线程完成后再结束主线程,就可使用Thread.Sleep()方法。

复制代码
 1     public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format("\nAsync threadId is:{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8 for (int n = 0; n < 10; n++)
9 {
10 Thread.Sleep(300);
11 Console.WriteLine("The number is:" + n.ToString());
12 }
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 Console.WriteLine("Main threadId is:"+
21 Thread.CurrentThread.ManagedThreadId);
22
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.IsBackground = true;
26 thread.Start();
27
28 Console.WriteLine("Do something ..........!");
29 Console.WriteLine("Main thread working is complete!");
30 Console.WriteLine("Main thread sleep!");
31 Thread.Sleep(5000);
32 }
33 }
复制代码

运行结果以下,此时应用程序域将在主线程运行5秒后自动结束

 

但系统没法预知异步线程须要运行的时间,因此用经过Thread.Sleep(int)阻塞主线程并非一个好的解决方法。有见及此,.NET专门为等待异步线程完成开发了另外一个方法thread.Join()。把上面例子中的最后一行Thread.Sleep(5000)修改成 thread.Join() 就能保证主线程在异步线程thread运行结束后才会终止。

 

3.5 Suspend 与 Resume (慎用)

Thread.Suspend()与 Thread.Resume()是在Framework1.0 就已经存在的老方法了,它们分别能够挂起、恢复线程。但在Framework2.0中就已经明确排斥这两个方法。这是由于一旦某个线程占用了已有的资源,再使用Suspend()使线程长期处于挂起状态,当在其余线程调用这些资源的时候就会引发死锁!因此在没有必要的状况下应该避免使用这两个方法。

 

3.6 终止线程

若想终止正在运行的线程,可使用Abort()方法。在使用Abort()的时候,将引起一个特殊异常 ThreadAbortException 。
若想在线程终止前恢复线程的执行,能够在捕获异常后 ,在catch(ThreadAbortException ex){...} 中调用Thread.ResetAbort()取消终止。
而使用Thread.Join()能够保证应用程序域等待异步线程结束后才终止运行。

复制代码
 1          static void Main(string[] args)
2 {
3 Console.WriteLine("Main threadId is:" +
4 Thread.CurrentThread.ManagedThreadId);
5
6 Thread thread = new Thread(new ThreadStart(AsyncThread));
7 thread.IsBackground = true;
8 thread.Start();
9 thread.Join();
10
11 }
12
13 //以异步方式调用
14 static void AsyncThread()
15 {
16 try
17 {
18 string message = string.Format("\nAsync threadId is:{0}",
19 Thread.CurrentThread.ManagedThreadId);
20 Console.WriteLine(message);
21
22 for (int n = 0; n < 10; n++)
23 {
24 //当n等于4时,终止线程
25 if (n >= 4)
26 {
27 Thread.CurrentThread.Abort(n);
28 }
29 Thread.Sleep(300);
30 Console.WriteLine("The number is:" + n.ToString());
31 }
32 }
33 catch (ThreadAbortException ex)
34 {
35 //输出终止线程时n的值
36 if (ex.ExceptionState != null)
37 Console.WriteLine(string.Format("Thread abort when the number is: {0}!",
38 ex.ExceptionState.ToString()));
39
40 //取消终止,继续执行线程
41 Thread.ResetAbort();
42 Console.WriteLine("Thread ResetAbort!");
43 }
44
45 //线程结束
46 Console.WriteLine("Thread Close!");
47 }
复制代码

运行结果以下

 

 

返回目录

4、CLR线程池的工做者线程

4.1 关于CLR线程池

使用ThreadStart与ParameterizedThreadStart创建新线程很是简单,但经过此方法创建的线程难于管理,若创建过多的线程反而会影响系统的性能。
有见及此,.NET引入CLR线程池这个概念。CLR线程池并不会在CLR初始化的时候马上创建线程,而是在应用程序要建立线程来执行任务时,线程池才初始化一个线程。线程的初始化与其余的线程同样。在完成任务之后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。这样既节省了创建线程所形成的性能损耗,也可让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销。

注意经过CLR线程池所创建的线程老是默认为后台线程,优先级数为ThreadPriority.Normal。

 

4.2 工做者线程与I/O线程

CLR线程池分为工做者线程(workerThreads)与I/O线程 (completionPortThreads) 两种,工做者线程是主要用做管理CLR内部对象的运做,I/O(Input/Output) 线程顾名思义是用于与外部系统交换信息,IO线程的细节将在下一节详细说明。

经过ThreadPool.GetMax(out int workerThreads,out int completionPortThreads )和 ThreadPool.SetMax( int workerThreads, int completionPortThreads)两个方法能够分别读取和设置CLR线程池中工做者线程与I/O线程的最大线程数。在Framework2.0中最大线程默认为25*CPU数,在Framewok3.0、4.0中最大线程数默认为250*CPU数,在近年 I3,I5,I7 CPU出现后,线程池的最大值通常默认为1000、2000。
若想测试线程池中有多少的线程正在投入使用,能够经过ThreadPool.GetAvailableThreads( out int workerThreads,out int completionPortThreads ) 方法。

使用CLR线程池的工做者线程通常有两种方式,一是直接经过 ThreadPool.QueueUserWorkItem() 方法,二是经过委托,下面将逐一细说。

 

4.3 经过QueueUserWorkItem启动工做者线程

ThreadPool线程池中包含有两个静态方法能够直接启动工做者线程:
一为 ThreadPool.QueueUserWorkItem(WaitCallback)
二为 ThreadPool.QueueUserWorkItem(WaitCallback,Object) 

先把WaitCallback委托指向一个带有Object参数的无返回值方法,再使用 ThreadPool.QueueUserWorkItem(WaitCallback) 就能够异步启动此方法,此时异步方法的参数被视为null 。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //把CLR线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //显示主线程启动时线程池信息
8 ThreadMessage("Start");
9 //启动工做者线程
10 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback));
11 Console.ReadKey();
12 }
13
14 static void AsyncCallback(object state)
15 {
16 Thread.Sleep(200);
17 ThreadMessage("AsyncCallback");
18 Console.WriteLine("Async thread do work!");
19 }
20
21 //显示线程现状
22 static void ThreadMessage(string data)
23 {
24 string message = string.Format("{0}\n CurrentThreadId is {1}",
25 data, Thread.CurrentThread.ManagedThreadId);
26 Console.WriteLine(message);
27 }
28 }
复制代码

运行结果

 

使用 ThreadPool.QueueUserWorkItem(WaitCallback,Object) 方法能够把object对象做为参数传送到回调函数中。
下面例子中就是把一个string对象做为参数发送到回调函数当中。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //把线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 ThreadMessage("Start");
9 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback),"Hello Elva");
10 Console.ReadKey();
11 }
12
13 static void AsyncCallback(object state)
14 {
15 Thread.Sleep(200);
16 ThreadMessage("AsyncCallback");
17
18 string data = (string)state;
19 Console.WriteLine("Async thread do work!\n"+data);
20 }
21
22 //显示线程现状
23 static void ThreadMessage(string data)
24 {
25 string message = string.Format("{0}\n CurrentThreadId is {1}",
26 data, Thread.CurrentThread.ManagedThreadId);
27 Console.WriteLine(message);
28 }
29 }
复制代码

运行结果

 

经过ThreadPool.QueueUserWorkItem启动工做者线程虽然是方便,但WaitCallback委托指向的必须是一个带有Object参数的无返回值方法,这无疑是一种限制。若方法须要有返回值,或者带有多个参数,这将多费周折。有见及此,.NET提供了另外一种方式去创建工做者线程,那就是委托。

 

4.4  委托类       

使用CLR线程池中的工做者线程,最灵活最经常使用的方式就是使用委托的异步方法,在此先简单介绍一下委托类。

当定义委托后,.NET就会自动建立一个表明该委托的类,下面能够用反射方式显示委托类的方法成员(对反射有兴趣的朋友能够先参考一下“.NET基础篇——反射的奥妙”)

复制代码
 1     class Program
2 {
3 delegate void MyDelegate();
4
5 static void Main(string[] args)
6 {
7 MyDelegate delegate1 = new MyDelegate(AsyncThread);
8 //显示委托类的几个方法成员
9 var methods=delegate1.GetType().GetMethods();
10 if (methods != null)
11 foreach (MethodInfo info in methods)
12 Console.WriteLine(info.Name);
13 Console.ReadKey();
14 }
15 }
复制代码

委托类包括如下几个重要方法

复制代码
1     public class MyDelegate:MulticastDelegate
2 {
3 public MyDelegate(object target, int methodPtr);
4 //调用委托方法
5 public virtual void Invoke();
6 //异步委托
7 public virtual IAsyncResult BeginInvoke(AsyncCallback callback,object state);
8 public virtual void EndInvoke(IAsyncResult result);
9 }
复制代码

当调用Invoke()方法时,对应此委托的全部方法都会被执行。而BeginInvoke与EndInvoke则支持委托方法的异步调用,由BeginInvoke启动的线程都属于CLR线程池中的工做者线程,在下面将详细说明。

 

4.5  利用BeginInvoke与EndInvoke完成异步委托方法

首先创建一个委托对象,经过IAsyncResult BeginInvoke(string name,AsyncCallback callback,object state) 异步调用委托方法,BeginInvoke 方法除最后的两个参数外,其它参数都是与方法参数相对应的。经过 BeginInvoke 方法将返回一个实现了 System.IAsyncResult 接口的对象,以后就能够利用EndInvoke(IAsyncResult ) 方法就能够结束异步操做,获取委托的运行结果。

复制代码
 1     class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //创建委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //完成主线程其余工做
14 .............
15 //等待异步方法完成,调用EndInvoke(IAsyncResult)获取运行结果
16 string data=myDelegate.EndInvoke(result);
17 Console.WriteLine(data);
18
19 Console.ReadKey();
20 }
21
22 static string Hello(string name)
23 {
24 ThreadMessage("Async Thread");
25 Thread.Sleep(2000); //虚拟异步工做
26 return "Hello " + name;
27 }
28
29 //显示当前线程
30 static void ThreadMessage(string data)
31 {
32 string message = string.Format("{0}\n ThreadId is:{1}",
33 data,Thread.CurrentThread.ManagedThreadId);
34 Console.WriteLine(message);
35 }
36 }
复制代码

运行结果

 

4.6  善用IAsyncResult

在以上例子中能够看见,若是在使用myDelegate.BeginInvoke后当即调用myDelegate.EndInvoke,那在异步线程未完成工做之前主线程将处于阻塞状态,等到异步线程结束获取计算结果后,主线程才能继续工做,这明显没法展现出多线程的优点。此时能够好好利用IAsyncResult 提升主线程的工做性能,IAsyncResult有如下成员:

复制代码
1 public interface IAsyncResult
2 {
3 object AsyncState {get;} //获取用户定义的对象,它限定或包含关于异步操做的信息。
4 WailHandle AsyncWaitHandle {get;} //获取用于等待异步操做完成的 WaitHandle。
5 bool CompletedSynchronously {get;} //获取异步操做是否同步完成的指示。
6 bool IsCompleted {get;} //获取异步操做是否已完成的指示。
7 }
复制代码

经过轮询方式,使用IsCompleted属性判断异步操做是否完成,这样在异步操做未完成前就可让主线程执行另外的工做。

复制代码
 1     class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //创建委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //在异步线程未完成前执行其余工做
14 while (!result.IsCompleted)
15 {
16 Thread.Sleep(200); //虚拟操做
17 Console.WriteLine("Main thead do work!");
18 }
19 string data=myDelegate.EndInvoke(result);
20 Console.WriteLine(data);
21
22 Console.ReadKey();
23 }
24
25 static string Hello(string name)
26 {
27 ThreadMessage("Async Thread");
28 Thread.Sleep(2000);
29 return "Hello " + name;
30 }
31
32 static void ThreadMessage(string data)
33 {
34 string message = string.Format("{0}\n ThreadId is:{1}",
35 data,Thread.CurrentThread.ManagedThreadId);
36 Console.WriteLine(message);
37 }
38 }
复制代码

运行结果:

 

除此之外,也可使用WailHandle完成一样的工做,WaitHandle里面包含有一个方法WaitOne(int timeout),它能够判断委托是否完成工做,在工做未完成前主线程能够继续其余工做。运行下面代码可获得与使用 IAsyncResult.IsCompleted 一样的结果,并且更简单方便 。

复制代码
 1 namespace Test
2 {
3 class Program
4 {
5 delegate string MyDelegate(string name);
6
7 static void Main(string[] args)
8 {
9 ThreadMessage("Main Thread");
10
11 //创建委托
12 MyDelegate myDelegate = new MyDelegate(Hello);
13
14 //异步调用委托,获取计算结果
15 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
16
17 while (!result.AsyncWaitHandle.WaitOne(200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0}\n ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
复制代码

当要监视多个运行对象的时候,使用IAsyncResult.WaitHandle.WaitOne可就派不上用场了。
幸亏.NET为WaitHandle准备了另外两个静态方法:WaitAny(waitHandle[], int)与WaitAll (waitHandle[] , int)。
其中WaitAll在等待全部waitHandle完成后再返回一个bool值。
而WaitAny是等待其中一个waitHandle完成后就返回一个int,这个int是表明已完成waitHandle在waitHandle[]中的数组索引。
下面就是使用WaitAll的例子,运行结果与使用 IAsyncResult.IsCompleted 相同。

复制代码
 1     class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //创建委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11
12 //异步调用委托,获取计算结果
13 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
14
15 //此处可加入多个检测对象
16 WaitHandle[] waitHandleList = new WaitHandle[] { result.AsyncWaitHandle,........ };
17 while (!WaitHandle.WaitAll(waitHandleList,200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0}\n ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
复制代码

 


4.7 回调函数

使用轮询方式来检测异步方法的状态很是麻烦,并且效率不高,有见及此,.NET为 IAsyncResult BeginInvoke(AsyncCallback , object)准备了一个回调函数。使用 AsyncCallback 就能够绑定一个方法做为回调函数,回调函数必须是带参数 IAsyncResult 且无返回值的方法: void AsycnCallbackMethod(IAsyncResult result) 。在BeginInvoke方法完成后,系统就会调用AsyncCallback所绑定的回调函数,最后回调函数中调用 XXX EndInvoke(IAsyncResult result) 就能够结束异步方法,它的返回值类型与委托的返回值一致。

复制代码
 1     class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //创建委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), null);
13 //在启动异步线程后,主线程能够继续工做而不须要等待
14 for (int n = 0; n < 6; n++)
15 Console.WriteLine(" Main thread do work!");
16 Console.WriteLine("");
17
18 Console.ReadKey();
19 }
20
21 static string Hello(string name)
22 {
23 ThreadMessage("Async Thread");
24 Thread.Sleep(2000); \\模拟异步操做
25 return "\nHello " + name;
26 }
27
28 static void Completed(IAsyncResult result)
29 {
30 ThreadMessage("Async Completed");
31
32 //获取委托对象,调用EndInvoke方法获取运行结果
33 AsyncResult _result = (AsyncResult)result;
34 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
35 string data = myDelegate.EndInvoke(_result);
36 Console.WriteLine(data);
37 }
38
39 static void ThreadMessage(string data)
40 {
41 string message = string.Format("{0}\n ThreadId is:{1}",
42 data, Thread.CurrentThread.ManagedThreadId);
43 Console.WriteLine(message);
44 }
45 }
复制代码


能够看到,主线在调用BeginInvoke方法能够继续执行其余命令,而无需再等待了,这无疑比使用轮询方式判断异步方法是否完成更有优点。
在异步方法执行完成后将会调用AsyncCallback所绑定的回调函数,注意一点,回调函数依然是在异步线程中执行,这样就不会影响主线程的运行,这也使用回调函数最值得青昧的地方。
在回调函数中有一个既定的参数IAsyncResult,把IAsyncResult强制转换为AsyncResult后,就能够经过 AsyncResult.AsyncDelegate 获取原委托,再使用EndInvoke方法获取计算结果。
运行结果以下:


若是想为回调函数传送一些外部信息,就能够利用BeginInvoke(AsyncCallback,object)的最后一个参数object,它容许外部向回调函数输入任何类型的参数。只须要在回调函数中利用 AsyncResult.AsyncState 就能够获取object对象。

复制代码
 1     class Program
2 {
3 public class Person
4 {
5 public string Name;
6 public int Age;
7 }
8
9 delegate string MyDelegate(string name);
10
11 static void Main(string[] args)
12 {
13 ThreadMessage("Main Thread");
14
15 //创建委托
16 MyDelegate myDelegate = new MyDelegate(Hello);
17
18 //创建Person对象
19 Person person = new Person();
20 person.Name = "Elva";
21 person.Age = 27;
22
23 //异步调用委托,输入参数对象person, 获取计算结果
24 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), person);
25
26 //在启动异步线程后,主线程能够继续工做而不须要等待
27 for (int n = 0; n < 6; n++)
28 Console.WriteLine(" Main thread do work!");
29 Console.WriteLine("");
30
31 Console.ReadKey();
32 }
33
34 static string Hello(string name)
35 {
36 ThreadMessage("Async Thread");
37 Thread.Sleep(2000);
38 return "\nHello " + name;
39 }
40
41 static void Completed(IAsyncResult result)
42 {
43 ThreadMessage("Async Completed");
44
45 //获取委托对象,调用EndInvoke方法获取运行结果
46 AsyncResult _result = (AsyncResult)result;
47 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
48 string data = myDelegate.EndInvoke(_result);
49 //获取Person对象
50 Person person = (Person)result.AsyncState;
51 string message = person.Name + "'s age is " + person.Age.ToString();
52
53 Console.WriteLine(data+"\n"+message);
54 }
55
56 static void ThreadMessage(string data)
57 {
58 string message = string.Format("{0}\n ThreadId is:{1}",
59 data, Thread.CurrentThread.ManagedThreadId);
60 Console.WriteLine(message);
61 }
62 }
复制代码

运行结果:

 

 

5、CLR线程池的I/O线程

在前一节所介绍的线程都属于CLR线程池的工做者线程,这一节开始为你们介绍一下CLR线程池的I/O线程

I/O 线程是.NET专为访问外部资源所设置的一种线程,由于访问外部资源经常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操做都创建起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,并且每一个异步方法的使用方式都很是相似,都是以BeginXXX为开始,以EndXXX结束,下面为你们一一解说。

 

5.1  异步读写 FileStream

须要在 FileStream 异步调用 I/O线程,必须使用如下构造函数创建 FileStream 对象,并把useAsync设置为 true。

FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;

其中 path 是文件的相对路径或绝对路径; mode 肯定如何打开或建立文件; access 肯定访问文件的方式; share 肯定文件如何进程共享; bufferSize 是表明缓冲区大小,通常默认最小值为8,在启动异步读取或写入时,文件大小通常大于缓冲大小; userAsync表明是否启动异步I/O线程。

注意:当使用 BeginRead 和 BeginWrite 方法在执行大量读或写时效果更好,但对于少许的读/写,这些方法速度可能比同步读取还要慢,由于进行线程间的切换须要大量时间。

 

5.1.1 异步写入

FileStream中包含BeginWrite、EndWrite 方法能够启动I/O线程进行异步写入。

public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )
public override void EndWrite (IAsyncResult asyncResult )

 

BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法类似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。
在例子中,把FileStream做为外部数据传递到回调函数当中,而后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后经过FileStream.EndWrite(IAsyncResult)结束写入。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //把线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //新立文件File.sour
10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,
11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
12 byte[] bytes = new byte[16384];
13 string message = "An operating-system ThreadId has no fixed relationship........";
14 bytes = Encoding.Unicode.GetBytes(message);
15
16 //启动异步写入
17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
18 stream.Flush();
19
20 Console.ReadKey();
21 }
22
23 static void Callback(IAsyncResult result)
24 {
25 //显示线程池现状
26 Thread.Sleep(200);
27 ThreadPoolMessage("AsyncCallback");
28 //结束异步写入
29 FileStream stream = (FileStream)result.AsyncState;
30 stream.EndWrite(result);
31 stream.Close();
32 }
33
34 //显示线程池现状
35 static void ThreadPoolMessage(string data)
36 {
37 int a, b;
38 ThreadPool.GetAvailableThreads(out a, out b);
39 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
40 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
复制代码

由输出结果能够看到,在使用FileStream.BeginWrite方法后,系统将自动启动CLR线程池中I/O线程。



5.1.2 异步读取

FileStream 中包含 BeginRead 与 EndRead 能够异步调用I/O线程进行读取。

public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)

 

其使用方式与BeginWrite和EndWrite类似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只须要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。

首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。而后把FileData对象做为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData,而后经过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。

复制代码
 1      class Program
2 {
3 public class FileData
4 {
5 public FileStream Stream;
6 public int Length;
7 public byte[] ByteData;
8 }
9
10 static void Main(string[] args)
11 {
12 //把线程池的最大值设置为1000
13 ThreadPool.SetMaxThreads(1000, 1000);
14 ThreadPoolMessage("Start");
15 ReadFile();
16
17 Console.ReadKey();
18 }
19
20 static void ReadFile()
21 {
22 byte[] byteData=new byte[80961024];
23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate,
24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
25
26 //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数
27 FileData fileData = new FileData();
28 fileData.Stream = stream;
29 fileData.Length = (int)stream.Length;
30 fileData.ByteData = byteData;
31
32 //启动异步读取
33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
34 }
35
36 static void Completed(IAsyncResult result)
37 {
38 ThreadPoolMessage("Completed");
39
40 //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取
41 FileData fileData = (FileData)result.AsyncState;
42 int length=fileData.Stream.EndRead(result);
43 fileData.Stream.Close();
44
45 //若是读取到的长度与输入长度不一致,则抛出异常
46 if (length != fileData.Length)
47 throw new Exception("Stream is not complete!");
48
49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
50 Console.WriteLine(data.Substring(2,22));
51 }
52
53 //显示线程池现状
54 static void ThreadPoolMessage(string data)
55 {
56 int a, b;
57 ThreadPool.GetAvailableThreads(out a, out b);
58 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
59 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
61 Console.WriteLine(message);
62 }
63
64 }
复制代码

由输出结果能够看到,在使用FileStream.BeginRead方法后,系统将自动启动CLR线程池中I/O线程。

 

注意:若是你看到的测试结果正好相反:工做者线程为999,I/O线程为1000,这是由于FileStream的文件容量小于缓冲值1024所致的。此时文件将会一次性读取或写入,而系统将启动工做者线程而非I/O线程来处理回调函数。

 

 

5.2 异步操做TCP/IP套接字

在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 可以实现异步操做,并且异步线程是来自于CLR线程池的I/O线程。

public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)

public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)

public override IAsyncResult BeginRead (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)

public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)

 

若要建立 NetworkStream,必须提供已链接的 Socket。而在.NET中使用TCP/IP套接字不须要直接与Socket打交道,由于.NET把Socket的大部分操做都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操做。通常套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个类似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,经过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。

 

下面以一个例子说明异步调用TCP/IP套接字收发数据的过程。

首先在服务器端创建默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的链接请求,再使用一个死循环来监听信息。

在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置CLR线程池最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //默认地址为127.0.0.1
9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
10 TcpListener tcpListener = new TcpListener(ipAddress, 500);
11 tcpListener.Start();
12
13 //以一个死循环来实现监听
14 while (true)
15 { //调用一个ChatClient对象来实现监听
16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());
17 }
18 }
19 }
20
21 public class ChatClient
22 {
23 static TcpClient tcpClient;
24 static byte[] byteMessage;
25 static string clientEndPoint;
26
27 public ChatClient(TcpClient tcpClient1)
28 {
29 tcpClient = tcpClient1;
30 byteMessage = new byte[tcpClient.ReceiveBufferSize];
31
32 //显示客户端信息
33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();
34 Console.WriteLine("Client's endpoint is " + clientEndPoint);
35
36 //使用NetworkStream.BeginRead异步读取信息
37 NetworkStream networkStream = tcpClient.GetStream();
38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
39 new AsyncCallback(ReceiveAsyncCallback), null);
40 }
41
42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)
43 {
44 //显示CLR线程池状态
45 Thread.Sleep(100);
46 ThreadPoolMessage("\nMessage is receiving");
47
48 //使用NetworkStream.EndRead结束异步读取
49 NetworkStream networkStreamRead = tcpClient.GetStream();
50 int length=networkStreamRead.EndRead(iAsyncResult);
51
52 //若是接收到的数据长度少于1则抛出异常
53 if (length < 1)
54 {
55 tcpClient.GetStream().Close();
56 throw new Exception("Disconnection!");
57 }
58
59 //显示接收信息
60 string message = Encoding.UTF8.GetString(byteMessage, 0, length);
61 Console.WriteLine("Message:" + message);
62
63 //使用NetworkStream.BeginWrite异步发送信息
64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");
65 NetworkStream networkStreamWrite=tcpClient.GetStream();
66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,
67 new AsyncCallback(SendAsyncCallback), null);
68 }
69
70 //把信息转换成二进制数据,而后发送到客户端
71 public void SendAsyncCallback(IAsyncResult iAsyncResult)
72 {
73 //显示CLR线程池状态
74 Thread.Sleep(100);
75 ThreadPoolMessage("\nMessage is sending");
76
77 //使用NetworkStream.EndWrite结束异步发送
78 tcpClient.GetStream().EndWrite(iAsyncResult);
79
80 //从新监听
81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
82 new AsyncCallback(ReceiveAsyncCallback), null);
83 }
84
85 //显示线程池现状
86 static void ThreadPoolMessage(string data)
87 {
88 int a, b;
89 ThreadPool.GetAvailableThreads(out a, out b);
90 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
91 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
93
94 Console.WriteLine(message);
95 }
96 }
复制代码

而在客户端只是使用简单的开发方式,利用TcpClient链接到服务器端,而后调用NetworkStream.Write方法发送信息,最后调用NetworkStream.Read方法读取回馈信息

复制代码
 1         static void Main(string[] args)
2 {
3 //链接服务端
4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500);
5
6 //发送信息
7 NetworkStream networkStream = tcpClient.GetStream();
8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!");
9 networkStream.Write(sendMessage, 0, sendMessage.Length);
10 networkStream.Flush();
11
12 //接收信息
13 byte[] receiveMessage=new byte[1024];
14 int count=networkStream.Read(receiveMessage, 0,1024);
15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));
16 Console.ReadKey();
17 }
复制代码

注意观察运行结果,服务器端的异步操做线程都是来自于CLR线程池的I/O线程



5.3 异步WebRequest

System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用WebRequest.Create(string uri)建立对象时,应用程序就能够根据请求协议判断实现类来进行操做。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其做用:FileWebRequest 使用 “file://路径” 的URI方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。因为使用方法相相似,下面就以经常使用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。

在使用ASP.NET开发网站的时候,每每会忽略了HttpWebRequest的使用,由于开发都假设客户端是使用浏览器等工具去阅读页面的。但若是你对REST开发方式有所了解,那对 HttpWebRequest 就应该很是熟悉。它能够在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,而后对回复数据进行适当处理。HttpWebRequest 包含有如下几个经常使用方法用于处理请求/响应:

public override Stream GetRequestStream ()
public override WebResponse GetResponse ()

public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )

其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息;  BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操做Internet的“请求/响应”,避免主线程长期处于等待状态,而操做期间异步线程是来自CLR线程池的I/O线程。

注意:请求与响应不能使用同步与异步混合开发模式,即当请求写入使用GetRequestStream同步模式,即便响应使用BeginGetResponse异步方法,操做也与GetRequestStream方法在于同一线程内。

下面以简单的例子介绍一下异步请求的用法。

首先为Person类加上可序列化特性,在服务器端创建Hanlder.ashx,经过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。而后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。

在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式经过BeginGetRequireStream获取请求数据流,而后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。

注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,不然在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “没法发送具备此谓词类型的内容正文" 的异常。

Model

复制代码
 1 namespace Model
2 {
3 [Serializable]
4 public class Person
5 {
6 public int ID
7 {
8 get;
9 set;
10 }
11 public string Name
12 {
13 get;
14 set;
15 }
16 public int Age
17 {
18 get;
19 set;
20 }
21 }
22 }
复制代码

 

服务器端

复制代码
 1 public class Handler : IHttpHandler {
2
3 public void ProcessRequest(HttpContext context)
4 {
5 //把信息转换为String,找出输入条件Id
6 byte[] bytes=new byte[1024];
7 int length=context.Request.InputStream.Read(bytes,0,1024);
8 string condition = Encoding.Default.GetString(bytes);
9 int id = int.Parse(condition.Split(new string[] { ":" },
10 StringSplitOptions.RemoveEmptyEntries)[1]);
11
12 //根据Id查找对应Person对象
13 var person = GetPersonList().Where(x => x.ID == id).First();
14
15 //所Person格式化为二进制数据写入OutputStream
16 BinaryFormatter formatter = new BinaryFormatter();
17 formatter.Serialize(context.Response.OutputStream, person);
18 }
19
20 //模拟源数据
21 private IList<Person> GetPersonList()
22 {
23 var personList = new List<Person>();
24
25 var person1 = new Person();
26 person1.ID = 1;
27 person1.Name = "Leslie";
28 person1.Age = 30;
29 personList.Add(person1);
30 ...........
31 return personList;
32 }
33
34 public bool IsReusable
35 {
36 get { return true;}
37 }
38 }
复制代码

客户端

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Request();
7 Console.ReadKey();
8 }
9
10 static void Request()
11 {
12 ThreadPoolMessage("Start");
13 //使用WebRequest.Create方法创建HttpWebRequest对象
14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(
15 "http://localhost:5700/Handler.ashx");
16 webRequest.Method = "post";
17
18 //对写入数据的RequestStream对象进行异步请求
19 IAsyncResult result=webRequest.BeginGetRequestStream(
20 new AsyncCallback(EndGetRequestStream),webRequest);
21 }
22
23 static void EndGetRequestStream(IAsyncResult result)
24 {
25 ThreadPoolMessage("RequestStream Complete");
26 //获取RequestStream
27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
28 Stream stream=webRequest.EndGetRequestStream(result);
29
30 //写入请求条件
31 byte[] condition = Encoding.Default.GetBytes("Id:1");
32 stream.Write(condition, 0, condition.Length);
33
34 //异步接收回传信息
35 IAsyncResult responseResult = webRequest.BeginGetResponse(
36 new AsyncCallback(EndGetResponse), webRequest);
37 }
38
39 static void EndGetResponse(IAsyncResult result)
40 {
41 //显出线程池现状
42 ThreadPoolMessage("GetResponse Complete");
43
44 //结束异步请求,获取结果
45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
46 WebResponse webResponse = webRequest.EndGetResponse(result);
47
48 //把输出结果转化为Person对象
49 Stream stream = webResponse.GetResponseStream();
50 BinaryFormatter formatter = new BinaryFormatter();
51 var person=(Person)formatter.Deserialize(stream);
52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}",
53 person.ID, person.Name, person.Age));
54 }
55
56 //显示线程池现状
57 static void ThreadPoolMessage(string data)
58 {
59 int a, b;
60 ThreadPool.GetAvailableThreads(out a, out b);
61 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
62 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
63 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
64
65 Console.WriteLine(message);
66 }
67 }
复制代码

从运行结果能够看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR线程池的I/O线程。

 

 

5.4 异步调用WebService

相比TCP/IP套接字,在使用WebService的时候,服务器端须要更复杂的操做处理,使用时间每每会更长。为了不客户端长期处于等待状态,在配置服务引用时选择 “生成异步操做”,系统能够自动创建异步调用的方式。

以.NET 2.0之前,系统都是使用ASMX来设计WebService,而近年来WCF可说是火热登场,下面就以WCF为例子简单介绍一下异步调用WebService的例子。

因为系统能够自动生成异步方法,使用起来很是简单,首先在服务器端创建服务ExampleService,里面包含方法Method。客户端引用此服务时,选择 “生成异步操做”。而后使用 BeginMethod 启动异步方法, 在回调函数中调用EndMethod结束异步调用。

服务端

复制代码
 1      [ServiceContract]
2 public interface IExampleService
3 {
4 [OperationContract]
5 string Method(string name);
6 }
7
8 public class ExampleService : IExampleService
9 {
10 public string Method(string name)
11 {
12 return "Hello " + name;
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 ServiceHost host = new ServiceHost(typeof(ExampleService));
21 host.Open();
22 Console.ReadKey();
23 host.Close();
24 }
25 }
26
27 <configuration>
28 <system.serviceModel>
29 <services>
30 <service name="Example.ExampleService">
31 <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService">
32 <identity>
33 <dns value="localhost" />
34 </identity>
35 </endpoint>
36 <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
37 <host>
38 <baseAddresses>
39 <add baseAddress="http://localhost:7200/Example/ExampleService/" />
40 </baseAddresses>
41 </host>
42 </service>
43 </services>
44 </system.serviceModel>
45 </configuration>
复制代码

客户端

复制代码
 1      class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //创建服务对象,异步调用服务方法
10 ExampleServiceReference.ExampleServiceClient exampleService = new
11 ExampleServiceReference.ExampleServiceClient();
12 exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),
13 exampleService);
14 Console.ReadKey();
15 }
16
17 static void AsyncCallbackMethod(IAsyncResult result)
18 {
19 Thread.Sleep(1000);
20 ThreadPoolMessage("Complete");
21 ExampleServiceReference.ExampleServiceClient example =
22 (ExampleServiceReference.ExampleServiceClient)result.AsyncState;
23 string data=example.EndMethod(result);
24 Console.WriteLine(data);
25 }
26
27 //显示线程池现状
28 static void ThreadPoolMessage(string data)
29 {
30 int a, b;
31 ThreadPool.GetAvailableThreads(out a, out b);
32 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
33 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
34 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
35
36 Console.WriteLine(message);
37 }
38 }
39
40 <configuration>
41 <system.serviceModel>
42 <bindings>
43 <wsHttpBinding>
44 <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00"
45 openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00"
46 bypassProxyOnLocal="false" transactionFlow="false"
47 hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288"
48 maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8"
49 useDefaultWebProxy="true" allowCookies="false">
50 <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384"
51 maxBytesPerRead="4096" maxNameTableCharCount="16384" />
52 <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" />
53 <security mode="Message">
54 <transport clientCredentialType="Windows" proxyCredentialType="None"
55 realm="" />
56 <message clientCredentialType="Windows" negotiateServiceCredential="true"
57 algorithmSuite="Default" />
58 </security>
59 </binding>
60 </wsHttpBinding>
61 </bindings>
62 <client>
63 <endpoint address="http://localhost:7200/Example/ExampleService/"
64 binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService"
65 contract="ExampleServiceReference.IExampleService"
66 name="WSHttpBinding_IExampleService">
67 <identity>
68 <dns value="localhost" />
69 </identity>
70 </endpoint>
71 </client>
72 </system.serviceModel>
73 </configuration>
复制代码

注意观察运行结果,异步调用服务时,回调函数都是运行于CLR线程池的I/O线程当中。




回到目录

6、异步 SqlCommand

从ADO.NET 2.0开始,SqlCommand就新增了几个异步方法执行SQL命令。相对于同步执行方式,它使主线程不须要等待数据库的返回结果,在使用复杂性查询或批量插入时将有效提升主线程的效率。使用异步SqlCommand的时候,请注意把ConnectionString 的 Asynchronous Processing 设置为 true 。

注意:SqlCommand异步操做的特别之处在于线程并不依赖于CLR线程池,而是由Windows内部提供,这比使用异步委托更有效率。但若是须要使用回调函数的时候,回调函数的线程依然是来自于CLR线程池的工做者线程。

SqlCommand有如下几个方法支持异步操做:

public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)

public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)

public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)

 

因为使用方式类似,此处就以 BeginExecuteNonQuery 为例子,介绍一下异步SqlCommand的使用。首先创建connectionString,注意把Asynchronous Processing设置为true来启动异步命令,而后把SqlCommand.CommandText设置为 WAITFOR DELAY "0:0:3" 来虚拟数据库操做。再经过BeginExecuteNonQuery启动异步操做,利用轮询方式监测操做状况。最后在操做完成后使用EndExecuteNonQuery完成异步操做。

复制代码
 1     class Program
2 {
3 //把Asynchronous Processing设置为true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;“+
5 "Integrated Security=True;Asynchronous Processing=true";
6
7 static void Main(string[] args)
8 {
9 //把CLR线程池最大线程数设置为1000
10 ThreadPool.SetMaxThreads(1000, 1000);
11 ThreadPoolMessage("Start");
12
13 //使用WAITFOR DELAY命令来虚拟操做
14 SqlConnection connection = new SqlConnection(connectionString);
15 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
16 connection.Open();
17
18 //启动异步SqlCommand操做,利用轮询方式监测操做
19 IAsyncResult result = command.BeginExecuteNonQuery();
20 ThreadPoolMessage("BeginRead");
21 while (!result.AsyncWaitHandle.WaitOne(500))
22 Console.WriteLine("Main thread do work........");
23
24 //结束异步SqlCommand
25 int count= command.EndExecuteNonQuery(result);
26 ThreadPoolMessage("\nCompleted");
27 Console.ReadKey();
28 }
29
30 //显示线程池现状
31 static void ThreadPoolMessage(string data)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
36 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
37 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
38 Console.WriteLine(message);
39 }
40 }
复制代码

注意运行结果,SqlCommand的异步执行线程并不属于CLR线程池。

 

若是以为使用轮询方式过于麻烦,可使用回调函数,但要注意当调用回调函数时,线程是来自于CLR线程池的工做者线程。

复制代码
 1     class Program
2 {
3 //把Asynchronous Processing设置为true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;”+
5 “Integrated Security=True;Asynchronous Processing=true";
6 static void Main(string[] args)
7 {
8 //把CLR线程池最大线程数设置为1000
9 ThreadPool.SetMaxThreads(1000, 1000);
10 ThreadPoolMessage("Start");
11
12 //使用WAITFOR DELAY命令来虚拟操做
13 SqlConnection connection = new SqlConnection(connectionString);
14 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
15 connection.Open();
16
17 //启动异步SqlCommand操做,并把SqlCommand对象传递到回调函数
18 IAsyncResult result = command.BeginExecuteNonQuery(
19 new AsyncCallback(AsyncCallbackMethod),command);
20 Console.ReadKey();
21 }
22
23 static void AsyncCallbackMethod(IAsyncResult result)
24 {
25 Thread.Sleep(200);
26 ThreadPoolMessage("AsyncCallback");
27 SqlCommand command = (SqlCommand)result.AsyncState;
28 int count=command.EndExecuteNonQuery(result);
29 command.Connection.Close();
30 }
31
32 //显示线程池现状
33 static void ThreadPoolMessage(string data)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
38 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
39 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
复制代码

运行结果:

 

 

回到目录

7、并行编程与PLINQ

要使用多线程开发,必须很是熟悉Thread的使用,并且在开发过程当中可能会面对不少未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库System.Threading.Tasks,它能够简化并行开发,你无需直接跟线程或线程池打交道,就能够简单创建多线程应用程序。此外,.NET还提供了新的一组扩展方法PLINQ,它具备自动分析查询功能,若是并行查询能提升系统效率,则同时运行,若是查询未能从并行查询中受益,则按原顺序查询。下面将详细介绍并行操做的方式。

 

7.1 泛型委托

使用并行编程能够同时操做多个委托,在介绍并行编程前先简单介绍一下两个泛型委托System.Func<>与System.Action<>。

Func<>是一个能接受多个参数和一个返回值的泛型委托,它能接受0个到16个输入参数, 其中 T1,T2,T3,T4......T16 表明自定的输入类型,TResult为自定义的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
..............
public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

Action<>与Func<>十分类似,不一样在于Action<>的返回值为void,Action能接受0~16个参数
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

 

7.2 任务并行库(TPL)

System.Threading.Tasks中的类被统称为任务并行库(Task Parallel Library,TPL),TPL使用CLR线程池把工做分配到CPU,并能自动处理工做分区、线程调度、取消支持、状态管理以及其余低级别的细节操做,极大地简化了多线程的开发。

注意:TPL比Thread更具智能性,当它判断任务集并无从并行运行中受益,就会选择按顺序运行。但并不是全部的项目都适合使用并行开发,建立过多并行任务可能会损害程序的性能,下降运行效率。

TPL包括经常使用的数据并行与任务并行两种执行方式:

7.2.1 数据并行

数据并行的核心类就是System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与for、foreach相仿。经过这两个方法能够并行处理System.Func<>、System.Action<>委托。

如下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法对List<Person>进行并行查询。
假设使用单线程方式查询3个Person对象,须要用时大约6秒,在使用并行方式,只需使用2秒就能完成查询,并且可以避开Thread的繁琐处理。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //并行查询
8 Parallel.For(0, 3,n =>
9 {
10 Thread.Sleep(2000); //模拟查询
11 ThreadPoolMessage(GetPersonList()[n]);
12 });
13 Console.ReadKey();
14 }
15
16 //模拟源数据
17 static IList<Person> GetPersonList()
18 {
19 var personList = new List<Person>();
20
21 var person1 = new Person();
22 person1.ID = 1;
23 person1.Name = "Leslie";
24 person1.Age = 30;
25 personList.Add(person1);
26 ...........
27 return personList;
28 }
29
30 //显示线程池现状
31 static void ThreadPoolMessage(Person person)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
37 " CompletionPortThreads is :{5}\n",
38 person.ID, person.Name, person.Age,
39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
复制代码

观察运行结果,对象并不是按照原排列顺序进行查询,而是使用并行方式查询。

 

若想中止操做,能够利用ParallelLoopState参数,下面以ForEach做为例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source为数据集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState参数当中包含有Break()和 Stop()两个方法均可以使迭代中止。Break的使用跟传统for里面的使用方式类似,但由于处于并行处理当中,使用Break并不能保证全部运行能当即中止,在当前迭代以前的迭代会继续执行。若想当即中止操做,可使用Stop方法,它能保证当即终止全部的操做,不管它们是处于当前迭代的以前仍是以后。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //并行查询
9 Parallel.ForEach(GetPersonList(), (person, state) =>
10 {
11 if (person.ID == 2)
12 state.Stop();
13 ThreadPoolMessage(person);
14 });
15 Console.ReadKey();
16 }
17
18 //模拟源数据
19 static IList<Person> GetPersonList()
20 {
21 var personList = new List<Person>();
22
23 var person1 = new Person();
24 person1.ID = 1;
25 person1.Name = "Leslie";
26 person1.Age = 30;
27 personList.Add(person1);
28 ..........
29 return personList;
30 }
31
32 //显示线程池现状
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42
43 Console.WriteLine(message);
44 }
45 }
复制代码

观察运行结果,当Person的ID等于2时,运行将会中止。

 

当要在多个线程中调用本地变量,可使用如下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一个参数为数据集;
第二个参数是一个Func委托,用于在每一个线程执行前进行初始化;
第 三个参数是委托Func<Of T1,T2,T3,TResult>,它能对数据集的每一个成员进行迭代,当中T1是数据集的成员,T2是一个ParallelLoopState对 象,它能够控制迭代的状态,T3是线程中的本地变量;
第四个参数是一个Action委托,用于对每一个线程的最终状态进行最终操做。

在如下例子中,使用ForEach计算多个Order的整体价格。在ForEach方法中,首先把参数初始化为0f,而后用把同一个Order的多个OrderItem价格进行累加,计算出Order的价格,最后把多个Order的价格进行累加,计算出多个Order的整体价格。

复制代码
 1     public class Order
2 {
3 public int ID;
4 public float Price;
5 }
6
7 public class OrderItem
8 {
9 public int ID;
10 public string Goods;
11 public int OrderID;
12 public float Price;
13 public int Count;
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 //设置最大线程数
21 ThreadPool.SetMaxThreads(1000, 1000);
22 float totalPrice = 0f;
23 //并行查询
24 var parallelResult = Parallel.ForEach(GetOrderList(),
25 () => 0f, //把参数初始值设为0
26 (order, state, orderPrice) =>
27 {
28 //计算单个Order的价格
29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
30 .Sum(item => item.Price * item.Count);
31 order.Price = orderPrice;
32 ThreadPoolMessage(order);
33
34 return orderPrice;
35 },
36 (finallyPrice) =>
37 {
38 totalPrice += finallyPrice;//计算多个Order的整体价格
39 }
40 );
41
42 while (!parallelResult.IsCompleted)
43 Console.WriteLine("Doing Work!");
44
45 Console.WriteLine("Total Price is:" + totalPrice);
46 Console.ReadKey();
47 }
48 //虚拟数据
49 static IList<Order> GetOrderList()
50 {
51 IList<Order> orderList = new List<Order>();
52 Order order1 = new Order();
53 order1.ID = 1;
54 orderList.Add(order1);
55 ............
56 return orderList;
57 }
58 //虚拟数据
59 static IList<OrderItem> GetOrderItem()
60 {
61 IList<OrderItem> itemList = new List<OrderItem>();
62
63 OrderItem orderItem1 = new OrderItem();
64 orderItem1.ID = 1;
65 orderItem1.Goods = "iPhone 4S";
66 orderItem1.Price = 6700;
67 orderItem1.Count = 2;
68 orderItem1.OrderID = 1;
69 itemList.Add(orderItem1);
70 ...........
71 return itemList;
72 }
73
74 //显示线程池现状
75 static void ThreadPoolMessage(Order order)
76 {
77 int a, b;
78 ThreadPool.GetAvailableThreads(out a, out b);
79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" +
80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" +
81 " CompletionPortThreads is:{4}\n",
82 order.ID, order.Price,
83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
84
85 Console.WriteLine(message);
86 }
87 }
复制代码

运行结果

 

 7.2.2 任务并行

在TPL当中还可使用Parallel.Invoke方法触发多个异步任务,其中 actions 中能够包含多个方法或者委托,parallelOptions用于配置Parallel类的操做。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke并行查询多个Person,actions当中能够绑定方法、lambda表达式或者委托,注意绑定方法时必须是返回值为void的无参数方法。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //任务并行
9 Parallel.Invoke(option,
10 PersonMessage,
11 ()=>ThreadPoolMessage(GetPersonList()[1]),
12 delegate(){
13 ThreadPoolMessage(GetPersonList()[2]);
14 });
15 Console.ReadKey();
16 }
17
18 static void PersonMessage()
19 {
20 ThreadPoolMessage(GetPersonList()[0]);
21 }
22
23 //显示线程池现状
24 static void ThreadPoolMessage(Person person)
25 {
26 int a, b;
27 ThreadPool.GetAvailableThreads(out a, out b);
28 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
29 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
30 " CompletionPortThreads is :{5}\n",
31 person.ID, person.Name, person.Age,
32 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
33
34 Console.WriteLine(message);
35 }
36
37 //模拟源数据
38 static IList<Person> GetPersonList()
39 {
40 var personList = new List<Person>();
41
42 var person1 = new Person();
43 person1.ID = 1;
44 person1.Name = "Leslie";
45 person1.Age = 30;
46 personList.Add(person1);
47 ..........
48 return personList;
49 }
50 }
复制代码

运行结果

 

 

7.3 Task简介

以Thread建立的线程被默认为前台线程,固然你能够把线程IsBackground属性设置为true,但TPL为此提供了一个更简单的类Task。
Task存在于System.Threading.Tasks命名空间当中,它能够做为异步委托的简单替代品。
经过Task的Factory属性将返回TaskFactory类,以TaskFactory.StartNew(Action)方法能够建立一个新线程,所建立的线程默认为后台线程。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Task.Factory.StartNew(() => ThreadPoolMessage());
7 Console.ReadKey();
8 }
9
10 //显示线程池现状
11 static void ThreadPoolMessage()
12 {
13 int a, b;
14 ThreadPool.GetAvailableThreads(out a, out b);
15 string message = string.Format("CurrentThreadId is:{0}\n" +
16 "CurrentThread IsBackground:{1}\n" +
17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",
18 Thread.CurrentThread.ManagedThreadId,
19 Thread.CurrentThread.IsBackground.ToString(),
20 a.ToString(), b.ToString());
21 Console.WriteLine(message);
22 }
23 }
复制代码

运行结果

 

 

若要取消处理,能够利用CancellationTakenSource对象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在方法中加入CancellationTakenSource对象的CancellationToken属性,能够控制任务的运行,调用CancellationTakenSource.Cancel时任务就会自动中止。下面以图片下载为例子介绍一下TaskFactory的使用。

服务器端页面

复制代码
 1 <html xmlns="http://www.w3.org/1999/xhtml">
2 <head runat="server">
3 <title></title>
4 <script type="text/C#" runat="server">
5 private static List<string> url=new List<string>();
6
7 protected void Page_Load(object sender, EventArgs e)
8 {
9 if (!Page.IsPostBack)
10 {
11 url.Clear();
12 Application["Url"] = null;
13 }
14 }
15
16 protected void CheckBox_CheckedChanged(object sender, EventArgs e)
17 {
18 CheckBox checkBox = (CheckBox)sender;
19 if (checkBox.Checked)
20 url.Add(checkBox.Text);
21 else
22 url.Remove(checkBox.Text);
23 Application["Url"]= url;
24 }
25 </script>
26 </head>
27 <body>
28 <form id="form1" runat="server" >
29 <div align="left">
30 <div align="center" style="float: left;">
31 <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
32 <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
33 oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
34 </div>
35 <div align="center" style="float: left">
36 <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
37 <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
38 oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
39 </div>
40 <div align="center" style="float: left">
41 <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
42 <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
43 oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
44 </div>
45 <div align="center" style="float: left">
46 <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
47 <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
48 oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
49 </div>
50 <div align="center" style="float: left">
51 <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
52 <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
53 oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
54 </div>
55 </div>
56 </form>
57 </body>
58 </html>
复制代码

首先在服务器页面中显示多个*.jpg图片,每一个图片都有对应的CheckBox检测其选择状况。
所选择图片的路径会记录在Application["Url"]当中传递到Handler.ashx当中。

注意:Application是一个全局变量,此处只是为了显示Task的使用方式,在ASP.NET开发应该慎用Application。

Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成byte[]二进制数据。
再把图片的数量,每副图片的二进制数据的长度记录在OutputStream的头部。
最后把图片的二进制数据记入 OutputStream 一并输出。

复制代码
 1 public class Handler : IHttpHandler 
2 {
3 public void ProcessRequest(HttpContext context)
4 {
5 //获取图片名,把图片数量写OutputStream
6 List<String> urlList = (List<string>)context.Application["Url"];
7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);
8
9 //把图片转换成二进制数据
10 List<string> imageList = GetImages(urlList);
11
12 //把每副图片长度写入OutputStream
13 foreach (string image in imageList)
14 {
15 byte[] imageByte=Convert.FromBase64String(image);
16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
17 }
18
19 //把图片写入OutputStream
20 foreach (string image in imageList)
21 {
22 byte[] imageByte = Convert.FromBase64String(image);
23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
24 }
25 }
26
27 //获取多个图片的二进制数据
28 private List<string> GetImages(List<string> urlList)
29 {
30 List<string> imageList = new List<string>();
31 foreach (string url in urlList)
32 imageList.Add(GetImage(url));
33 return imageList;
34 }
35
36 //获取单副图片的二进制数据
37 private string GetImage(string url)
38 {
39 string path = "E:/My Projects/Example/WebSite/Images/"+url;
40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
41 byte[] imgBytes = new byte[10240];
42 int imgLength = stream.Read(imgBytes, 0, 10240);
43 return Convert.ToBase64String(imgBytes,0,imgLength);
44 }
45
46 public bool IsReusable
47 {
48 get{ return false;}
49 }
50 }
复制代码

 

客户端

创建一个WinForm窗口,里面加入一个WebBrowser链接到服务器端的Default.aspx页面。
当按下Download按键时,系统就会利用TaskFactory.StartNew的方法创建异步线程,使用WebRequest方法向Handler.ashx发送请求。
接收到回传流时,就会根据头文件的内容判断图片的数量与每副图片的长度,把二进制数据转化为*.jpg文件保存。

系统利用TaskFactory.StartNew(action,cancellationToken) 方式异步调用GetImages方法进行图片下载。 
当用户按下Cancel按钮时,异步任务就会中止。值得注意的是,在图片下载时调用了CancellationToken.ThrowIfCancellationRequested方法,目的在检查并行任务的运行状况,在并行任务被中止时释放出OperationCanceledException异常,确保用户按下Cancel按钮时,中止全部并行任务。

复制代码
 1     public partial class Form1 : Form
2 {
3 private CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 public Form1()
6 {
7 InitializeComponent();
8 ThreadPool.SetMaxThreads(1000, 1000);
9 }
10
11 private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
12 {
13 Task.Factory.StartNew(GetImages,tokenSource.Token);
14 }
15
16 private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
17 {
18 tokenSource.Cancel();
19 }
20
21 private void GetImages()
22 {
23 //发送请求,获取输出流
24 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
25 Stream responseStream=webRequest.GetResponse().GetResponseStream();
26
27 byte[] responseByte = new byte[81960];
28 IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
29 int responseLength = responseStream.EndRead(result);
30
31 //获取图片数量
32 int imageCount = BitConverter.ToInt32(responseByte, 0);
33
34 //获取每副图片的长度
35 int[] lengths = new int[imageCount];
36 for (int n = 0; n < imageCount; n++)
37 {
38 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
39 lengths[n] = length;
40 }
41 try
42 {
43 //保存图片
44 for (int n = 0; n < imageCount; n++)
45 {
46 string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
47 FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);
48
49 //计算字节偏移量
50 int offset = (imageCount + 1) * 4;
51 for (int a = 0; a < n; a++)
52 offset += lengths[a];
53
54 file.Write(responseByte, offset, lengths[n]);
55 file.Flush();
56
57 //模拟操做
58 Thread.Sleep(1000);
59
60 //检测CancellationToken变化
61 tokenSource.Token.ThrowIfCancellationRequested();
62 }
63 }
64 catch (OperationCanceledException ex)
65 {
66 MessageBox.Show("Download cancel!");
67 }
68 }
69 }
复制代码



7.4 并行查询(PLINQ)

并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的全部处理器。 它利用全部处理器的方法,把数据源分红片断,而后在多个处理器上对单独工做线程上的每一个片断并行执行查询, 在许多状况下,并行执行意味着查询运行速度显著提升。但这并不说明全部PLINQ都会使用并行方式,当系统测试要并行查询会对系统性能形成损害时,那将自动化地使用同步执行。
在System.Linq.ParallelEnumerable类中,包含了并行查询的大部分方法。
 

方法成员 

说明

AsParallel

PLINQ 的入口点。 指定若是可能,应并行化查询的其他部分。

AsSequential(Of TSource)

指定查询的其他部分应像非并行 LINQ 查询同样按顺序运行。

AsOrdered

指定 PLINQ 应保留查询的其他部分的源序列排序,直到例如经过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。

AsUnordered(Of TSource)

指定查询的其他部分的 PLINQ 不须要保留源序列的排序。

WithCancellation(Of TSource)

指定 PLINQ 应按期监视请求取消时提供的取消标记和取消执行的状态。

WithDegreeOfParallelism(Of TSource)

指定 PLINQ 应当用来并行化查询的处理器的最大数目。

WithMergeOptions(Of TSource)

提供有关 PLINQ 应当如何(若是可能)将并行结果合并回到使用线程上的一个序列的提示。

WithExecutionMode(Of TSource)

指定 PLINQ 应当如何并行化查询(即便默认行为是按顺序运行查询)。

ForAll(Of TSource)

多线程枚举方法,与循环访问查询结果不一样,它容许在不首先合并回到使用者线程的状况下并行处理结果。

Aggregate 重载

对于 PLINQ 惟一的重载,它启用对线程本地分区的中间聚合以及一个用于合并全部分区结果的最终聚合函数。

 

7.4.1 AsParallel

一般想要实现并行查询,只需向数据源添加 AsParallel 查询操做便可。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel()
6 .Where(x=>x.Age>30);
7 Console.ReadKey();
8 }
9
10 //模拟源数据
11 static IList<Person> GetPersonList()
12 {
13 var personList = new List<Person>();
14
15 var person1 = new Person();
16 person1.ID = 1;
17 person1.Name = "Leslie";
18 person1.Age = 30;
19 personList.Add(person1);
20 ...........
21 return personList;
22 }
23 }
复制代码

 

7.4.2 AsOrdered

若要使查询结果必须保留源序列排序方式,可使用AsOrdered方法。 
AsOrdered依然使用并行方式,只是在查询过程加入额外信息,在并行结束后把查询结果再次进行排列。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().AsOrdered()
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {......}
12 }
复制代码


7.4.3 WithDegreeOfParallelism

默认状况下,PLINQ 使用主机上的全部处理器,这些处理器的数量最多可达 64 个。
经过使用 WithDegreeOfParallelism(Of TSource) 方法,能够指示 PLINQ 使用很少于指定数量的处理器。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {.........}
12 }
复制代码

 

7.4.4 ForAll

若是要对并行查询结果进行操做,通常会在for或foreach中执行,执行枚举操做时会使用同步方式。
有见及此,PLINQ中包含了ForAll方法,它可使用并行方式对数据集进行操做。

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 GetPersonList().AsParallel().ForAll(person =>{
7 ThreadPoolMessage(person);
8 });
9 Console.ReadKey();
10 }
11
12 static IList<Person> GetPersonList()
13 {.......}
14
15 //显示线程池现状
16 static void ThreadPoolMessage(Person person)
17 {
18 int a, b;
19 ThreadPool.GetAvailableThreads(out a, out b);
20 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
21 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
22 " CompletionPortThreads is :{5}\n",
23 person.ID, person.Name, person.Age,
24 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
25 Console.WriteLine(message);
26 }
27 }
复制代码

运行结果

 

7.4.5 WithCancellation

若是须要中止查询,可使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例做为参数。 
与第三节Task的例子类似,若是标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并中止全部线程上的处理,而后引起 OperationCanceledException。这能够保证并行查询可以当即中止。

复制代码
 1     class Program
2 {
3 static CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 static void Main(string[] args)
6 {
7 Task.Factory.StartNew(Cancel);
8 try
9 {
10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11 .ForAll(person =>
12 {
13 ThreadPoolMessage(person);
14 });
15 }
16 catch (OperationCanceledException ex)
17 { }
18 Console.ReadKey();
19 }
20
21 //在10~50毫秒内发出中止信号
22 static void Cancel()
23 {
24 Random random = new Random();
25 Thread.Sleep(random.Next(10,50));
26 tokenSource.Cancel();
27 }
28
29 static IList<Person> GetPersonList()
30 {......}
31
32 //显示线程池现状
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
45
复制代码

 

回到目录

8、定时器与锁

8.1定时器

若要长期定时进行一些工做,好比像邮箱更新,实时收听信息等等,能够利用定时器Timer进行操做。
在System.Threading命名空间中存在Timer类与对应的TimerCallback委托,它能够在后台线程中执行一些长期的定时操做,使主线程不受干扰。
Timer类中最经常使用的构造函数为 public Timer( timerCallback , object , int , int )
timerCallback委托能够绑定执行方法,执行方法必须返回void,它能够是无参数方法,也能够带一个object参数的方法。
第二个参数是为 timerCallback 委托输入的参数对象。
第三个参数是开始执行前等待的时间。
第四个参数是每次执行之间的等待时间。

开发实例

复制代码
 1     class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6
7 TimerCallback callback = new TimerCallback(ThreadPoolMessage);
8 Timer t = new Timer(callback,"Hello Jack! ", 0, 1000);
9 Console.ReadKey();
10 }
11
12 //显示线程池现状
13 static void ThreadPoolMessage(object data)
14 {
15 int a, b;
16 ThreadPool.GetAvailableThreads(out a, out b);
17 string message = string.Format("{0}\n CurrentThreadId is:{1}\n" +
18 " CurrentThread IsBackground:{2}\n" +
19 " WorkerThreads is:{3}\n CompletionPortThreads is:{4}\n",
20 data + "Time now is " + DateTime.Now.ToLongTimeString(),
21 Thread.CurrentThread.ManagedThreadId,
22 Thread.CurrentThread.IsBackground.ToString(),
23 a.ToString(), b.ToString());
24 Console.WriteLine(message);
25 }
26 }
复制代码

注意观察运行结果,每次调用Timer绑定的方法时不必定是使用同一线程,但线程都会是来自工做者线程的后台线程。


8.2 锁

在使用多线程开发时,存在必定的共用数据,为了不多线程同时操做同一数据,.NET提供了lock、Monitor、Interlocked等多个锁定数据的方式。

8.2.1 lock

lock的使用比较简单,若是须要锁定某个对象时,能够直接使用lock(this)的方式。

复制代码
1 private void Method()
2 {
3 lock(this)
4 {
5 //在此进行的操做能保证在同一时间内只有一个线程对此对象操做
6 }
7 }
复制代码

若是操做只锁定某段代码,能够事先创建一个object对象,并对此对象进行操做锁定,这也是.net提倡的锁定用法。

复制代码
 1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 lock(obj)
8 {.......}
9 }
10 }
复制代码

 

8.2.2 Montior

Montior存在于System.Thread命名空间内,相比lock,Montior使用更灵活。
它存在 Enter, Exit 两个方法,它能够对对象进行锁定与解锁,比lock使用更灵活。

复制代码
 1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 Monitor.Enter(obj);
8 try
9 {......}
10 catch(Excetion ex)
11 {......}
12 finally
13 {
14 Monitor.Exit(obj);
15 }
16 }
17 }
18
复制代码

使用try的方式,能确保程序不会因死锁而释放出异常!
并且在finally中释放obj对象可以确保不管是否出现死锁状态,系统都会释放obj对象。
并且Monitor中还存在Wait方法可让线程等待一段时间,而后在完成时使用Pulse、PulseAll等方法通知等待线程。

 

8.2.3 Interlocked

Interlocked存在于System.Thread命名空间内,它的操做比Monitor使用更简单。
它存在CompareExchange、Decrement、Exchange、Increment等经常使用方法让参数在安全的状况进行数据交换。

Increment、Decrement 可使参数安全地加1或减1并返回递增后的新值。

复制代码
1 class Example
2 {
3 private int a=1;
4
5 public void AddOne()
6 {
7 int newA=Interlocked.Increment(ref a);
8 }
9 }
复制代码

Exchange能够安全地变量赋值。

1 public void SetData()
2 {
3 Interlocked.Exchange(ref a,100);
4 }

CompareExchange使用特别方便,它至关于if的用法,当a等于1时,则把100赋值给a。

1 public void CompareAndExchange()2 {3     Interlocked.CompareExchange(ref a,100,1);4 }
相关文章
相关标签/搜索