.NET中的Socket类提供了网络通讯经常使用的方法,分别提供了同步和异步两个版本,其中异步的实现是基于APM异步模式实现,即BeginXXX/EndXXX的方式。异步方法因为其非阻塞的特性,在需考虑程序性能和伸缩性的状况下,通常会选择使用异步方法。但使用过Socket提供的异步方法的同窗,应该都会注意到了Socket的异步方法是没法设置Timeout的。以Receive操做为例,Socket提供了一个ReceiveTimeout属性,但该属性设置的是同步版本的Socket.Receive()方法的Timeout值,该设置对异步的Socket.BeginReceive()无效:若是对方没有返回任何消息,则BeginReceive操做将没法完成,其中提供的回调函数也将不会调用。以下示例代码所示:html
private static void TestSocketBeginReceive() { Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); byte[] content = Encoding.ASCII.GetBytes("Hello world"); IPAddress ip = Dns.Resolve("www.google.com").AddressList[0]; IPEndPoint receiver = new IPEndPoint(ip, 80); socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb, socket); Console.WriteLine("Sent bytes: " + content.Length); } private static void SendToCb(IAsyncResult ar) { var socket = ar.AsyncState as Socket; socket.EndSendTo(ar); byte[] buffer = new byte[1024]; IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null); int received = socket.EndReceive(receiveAr); Console.WriteLine("Received bytes: " + received); }
因为接收方不会返回任何消息,Socket.BeginReceive将永远不会完成,SentToCb方法中的socket.EndReceive()调用将永远阻塞,应用程序也没法得知操做的状态。网络
在个别的应用场景下,咱们但愿既能使用Socket的异步通讯方法,保证程序的性能,同时又但愿能指定Timeout值,当操做没有在指定的时间内完成时,应用程序能获得通知,以进行下一步的操做,如retry等。如下介绍的就是一种支持Timeout的Socket异步Receive操做的实现,方式以下:异步
如下代码简化了全部的参数检查和异常处理,实际使用中需添加相关逻辑。socket
首先看一下IAsyncResult接口的实现:async
public class AsyncResultWithTimeout : IAsyncResult { private ManualResetEvent m_waitHandle = new ManualResetEvent(false); public AsyncResultWithTimeout(AsyncCallback cb, object state) { this.AsyncState = state; this.Callback = cb; } #region IAsyncResult public object AsyncState { get; private set; } public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } } public bool CompletedSynchronously { get { return false; } } public bool IsCompleted { get; private set; } #endregion public AsyncCallback Callback { get; private set; } public int ReceivedCount { get; private set; } public bool TimedOut { get; private set; } public void SetResult(int count) { this.IsCompleted = true; this.ReceivedCount = count; this.m_waitHandle.Set(); if (Callback != null) Callback(this); } public void SetTimeout() { this.TimedOut = true; this.IsCompleted = true; this.m_waitHandle.Set(); } }
AsyncResultWithTimeOut类中包含了IAsyncResult接口中4个属性的实现、用户传入的AsyncCallback委托、接收到的字节数ReceivedCount以及两个额外的方法:函数
StateInfo类用于保存相关的状态信息,该对象会做为Socket.BeginReceive()的最后一个参数传入。当接收到消息时,接收到的字节数会保存到AsyncResult属性中,并设置操做完成。当超时时,WatchTimeOut方法会将AsyncResult设置为TimeOut状态,并经过RegisteredWaitHandle属性取消注册的WaitOrTimerCallback.post
public class StateInfo { public StateInfo(AsyncResultWithTimeout result, Socket socket) { this.AsycResult = result; this.Socket = socket; } public Socket Socket { get; private set; } public AsyncResultWithTimeout AsycResult { get; private set; } public RegisteredWaitHandle RegisteredWaitHandle { get; set; } }
封装Socket.BeginReceive性能
与Socket.BeginReceive方法相比,BeginReceive2添加了一个参数timeout,能够设置该操做的超时时间,单位为毫秒。BeginReceive2中调用Socket.BeginReceive()方法,其中指定的ReceiveCb回调将在正常接收到消息后将结果保存在stateInfo对象的AsyncResult属性中,该属性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2调用Socket.BeginReceive后,在ThreadPool中注册了一个WaitOrTimerCallback委托。ThreadPool将在Receive操做完成或者Timeout时调用该委托。this
public static class SocketExtension { public static int EndReceive2(IAsyncResult ar) { var result = ar as AsyncResultWithTimeout; result.AsyncWaitHandle.WaitOne(); return result.ReceivedCount; } public static AsyncResultWithTimeout BeginReceive2 ( this Socket socket, int timeout, byte[] buffer, int offset, int size, SocketFlags flags, AsyncCallback callback, object state ) { var result = new AsyncResultWithTimeout(callback, state); var stateInfo = new StateInfo(result, socket); socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state); var registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject( result.AsyncWaitHandle, WatchTimeOut, stateInfo, // 做为state传递给WatchTimeOut timeout, true); // stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut // 中unregister. stateInfo.RegisteredWaitHandle = registeredWaitHandle; return result; } private static void WatchTimeOut(object state, bool timeout) { var stateInfo = state as StateInfo; // 设置的timeout前,操做未完成,则设置为操做Timeout if (timeout) { stateInfo.AsycResult.SetTimeout(); } // 取消以前注册的WaitOrTimerCallback stateInfo.RegisteredWaitHandle.Unregister( stateInfo.AsycResult.AsyncWaitHandle); } private static void ReceiveCb(IAsyncResult result) { var state = result.AsyncState as StateInfo; var asyncResultWithTimeOut = state.AsycResult; var count = state.Socket.EndReceive(result); state.AsycResult.SetResult(count); } }
如下代码演示了如何使用BeginReceive2:google
private static void TestSocketBeginReceive2() { Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); byte[] content = Encoding.ASCII.GetBytes("Hello world"); IPAddress ip = Dns.Resolve("www.google.com").AddressList[0]; IPEndPoint receiver = new IPEndPoint(ip, 80); socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket); Console.WriteLine("Sent bytes: " + content.Length); } private static void SendToCb2(IAsyncResult ar) { var socket = ar.AsyncState as Socket; socket.EndSendTo(ar); byte[] buffer = new byte[1024]; AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null); receiveAr.AsyncWaitHandle.WaitOne(); if (receiveAr.TimedOut) { Console.WriteLine("Operation timed out."); } else { int received = socket.EndReceive(ar); Console.WriteLine("Received bytes: " + received); } }
输出结果以下:
上述实现是针对BeginReceive的封装,还能够以相同的方式将Send/Receive封装以支持Timeout, 或者更进一步支持retry操做。
附示例代码:下载
出处:http://www.cnblogs.com/dytes/archive/2012/08/13/SocketAsyncOpWithTimeout.html