一年前,当我第一次开发完EQueue后,写过一篇文章介绍了其总体架构,作这个框架的背景,以及架构中的全部基本概念。经过那篇文章,你们能够对EQueue有一个基本的了解。通过了1年多的完善,EQueue不管是功能上仍是成熟性上都完善了很多。因此,但愿再写一篇文章,介绍一下EQueue的总体架构和关键特性。html
EQueue是一个分布式的、轻量级、高性能、具备必定可靠性,纯C#编写的消息队列,支持消费者集群消费模式。算法
主要包括三个部分:producer, broker, consumer。producer就是消息发送者;broker就是消息队列服务器,负责接收producer发送过来的消息,以及持久化消息;consumer就是消息消费者,consumer从broker采用拉模式到broker拉取消息进行消费,具体采用的是long polling(长轮训)的方式。这种方式的最大好处是可让broker很是简单,不须要主动去推消息给consumer,而是只要负责持久化消息便可,这样就减轻了broker server的负担。同时,consumer因为是本身主动去拉取消息,因此消费速度能够本身控制,不会出现broker给consumer消息推的太快致使consumer来不及消费而挂掉的状况。在消息实时性方面,因为是长轮训的方式,因此消息消费的实时性也能够保证,实时性和推模型基本至关。sql
EQueue是面向topic的架构,和传统的MSMQ这种面向queue的方式不一样。使用EQueue,咱们不须要关心queue。producer发送消息时,指定的是消息的topic,而不须要指定具体发送到哪一个queue。一样,consumer发送消息也是同样,订阅的是topic,不须要关心本身想从哪一个queue接收消息。而后,producer客户端框架内部,会根据当前的topic获取全部可用的queue,而后经过某种queue select strategy选择一个queue,而后把消息发送到该queue;一样,consumer端,也会根据当前订阅的topic,获取其下面的全部的queue,以及当前全部订阅这个topic的consumer,按照平均的方式计算出当前consumer应该分配到哪些queue。这个分配的过程就是消费者负载均衡。数据库
Broker的主要职责是:服务器
发送消息时:负责接收producer的消息,而后持久化消息,而后创建消息索引信息(把消息的全局offset和其在queue中的offset简历映射关系),而后返回结果给producer;网络
消费消息时:负责根据consumer的pull message request,查询一批消息(默认是一次pull request拉取最多32个消息),而后返回给consumer;架构
各位看官若是对EQueue中的一些基本概念还不太清楚,能够看一下我去年写的介绍1,写的很详细。下面,我想介绍一下EQueue的一些有特点的地方。负载均衡
网络通讯模型,采用.NET自带的SocketAsyncEventArgs,内部基于Windows IOCP网络模型。发送消息支持async, sync, oneway三种模式,不管是哪一种模式,内部都是异步模式。当同步发送消息时,就是框架帮咱们在异步发送消息后,同步等待消息发送结果,等到结果返回后,才返回给消息发送者;若是必定时间还不返回,则报超时异常。在异步发送消息时,采用从EventStore开源项目中学习到的优秀的socket消息发送设计,目前测试下来,性能高效、稳定。经过几个案例运行很长时间,没有出现通讯层方面的问题。框架
broker消息持久化的设计。采用WAL(Write-Ahead Log)技术,以及异步批量持久化到SQL Server的方式确保消息高效持久化且不会丢。消息到达broker后,先写入本地日志文件,这种设计在db, nosql等数据库中很常见,都是为了确保消息或请求不丢失。而后,再异步批量持久化消息到SQL Server,采用.NET自带的SqlBulkCopy技术。这种方式,咱们能够确保消息持久化的实时性和很高的吞吐量,由于一条消息只要写入本地日志文件,而后放入内存的一个dict便可。异步
当broker意外宕机,可能会有一些消息还没持久化到SQL Server;因此,咱们在重启broker时,咱们除了先从SQL Server恢复全部未消费的消息到内存外,同时记录当前SQL Server中的最后一条消息的offset,而后咱们从本地日志文件扫描offset+1开始的全部消息,所有恢复到SQL Server以及内存。
须要简单提一下的是,咱们在把消息写入到本地日志文件时,不可能所有写入到一个文件,因此要拆文件。目前是根据log4net来写消息日志,每100MB一个日志文件。为何是100MB?是由于,咱们的这个消息日志文件的用途主要是用来在Broker重启时,恢复SQL Server中最后还没来得及持久化的那些消息的。正常状况下,这些消息量应该不会不少。因此,咱们但愿,当扫描本地日志文件时,尽可能能快速的扫描文件。一般100MB的消息日志文件,已经能够存储很多的消息量,而SQL Server中未持久化的消息一般不会超过这个量,除非当机前,出现长时间消息没法持久化的状况,这种状况,应该会被咱们监控到并及时发现,并采起措施。固然,每一个消息日志文件的大小,能够支持配置。另一点,就是从日志文件恢复的时候,仍是须要有一个算法的,由于未被持久化的消息,有可能不仅在最近的一个消息日志文件里,有可能在多个日志文件里,由于就像前面所说,会出现大量消息没有持久化到SQL Server的状况。
但总之,在保证高性能的前提下,消息不丢(可靠性)是彻底能够保证的。
消费消息方面,采用批量拉取消息进行消费的方式。默认consumer一个pull message request会最多拉取32个消息(只要存在这么多未消费消息的话);而后consumer会并行消费这些消费,除了并行消费外,也能够配置为单线程线性消费。broker在查询消息时,通常状况未消费消息老是在内存的,只有有一种状况不在内存,这个下面详细分析。因此,查询消息应该说很是快。
不过上面提到的消息可靠性,只能尽可能保证单机不丢消息。因为消息是放在DB,以及本地日志。因此,若是DB服务器硬盘坏了,或者broker的硬盘坏了,那就会有丢消息的可能性。要解决这个问题,就须要作replication了。EQueue下一步会支持broker的集群和故障转移(failover)。目前,我开发了一个守护进程服务,会监控broker进程是否挂掉,若是挂掉,则自动重启,必定程度上也会提升broker的可用性。
我以为,作事情,越简单越好,不要一开始就搞的太复杂。复杂的东西,每每难以维护和驾驭,虽然理论很美好,但老是会出现各类问题,呵呵。就像去中心化的架构虽然理论好像很美好,但实际使用中,发现仍是中心化的架构更好,更具有实战性。
消费者负载均衡是指某个topic的全部消费者,能够平均消费这个topic下的全部queue。咱们使用消息队列,我认为这个特性很是重要。设想,某一天,咱们的网站搞了一个活动,而后producer产生的消息猛增。此时,若是咱们的consumer服务器若是仍是只有原来的数量,那极可能会来不及处理这么多的消息,致使broker上的消息大量堆积。最终会影响用户请求的响应时间,由于不少消息没法及时被处理。
因此,遇到这种状况,咱们但愿分布式消息队列能够方便的容许咱们动态添加消费者机器,提升消费能力。EQueue支持这样的动态扩展能力。假如某个topic,默认有4个queue,而后每一个queue对应一台consumer机器进行消费。而后,咱们但愿增长一倍的consumer时,只要在EQueue Web控制台上,为这个topic增长4个queue,而后咱们再新增4台consumer机器便可。这样EQueue客户端会支持自动负载均衡,几秒钟后,8个consumer就能够各自消费对应的queue了。而后,当活动事后,消息量又会回退到正常水平,那么咱们就能够再减小queue,并下线多余的consumer机器。
另外,EQueue还充分考虑到了下线queue时的平滑性,能够支持先冻结某个queue,这样能够确保不会有新的消息发送到该queue。而后咱们等到这个queue的消息都消费完后,就能够下线consumer机器和删除该queue了。这点,应该说,阿里的rocketmq也没有作到,呵呵。
这个特性,我以前专门写过一篇文章,详细介绍设计思路,这里也简单介绍一下。MQ的一个很重要的做用就是削峰,就是在遇到一瞬间大量消息产生而消费者来不及一会儿消费时,消息队列能够起到一个缓冲的做用,从而能够确保消息消费者服务器不会垮掉,这个就是削峰。若是使用RPC的方式,那最后全部的请求,都会压倒DB,DB就会承受不了这么多的请求而挂掉。
因此,咱们但愿MQ支持消息堆积的能力,不能由于为了快,而只能支持把消息放入服务器内存。由于服务器内存的大小是有限的,假设咱们的消息服务器内存大小是128G,每一个消息大小为1KB,那差很少最多只能堆积1.3亿个消息。不过通常来讲1.3亿也够了,呵呵。但这个毕竟要求大内存做为前提的。但有时咱们可能没有那么大的服务器内存,但也须要堆积这么多的消息的能力。那就须要咱们的MQ在设计上也提供支持。EQueue能够容许咱们在启动时配置broker服务器上容许在内存里存放的消息数以及消息队列里消息的全局offset和queueOffset的映射关系(我称之为消息索引信息)的数量。咱们能够根据咱们的服务器内存的大小进行配置。而后,broker上会有定时的扫描线程,定时扫描是否有多出来的消息和消息索引,若是有,则移除多出来的部分。经过这个设计,能够确保服务器内存必定不会用完。可是否要移除也有一个前提,就是必需要求这个消息已经持久化到SQL Server了。不然就不能移除。这个应该一般能够保证,由于通常不会出现1亿个消息都还没持久化到DB,若是出现这个状况,说明DB必定出了什么严重的问题,或者broker没法与db创建链接了。这种状况下,咱们应该早就已经发现了,EQueue Web监控控制台上随时能够查看消息的最大全局offset,已经持久化的最大全局offset。
上面这个设计带来的一个问题是,假如如今consumer要拉取的消息不在内存了怎么办?一种办法是从DB把这个消息拉取到内存,但一条条拉,确定太慢了。因此,咱们能够作一个优化,就是发现当前消息不在内存时,由于极可能下一条消息也不在内存,因此咱们能够一次性从Sql Server DB拉取10000个消息(可配置),这样后续的10000个消息就必定在内存了,咱们须要再访问DB。这个设计实际上是在内存使用和拉取消息性能之间的一个权衡后的设计。Linux的pagecache的目的也是这个。
另一点,就是咱们broker重启时,不能所有把全部消息都恢复到内存,而是要判断是否已经到达内存能够承受的最大消息数了。若是已经到达,那就不能再放入内存了;同理,消息索引信息的恢复也是同样。不然,在消息堆积过多的时候,就会致使broker重启时,内存爆掉了。
EQueue的消息消费进度的设计,和kafka, rocketmq是一个思路。就是定时保存每一个queue的消费进度(queue consumed offset),一个long值。这样设计的好处是,咱们不用每次消费完一个消息后,就当即发送一个ack回复消息到broker。若是是这样,对broker的压力是很大的。而若是只是定时发送一个消费进度,那对broker的压力很小。那这个消费进度怎么来?就是采用滑动门技术。就是consumer端,在拉取到一批消息后,先放入本地内存的一个SortedDictionary里。而后继续去拉下一批消息。而后会启动task去并行消费这些刚刚拉取到的消息。因此,这个本地的SortedDictionary就是存放了全部已经拉取到本地但尚未被消费掉的消息。而后当某个task thread消费掉一个消息后,会把它从SortedDictionary中移除。而后,我上面所说的滑动门技术,就是指,在每次移除一个消息后,获取当前SortedDictionary里key最小的那个消息的queue offset。随着消息的不断消费,这个queue offset也会不断增大,从宏观的角度看来,就像是一扇门在不停的往前移动。
但这个设计有个问题,就是假如这个Dict里,有一个offset=100的消息一直没被消费掉,那就算后面的消息都被消费了,最后这个滑动门仍是不会前进。由于这个dict里的最后的那个queue offset老是100。这个应该好理解的吧。因此这种状况下,当consumer重启后,下次消费的位置仍是会从100开始,后面的也会从新消费一遍。因此,咱们的消费者内部,须要都支持幂等处理消息。
由于broker上的消息,不是消息消费掉了就当即删除,而是定时删除,好比每2天删除一次(能够配置)。因此,当咱们哪天但愿从新消费1天前的消息的时候,EQueue也是彻底支持的。只要在consumer启动前,修改消费进度到之前的某个特定的值便可。
EQueue有一个完善的Web管理控制台,咱们能够经过该控制台管理topic,管理queue,查看消息,查看消息消费进度,查看消息堆积状况等信息。可是目前还不支持报警,之后会慢慢增长报警功能。
经过这个控制台,咱们使用EQueue就会方便不少,能够实时了解消息队列服务器的健康情况。贴一个管理控制台的UI界面,让你们有个印象:
我相信:没有作很差,只有没耐心。