开源地址:https://github.com/tangxuehua/enodehtml
上一篇文章,简单介绍了enode框架内部的总体实现思路,用到了staged event-driven architecture的思想。经过前一篇文章,咱们知道了enode内部有两种队列:command queue、event queue;用户发送的command会进入command queue排队,domain model产生的domain event会进入event queue,而后等待被dispatch到全部的event handlers。本文介绍一下enode框架中这两种消息队列究竟是如何设计的。java
先贴一下enode框架的内部实现架构图,这样对你们理解后面的分析有帮助。node
enode的设计初衷是在单个进程内提供基于DDD+CQRS+EDA的应用开发。若是咱们的业务须要和其余系统交互,那也能够,就是经过在event handler中与其余外部系统交互,好比广播消息出去或者调用远程接口,均可以。也许未来,enode也会内置支持远程消息通讯的功能。可是不支持远程通讯并不表示enode只能开发单机应用了。enode框架须要存储的数据主要有三种:git
好,经过上面的分析,咱们知道enode框架运行时的全部数据,就存储在mongodb和redis这两个地方。而这两种存储都是部署在独立的服务器上,与web服务器无关。因此运行enode框架的每台web服务器上是无状态的。因此,咱们就能方便的对web服务器进行集群,咱们能够随时当用户访问量的增长时增长新的web服务器,以提升系统的响应能力;固然,当你发现随着web服务器的增长,致使单台mongodb服务器或单台redis服务器处理不过来成为瓶颈时,也能够对mongodb和redis作集群,或者对数据作sharding(固然这两种作法不是很好作,须要对mongodb,redis很熟悉才行),这样就能够提升mongodb,redis的吞吐量了。github
好了,上面的分析主要是为了说明enode框架的使用范围,讨论清楚这一点对咱们分析须要什么样的消息队列有很大帮助。web
如今咱们知道,咱们彻底不须要分布式的消息队列了,好比不须要MSMQ、RabbitMQ,等重量级成熟的支持远程消息传递的消息队列了。咱们须要的消息队列的特征是:redis
内存队列,特色是快。可是咱们不光是须要快,还要能支持并发的入队和出对。那么看起来ConcurrentQueue<T>彷佛能知足咱们的要求了,一方面性能还能够,另外一方面内置支持了并发操做。可是有一点没知足,那就是咱们但愿当队列里没有消息的时候,队列的消费者不能让CPU空转,CPU空转会直接致使CPU占用100%,致使机器没法工做。幸运的是,.net中也有一个支持这种功能的集合,那就是:BlockingCollection<T>,这种集合能提供在队列内无元素的时候block当前线程的功能。咱们能够用如下的方式来实例化一个队列:算法
private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());
并发入队的时候,咱们只要写下面的代码便可:mongodb
_queue.Add(message);
并发出队的时候,只要:缓存
_queue.Take();
咱们不难看出,ConcurrentQueue<T>是提供了队列加并发访问的支持,而BlockingCollection<T>是在此基础上再增长blocking线程的功能。
是否是很是简单,通过个人测试,BlockingCollection<T>的性能已经很是好,每秒10万次入队出对确定没问题,因此没必要担忧成为瓶颈。
关于Disruptor的调研:
了解过LMAX架构的朋友应该据说过Disruptor,LMAX架构能支持每秒处理600W订单,并且是单线程。这个速度是否是很惊人?你们有兴趣的能够去了解下。LMAX架构是彻底in memory的架构,全部的业务逻辑基于纯内存实现,粗粒度的架构图以下:
LMAX架构之因此能这么快,除了彻底基于in memory的架构外,还归功于延迟率在纳秒级别的disruptor队列组件。下面是disruptor与java中的Array Blocking Queue的延迟率对比图:
ns是纳秒,咱们能够从数据上看到,Disruptor的延迟时间比Array Blocking Queue快的不是一个数量级。因此,当初LMAX架构出来时,一时很是轰动。我曾经也对这个架构很好奇,但由于有些细节问题没想清楚,就不敢贸然实践。
经过上面的分析,咱们知道,Disruptor也是一种队列,而且也彻底能够替代BlockingCollection,可是由于咱们的BlockingCollection目前已经知足咱们的须要,且暂时不会成为瓶颈,因此,我暂时没有采用Disruptor来实现咱们的内存队列。关于LMAX架构,你们还能够看一下这篇我之前写的文章。
咱们不光须要一个高性能且支持并发的内存队列,还要支持队列消息的持久化功能,这样咱们才能保证消息不会丢失,从而才能谈消息至少被处理一次。
那消息何时持久化?
当咱们发送一个消息给队列,一旦发生成功,咱们确定认为消息已经不会丢了。因此,很明显,消息队列内部确定是要在接收到入队的消息时先持久化该消息,而后才能返回。
那么如何高效的持久化呢?
第一个想法:
基于txt文本文件的顺序写。原理是:当消息入队时,将消息序列化为文本,而后append到一个txt1文件;当消息被处理完以后,再把该消息append到另外一个txt2文件;而后,若是当前机器没重启,那内存队列里当前存在的消息就是还未被处理的消息;若是机器重启了,那如何知道哪些消息还没被处理?很简单,就是对比txt1,txt2这两个文本文件,而后只要是txt1中存在,可是txt2中不存在的消息,就认为是没被处理过,那须要在enode框架启动时读取txt1中这些没被处理的消息文本,反序列化为消息对象,而后从新放入内存队列,而后开始处理。这个思路其实挺好,关键的一点,这种作法性能很是高。由于咱们知道顺序写文本文件是很是快的,通过个人测试,每秒200W行普通消息的文本不在话下。这意味着咱们每秒能够持久化200W个消息,固然实际上咱们确定达不到这个高的速度,由于消息的序列化性能达不到这个速度,因此瓶颈是在序列化上面。可是,经过这种持久化消息的思路,也会有不少细节问题比较难解决,好比txt文件愈来愈大,怎么办?txt文件很差管理和维护,万一不当心被人删除了呢?还有,如何比较这两个txt文件?按行比较吗?不行,由于消息入队的顺序和处理的顺序不必定相同,好比command就是如此,当用户发送一个command到队列,可是处理的时候发现第一次因为并发冲突,致使command执行没成功,因此会重试command,若是重试成功了,而后持久化该command,可是咱们知道,此时持久化的时候,它的顺序也许已经在后面的command的后面了。因此,咱们不能按行比较;那么就要按消息的ID比较了?就算能作到,那这个比较过程也是很耗时的,假设txt1有100W个消息;txt2中有80W个消息,那若是按照ID来比较txt1中哪20W个消息还没被处理,有什么算法能高效比较出来吗?因此,咱们发现,这个思路仍是有不少细节问题须要考虑。
第二个想法:
采用NoSQL来存储消息,经过一些思考和比较后,以为仍是MongoDB比较合适。一方面MongoDB实际上全部的存取操做优先使用内存,也就是说不会立刻持久化到磁盘。因此性能很快。另外一方面,mongodb支持可靠的持久化功能,能够放心的用来持久化消息。性能方面,虽然没有写txt那么快,但也基本能接受了。由于咱们毕竟不是整个网站的全部用户请求的command都是放在一个队列,若是咱们的网站用户量很大,那确定会用web服务器集群,且每一个集群机器上都会有不止一个command queue,因此,单个command queue里的消息咱们能够控制为不会太多,并且,单个command queue里的消息都是放在不一样的mongodb collection中存储;固然持久化瓶颈永远是IO,因此真的要快,那只能一个独立的mongodb server上设计一个collection,该collection存放一个command queue里的消息;其余的command queue的消息就也采用这样的作法放在另外的mongodb server上;这样就能作到IO的并行,从而根本上提升持久化速度。可是这样作代价很大的,可能须要好多机器呢,整个系统有多少个queue,那就须要多少台机器,呵呵。总而言之,持久化方面,咱们仍是有一些办法能够去尝试,还有优化的余地。
再回过头来简单说一下,采用mongodb来持久化消息的实现思路:入队的时候持久化消息,出队的时候删除该消息;这样当机器重启时,要查看某个队列有多少消息,只要经过一个简单的查询返回mongodb collection中当前存在的消息便可。这种作法设计简单,稳定,性能方面目前应该还能够接受。因此,目前enode就是采用这种方法来持久化全部enode用到的内存队列的消息。
代码示意,有兴趣的能够看看:
public abstract class QueueBase<T> : IQueue<T> where T : class, IMessage { #region Private Variables private IMessageStore _messageStore; private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>()); private ReaderWriterLockSlim _enqueueLocker = new ReaderWriterLockSlim(); private ReaderWriterLockSlim _dequeueLocker = new ReaderWriterLockSlim(); #endregion public string Name { get; private set; } protected ILogger Logger { get; private set; } public QueueBase(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentNullException("name"); } Name = name; _messageStore = ObjectContainer.Resolve<IMessageStore>(); Logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().Name); } public void Initialize() { _messageStore.Initialize(Name); var messages = _messageStore.GetMessages<T>(Name); foreach (var message in messages) { _queue.Add(message); } OnInitialized(messages); } protected virtual void OnInitialized(IEnumerable<T> initialQueueMessages) { } public void Enqueue(T message) { _enqueueLocker.AtomWrite(() => { _messageStore.AddMessage(Name, message); _queue.Add(message); }); } public T Dequeue() { return _queue.Take(); } public void Complete(T message) { _dequeueLocker.AtomWrite(() => { _messageStore.RemoveMessage(Name, message); }); } }
思路应该很容易想到,就是先把消息从内存队列dequeue出来,而后交给消费者处理,而后由消费者告诉咱们当前消息是否被处理了,若是没被处理好,那须要尝试重试处理,若是重试几回后仍是不行,那也不能把消息丢弃了,但也不能无休止的一直只处理这个消息,因此须要把该消息丢到另外一个专门用于处理须要重试的本地纯内存队列。若是消息被处理成功了,那就把该消息从持久化设备中删除便可。看一下代码比较清晰吧:
private void ProcessMessage(TMessageExecutor messageExecutor) { var message = _bindingQueue.Dequeue(); if (message != null) { ProcessMessageRecursively(messageExecutor, message, 0, 3); } } private void ProcessMessageRecursively(TMessageExecutor messageExecutor, TMessage message, int retriedCount, int maxRetryCount) { var result = ExecuteMessage(messageExecutor, message); //这里表示在消费(即处理)消息 //若是处理成功了,就通知队列从持久化设备删除该消息,经过调用Complete方法实现 if (result == MessageExecuteResult.Executed) { _bindingQueue.Complete(message); } //若是处理失败了,就重试几回,目前是3次,若是仍是失败,那就丢到一个重试队列,进行永久的定时重试 else if (result == MessageExecuteResult.Failed) { if (retriedCount < maxRetryCount) { _logger.InfoFormat("Retring to handle message:{0} for {1} times.", message.ToString(), retriedCount + 1); ProcessMessageRecursively(messageExecutor, message, retriedCount + 1, maxRetryCount); } else { //这里是丢到一个重试队列,进行永久的定时重试,目前是每隔5秒重试一下,_retryQueue是一个简单的内存队列,也是一个BlockingCollection<T> _retryQueue.Add(message); } } }
代码应该很清楚了,我就很少作解释了。
本文主要介绍了enode框架中消息队列的设计思路,由于enode中有command queue和event queue,两种queue,因此逻辑是相似的;因此原本还想讨论一下如何抽象和设计这些queue,已去掉重复代码。但时间不早了,下次再详细讲吧。