两周前用长轮询作了一个Chat,并移植到了Azure,还写了篇博客http://www.cnblogs.com/indream/p/3187540.html,让你们帮忙测试。html
首先感谢300位注册用户,让我有充足的数据进行重构和优化。因此这两周都在进行大重构。ajax
其中最大的一个问题就是数据流量过大,原先已有更新,还会有Web传统“刷新”的形式把数据从新拿一次,而后再替换掉本地数据。json
但这一拿问题就来了,在10个Chat*300个用户的状况下,这一拿产生了一次8M多的流量,这是十分严重的事情,特别是其中绝大部分数据都是浪费掉了的。跨域
那么解决方案就很简单了,把“全量”改为“增量”,只传输修改的部分,同时大量增长往返次数,把每次往返量压缩。缓存
固然,这篇文章主要讲长轮询,也是以后被问得比较多的方面,因此就单独写篇文章出来了。服务器
此次比单纯的轮询多了一个缓存行为,以解决每次“心跳”中所产生的断线间隔数据丢失的问题。网络
首先列举一下所使用到的技术点:并发
长轮询是一种相似于JSONP同样畸形的Web通讯技术,用以实现Web与服务端之间的实时双向通讯。dom
在有人实现JSONP以前,单纯的JS或者说Web是没法实现原生地有效地实现跨域通讯的;而在有了JSONP以后,这项工做就变得简单了,虽然实现方法很“畸形(或者说有创意吧)”。异步
一样,在有长轮询以前,还没出现HTML5 Web Socket的时代,单纯的Web没法与服务器进行实时通讯,HTTP限制了通讯行为只能是有客户端发起请求,而后服务端针对该请求进行回应。
长轮询所作的就是把原有的协议“漏洞”利用起来,使得客户端和服务端之间在HTML 4.1(部分更低版本应该也能够兼容)下能够实时通讯。
HTTP协议自己有两个“漏洞”,也是如今网络通讯中没法避免的。
一个是请求(Request)和答复(Response)之间没法确认其链接情况,可就没法肯定其所用的时限了。
判断客户端与服务端是否相连的一个标准就是客户端的请求是否能收到服务端的答复,若是收获得,就说明链接上了,即时收到的是服务端错误的通知(好比404 not found)。
第二漏洞就是在获取到答复(Response)前,都没法知道所须要的数据内容是怎么样的(若是有还跟人家要啥)。
长轮询就是利用了这两个“漏洞”:服务端收到请求(Request)后,将该请求Hold住不立刻答复,而是一直等,等到服务端有信息须要发送给客户端的时候,经过将刚才Hold住的那条请求(Request)的答复(Response)发回给客户端,让客户端做出反应。而返回的内容,呵呵呵呵呵,那就随便服务端了。
而后,客户端收到答复(Response)后,立刻再从新发送一次请求(Request)给服务端,让服务端再Hold住这条链接。周而复始,就实现了从服务端向客户端发送消息的实时通讯,客户端向服务端发送消息则依旧利用传统的Post和Get进行。
受Web通讯现实状况限制,若是服务端长时间没有消息须要推送到客户端的时候,也不能一直Hold住那条连接,由于颇有可能被断定为网关超时等超时状况。因此即便没有消息,每间隔一段时间,服务端也要返回一个答复(Response),让客户端从新请求一个连接。
见过一些人喜欢把每次轮询的断开到下次轮询开始客户端的接收->再请求的行为称之为一次“心跳(Beat)”,也挺贴切的。
要实现真正的实时通讯,长轮询的实现并不那么简单,由于每次“心跳”时会产生一个小间隙,这个间隙的时候服务端已经将上一个答复(Response)返回,但尚未接收到客户端的下一次请求(Request)。那么这时候,服务端若是有最新消息,就没法推送给客户端了,因此须要将这些消息缓存起来,等到下一次机会到来的时候再XXOO。
若是是AJAX的话,通常都是用jQuery进行实现。何况,毕竟还用了JSONP,手动写起来在工做中实在不划算。
到了Web端的代码,就变得很容易了,如下内容直接从项目中节选,只是做了一些山间
1 getJsonp: function (url, data, callback, errorCallback) { 2 $.ajax({ 3 url: url, 4 data: data, 5 type: "POST", 6 dataType: "jsonp", 7 jsonpCallback: "callback" + Math.random().toString().replace('.', ''), 8 success: callback, 9 error: errorCallback 10 }); 11 }, 12 //轮询的锁,保证每一个轮询有且仅有一个 13 pollingLocks: { 14 }, 15 //轮询的重试时间 16 pollingRetries: { 17 }, 18 //轮询错误的callBack缓存 19 pollingCallbacks: [], 20 //轮询 21 //listeningCode: 监听编码,与服务器的一个契约,单个监听编码在服务器中有对应的一个缓冲池,以保留该监听相关信息 22 //url: 目标地址 23 //data: 请求时的参数 24 //lockName: 锁名,一样的锁名在同一时间只会出现一个轮询 25 //callbakc: 接收到服务端数据后的回调 26 polling: function (listeningCode, url, data, lockName, callback) { 27 var comet = chatConnectionProvider.connections.comet; 28 29 //判断是否有锁,排他,不容许重复监听,保持单一连接 30 if (!comet.pollingLocks[lockName]) { 31 //锁住监听 32 comet.pollingLocks[lockName] = true; 33 comet.getJsonp(url, data, function (cometCallbackData) { 34 var listeningCode = cometCallbackData.ListeningCode; 35 //将消息发回 36 for (var i in cometCallbackData.Callbacks) { 37 callback(cometCallbackData.Callbacks[i]); 38 } 39 //将监听编码添加到请求数据中,以和服务器的监听编码保持一致 40 data = data || {}; 41 data.listeningCode = cometCallbackData.ListeningCode; 42 //解锁后继续监听 43 comet.pollingLocks[lockName] = false; 44 comet.polling(listeningCode, url, data, lockName, callback); 45 }, function (jqXHR, textStatus, errorThrown) { 46 //若是发生错误,则重试,而且逐步加大重试时间,以减低服务器压力,以100毫秒开始,每次加倍 47 comet.pollingRetries[lockName] = comet.pollingRetries[lockName] * 2 || 100; 48 //将回调函数暂存 49 chatConnectionProvider.connections.comet.pollingCallbacks[lockName] = callback; 50 var rePollingMethors = 'chatConnectionProvider.connections.comet.pollingLocks["' + lockName + '"] = false;'//先解锁,在解锁以前排他,不容许重复轮询 51 + 'chatConnectionProvider.connections.comet.polling("' + listeningCode + '", "' + url + '", "' + data + '", "' + lockName + '", chatConnectionProvider.connections.comet.pollingCallbacks["' + lockName + '"]);'; 52 setTimeout(rePollingMethors, comet.pollingRetries[lockName]); 53 }); 54 } 55 },
一开始我花了比较长时间寻找服务端Hold住请求的方法。
普通状况下,一个Web的请求是同步执行的,若是须要转成异步的话,须要对线程进行操做。好比一开始我最白痴的想法是用自旋锁,或者用Thread相关的方法,而后在须要的时候采用一些Interup方法进行中断等等,都不容易写。
后来发现MVC中提供了比较合理的一种原生的异步页面方式,能够简单地实现同步转异步。
首先是Controller要由默认的Controller改成继承自AsyncController。该基类有一个私有成员AsyncManager,利用该对象能够简单地将同步转换成异步。
而本来有的方法,要拆分红两个方法来写,分别在两个方法用原名加上Async和Completed。
好比个人ListenController,里面有一个User方法,用以监听用户的数据。通过实现以后,就变成了ListenController : AsyncController,同时拥有一对User方法:UserAsync和UserCompleted。
那么,在页面请求Listen/User的时候,就会自动调用名称匹配的UserAsync方法。
在这以后,咱们就须要利用AsyncManager执行如下语句,将线程“挂起”(Hold住,这样懂了吧):
asyncManager.OutstandingOperations.Increment();
直到咱们有消息须要发送给用户的时候,经过如下方式对UserCompleted进行传参:
asyncManager.Parameters["listeningCode"] = Code;
而后再触发UserCompleted:
asyncManager.OutstandingOperations.Decrement();
再总体地看一次,ListenController就是长这个样子的:
1 public class ListenController : AsyncController 2 { 3 // 4 // GET: /Listen/ 5 6 ICometManager cometManager; 7 8 public ListenController() 9 { 10 cometManager = StructureMap.ObjectFactory.GetInstance<ICometManager>(); 11 } 12 13 /// <summary> 14 /// 监听用户的信息 15 /// </summary> 16 /// <param name="listeningCode">监听编码,若是为空则视为一次全新的监听,容许同以客户端开启多个网页进行多个监听</param> 17 public void UserAsync(int? listeningCode) 18 { 19 //开始监听用户 20 cometManager.ListenUser(listeningCode, AsyncManager); 21 } 22 23 /// <summary> 24 /// 返回用户的信息 25 /// </summary> 26 /// <param name="listeningCode">监听编码</param> 27 /// <returns></returns> 28 public JsonpResult UserCompleted(int listeningCode) 29 { 30 //获取用户全部的消息 31 var callbacks = cometManager.TakeAllUserCallbacks(listeningCode); 32 33 //将该消息返回 34 return Json(new 35 { 36 ListeningCode = listeningCode, 37 Callbacks = callbacks.Select(item => new CallbackModel(item)) 38 }) 39 .ToJsonp(); 40 } 41 }
CometManager就是我用来处理轮询的对象。
注意到在UserCompleted是经过了一个ICometManager.TakeAllUserCallbacks来获取用户的全部回调数据,而不是直接经过AsyncManager.Parameters发送。缘由是实现过程当中我发现没法经过AsyncManager.Parameters将自定义对象传参,因此采起了这种方式。或许,实现序列化后或者引用相关序列化方法,能实现如此传参。
在CometManager : ICometManager中,相关实现如此:
1 /// <summary> 2 /// 监听用户的方法 3 /// </summary> 4 /// <param name="listeningCode">指定监听编码,若是为空则为全新的监听</param> 5 /// <param name="asyncManager">监听来源页面的AsyncManager,用以处理异步与回调</param> 6 public void ListenUser(int? listeningCode, System.Web.Mvc.Async.AsyncManager asyncManager) 7 { 8 //监听新消息 9 userListenerQuery.Add(chatUserProvider.Current.Id, listeningCode, userListenManager, asyncManager); 10 } 11 12 /// <summary> 13 /// 取走用户全部回调结果 14 /// </summary> 15 /// <param name="listeningCode">监听者的Id</param> 16 /// <returns></returns> 17 public IEnumerable<CallbackModel> TakeAllUserCallbacks(int listeningCode) 18 { 19 return userListenerQuery.TakeAllCallback(listeningCode); 20 }
userListenerQuery是一个单例(Singleton)的监听队列;而UserListenManager是往上一层的监听管理对象,毕竟Chat自己不单止支持轮询,还须要支持其余通讯方式,因此往上有一个公共层管理着全部消息。
除了MVC自己提供的特有方法外,还须要一些传统的行为才能实现完整的长轮询。
接着上面,参照ListenQuery的实现:
1 Dictionary<int, CometListener> listenersDic; 2 Dictionary<int, DateTime> lastAddTimeDic; 3 4 public ListenerQuery() 5 { 6 listenersDic = new Dictionary<int, CometListener>(); 7 lastAddTimeDic = new Dictionary<int, DateTime>(); 8 } 9 10 /// <summary> 11 /// 添加一个监听 12 /// </summary> 13 /// <param name="listenToId">监听对象的Id</param> 14 /// <param name="listeningCode">原有监听者的编码</param> 15 /// <param name="listenManager">监听的相关业务管理对象</param> 16 /// <param name="asyncManager">页面的异步管理对象</param> 17 /// <returns>监听编码</returns> 18 public int Add(int listenToId, int? listeningCode, IListenManager<int> listenManager, AsyncManager asyncManager) 19 { 20 lock (listenersDic) 21 { 22 lock (lastAddTimeDic) 23 { 24 CometListener listener; 25 //若是监听者不存在,则生成,不然用原有的监听者 26 if (listeningCode == null || !listenersDic.ContainsKey(listeningCode.Value)) 27 { 28 ////生成其随机编码 29 //var seed = 10000; 30 //var random = new Random(seed); 31 //listeningCode = random.Next(seed); 32 //while (listenersDic.ContainsKey(listeningCode.Value)) 33 //{ 34 // listeningCode = random.Next(seed); 35 //} 36 //改成采用原有编码 37 38 //生成监听者并开始监听 39 Action<int> setListenerCode; 40 listener = new CometListener(out setListenerCode); 41 listenManager.ListenAsnyc(listenToId, listener, setListenerCode); 42 43 listeningCode = listener.Code; 44 //添加入本列表字典 45 listenersDic.Add(listeningCode.Value, listener); 46 //添加监听时间 47 lastAddTimeDic.Add(listeningCode.Value, DateTime.Now); 48 } 49 else 50 { 51 listener = listenersDic[listeningCode.Value]; 52 lastAddTimeDic[listeningCode.Value] = DateTime.Now; 53 } 54 55 //开始监听 56 listener.Begin(asyncManager); 57 58 //定时一次检查,若是监听超时,则清除监听 59 //设计倒计时,按期从新监听,以避免超时 60 var timeLimitInMilliSecond = 60000; 61 System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond); 62 63 //设置计时终结方法 64 timer.Elapsed += (sender, e) => 65 { 66 if (lastAddTimeDic[listeningCode.Value].AddSeconds(45) < DateTime.Now) 67 { 68 listenManager.StopListenAsnyc(listener); 69 } 70 }; 71 72 //启动倒计时 73 timer.Start(); 74 } 75 } 76 77 return listeningCode.Value; 78 } 79 80 /// <summary> 81 /// 取走全部回调结果 82 /// </summary> 83 /// <param name="listeningCode">监听者的Id</param> 84 /// <returns></returns> 85 public IEnumerable<CallbackModel> TakeAllCallback(int listeningCode) 86 { 87 return listenersDic[listeningCode].ShiftAllCallbacks(); 88 } 89 }
这里用了一个字典来记录每一个ListeningCode以及相关的Listener。
注意Add方法内有一个Timer。就像注释上所说的,按期检查用户是否在监听。我在这里设置了每30秒有一次“心跳”(Beat),而每次监听后的第60秒会来检查45秒内(暂时这么设置的,有待时间考验是否是个合适值)用户是否再来监听,若是没有则中止监听。
这么作的缘由是防止客户端单方面离婚毁约,而后服务端的Comet傻傻地在这里痴情地帮客户端继续保留缓存消息。这种状况时有出现,好比客户端还没等到答复(Response)就私奔关掉了页面,留下服务单在那边Hold住链接傻傻地等待。
注意凡是处理队列类的地方都有锁,以防止并发问题。
那么最后,CometListener的实现就以下:
1 public class CometListener : Listen.IListener 2 { 3 AsyncManager asyncManager; 4 5 List<CallbackModel> callbacks; 6 7 /// <summary> 8 /// 构造函数 9 /// </summary> 10 public CometListener(out Action<int> setListenerCode) 11 { 12 setListenerCode = setCode; 13 14 callbacks = new List<CallbackModel>(); 15 } 16 17 internal void setCode(int code) 18 { 19 this.Code = code; 20 } 21 22 /// <summary> 23 /// 开始监听的方法 24 /// </summary> 25 /// <param name="asyncManager">页面的异步处理对象</param> 26 public void Begin(AsyncManager asyncManager) 27 { 28 //先把原有数据返回 29 Return(); 30 31 lock (asyncManager) 32 { 33 this.asyncManager = asyncManager; 34 lock (this.asyncManager) 35 { 36 //启动异步 37 asyncManager.OutstandingOperations.Increment(); 38 39 //设计倒计时,按期断开监听,以避免网关超时 40 var timeLimitInMilliSecond = 30000; 41 System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond); 42 43 //设置计时终结方法 44 timer.Elapsed += (sender, e) => 45 { 46 if (this.asyncManager == asyncManager) 47 { 48 Return(); 49 } 50 }; 51 52 //启动倒计时 53 timer.Start(); 54 } 55 } 56 } 57 58 /// <summary> 59 /// 将现有的值返回给客户端 60 /// </summary> 61 public void Return() 62 { 63 if (asyncManager != null) 64 { 65 lock (asyncManager) 66 { 67 //返回最新值 68 asyncManager.Parameters["listeningCode"] = Code; 69 70 //返回最新值 71 asyncManager.OutstandingOperations.Decrement(); 72 73 //清空当前页面异步对象,以等待下一个轮询请求 74 asyncManager = null; 75 } 76 } 77 } 78 79 /// <summary> 80 /// 拿走并清除callbacks 81 /// </summary> 82 public IEnumerable<CallbackModel> ShiftAllCallbacks() 83 { 84 lock (callbacks) 85 { 86 var result = callbacks.ToList(); 87 callbacks.Clear(); 88 return result; 89 } 90 } 91 92 93 #region IListener members 94 95 /// <summary> 96 /// 惟一的监听编码,用以隔开并区分监听 97 /// </summary> 98 public int Code 99 { 100 get; 101 private set; 102 } 103 104 /// <summary> 105 /// 回调方法,经过该方法将新的数据发送回给监听者 106 /// </summary> 107 /// <param name="typeCode">数据的类型</param> 108 /// <param name="data">数据内容</param> 109 /// <returns></returns> 110 public async Task CallAsync(int typeCode, object args) 111 { 112 lock (callbacks) 113 { 114 callbacks.Add(new CallbackModel(typeCode, args)); 115 } 116 Return(); 117 } 118 119 #endregion 120 }
两周前单次通讯的往返大约在200ms~300ms之间,此次重构后,将Chat内核中大量同步行为改为了异步并发,已经将单次通讯往返压缩在了30ms~50ms之间。固然最但愿是能压缩在10ms~20ms,那样就能够用长轮询进行高同步性的游戏应用了,好比射击、即时战略。可是,到时候就没那么简单了吧,毕竟心跳(Beat)的时候是会有两次往返,也就是必须将单次往返压缩在10ms之内才有可能实现,页面的数据支撑也是个问题,须要大量套用字页面来存放数据,Balabalabalabala.......
和JSONP同样,长轮询是一个畸形的技术,也更加是开发人员在备受显示状况限制下智慧的结晶。固然,从通讯上来说,它不是一项“优秀”的技术或者协议,它浪费了太多“没必要要”的资源在没必要要的事情上了。就像期待IE6今早从市场上消失同样,我也期待你们广泛早日统一用上诸如Web Socket通常更好的通讯技术。但现时来讲,咱们不得不以相似于长轮训、Hack的一些方式向底端的用户妥协,毕竟用户才是产品的最终使用者。
最后,再次感谢各位当时在Chat贡献的测试数据,特别感谢诸位在上面约架(pao)、求关(zhong)注(子)和发#ffd800网地址的几位同胞。Azure帐号已经到期,因此已经上不去了。你们对数据感兴趣吗?(呵呵呵呵呵呵呵呵呵呵~)