最近打算用C#实现一个基于文件的EventStore。算法
什么是EventStore
关于什么是EventStore,若是还不清楚的朋友能够去了解下CQRS/Event Sourcing这种架构,我博客中也有大量介绍。EventStore是在Event Sourcing(下面简称ES)模式中,用于存储事件用的。从DDD的角度来讲,每一个聚合根在本身的状态发生变化时都会产生一个或多个领域事件,咱们须要把这些事件持久化起来。而后当咱们须要恢复聚合根的最新状态到内存时,能够经过ES这种技术,从EventStore获取该聚合根的全部事件,而后重演这些事件,就能将该聚合根恢复到最新状态了。这种技术和MySQL的Redo日志以及Redis的AOF日志或者leveldb的WAL日志的原理是相似的。可是区别是,redo/AOF/WAL日志是Command Sourcing,而咱们这里说的是Event Sourcing。关于这两个概念的区别,我很少展开了,有兴趣的朋友能够去了解下。数据库
为何要写一个EventStore
目前ENode使用的EventStore,是基于关系型数据库SqlServer的。虽然功能上彻底知足要求,可是性能上和数据容量上,离个人预期还有一些距离。好比:数组
- 关于性能,虽然能够经过SqlBulkCopy方法,实现较大的写入吞吐,可是我对EventStore的要求是,须要支持两个惟一索引:1)聚合根ID+事件版本号惟一;2)聚合根ID+命令ID惟一;当添加这两个惟一索引后,会很大影响SqlBulkCopy写入数据的性能;并且SqlBulkCopy只有SqlServer才有,其余数据库如MySQL没有,这样也无形之中限制了ENode的使用场景;
- 关于使用场景,DB是基于SQL的,他不是简单的帮咱们保存数据,每次写入数据都要解析SQL,执行SQL,写入RedoLOG,等;另外,DB还要支持修改数据、经过SQL查询数据等场景。因此,这就要求DB内部在设计存储结构时,要兼顾各类场景。而咱们如今要实现的EventStore,针对的场景比较简单:1)追求高吞吐的写入,没有修改和删除;2)查询很是少,不须要支持复杂的关系型查询,只须要能支持查询某个聚合根的全部事件便可;因此,针对这种特定的使用场景,若是有针对性的实现一个EventStore,我相信性能上能够有更大的提高空间;
- 关于数据量,一个EventStore可能须要存储大量的事件,百亿或千亿级别。若是采用DB,那咱们只能进行分库分表,由于单表能存储的记录数是有限的,好比1000W,超过这个数量,对写入性能也会有必定的影响。假设咱们如今要存储100亿事件记录,单表存储1000W,那就须要1000个表,若是单个物理库中分100个表,那就须要10个物理库;若是未来数据量再增长,则须要进一步扩容,那就须要牵涉到数据库的数据迁移(全量同步、增量同步)这种麻烦的事情。而若是是基于文件版本的EventStore,因为没有表的概念了,因此单机只要硬盘够大,就能存储很是多的数据。而且,最重要的,性能不会由于数据量的增长而降低。固然,EventStore也一样须要支持扩容,可是因为EventStore中的数据只会Append写入,不会修改,也不会删除,因此扩容方案相对于DB来讲,要容易作不少。
- 那为什么不使用NoSQL?NoSQL通常都是为大数据、可伸缩、高性能而设计的。由于一般NoSQL不支持上面第一点中所说的二级索引,固然一些文档型数据库如MongoDB是支持的,可是对我来讲是一个黑盒,我没法驾驭,也没有使用经验,因此没有考虑。
- 从长远来看,若是可以本身根据本身的场景实现一个有针对性的EventStore,那将来若是出现性能瓶颈的问题,本身就有足够的能力去解决。另外,对本身的技术能力的提升也是一个很大的锻炼机会。并且这个作好了,说不定又是本身的一个很好的做品,呵呵。因此,为什么不尝试一下呢?
EventStore的设计目标
- 要求高性能顺序写入事件;
- 要求严格判断聚合根的事件是否按版本号顺序递增写入;
- 支持命令ID的惟一性判断;
- 支持大量事件的存储;
- 支持按照聚合根ID查询该聚合根的全部事件;
- 支持动态扩容;
- 高可用(HA),须要支持集群和主备,二期再作;
EventStore核心问题描述、问题分析、设计思路
核心问题描述
一个EventStore须要解决的核心问题有两点:1)持久化事件;2)持久化事件以前判断事件版本号是否合法、事件对应的命令是否重复。一个事件包含的信息以下:缓存
- 聚合根ID
- 事件版本号
- 命令ID
- 事件内容
- 事件发生时间
为何是这些信息?
本文所提到的事件是CQRS架构中,由C端的某个命令操做某个聚合根后,致使该聚合根的状态发生变化,而后每次变化都会产生一个对应的事件。因此,针对聚合根的每一个事件,咱们关注的信息就是:哪一个命令操做哪一个聚合根,产生了什么版本号的一个事件,事件的内容和产生的时间分别是什么。服务器
事件的版本号是什么意思?
因为一个聚合根在生命周期内常常会被修改,也就是说常常会有命令去修改聚合根的状态,而每次状态的变化都会产生一个对应的事件,也就是说一个聚合根在生命周期内会产生多个事件。聚合根是领域驱动设计(DDD)中的一个概念,聚合根是一个具备全局惟一ID的实体,具备独立的生命周期,是数据强一致性的最小边界。为了保证聚合根内的数据的强一致性,针对单个聚合根的任何修改都必须是线性的,由于只有线性的操做,才能保证当前的操做所基于的聚合根的状态是最新的,这样才能保证聚合根内数据的完整性,老是知足业务规则的不变性。关于线性操做这点,就像对DB的一张表中的某一条记录的修改也必须是线性的同样,数据库中的同一条记录不可能同时被两个线程同时修改。因此,分析到这里,咱们知道同一个聚合根的多个事件的产生一定是有前后顺序的。那如何保证这个前后顺序呢?答案是,在聚合根上设计一个版本号,经过版本号的顺序递增来保证对同一个聚合根的修改也老是线性依次的。这个思路其实就是一种乐观并发控制的思路。聚合根的第一个事件的版本号为1,第二个事件的版本号为2,第N个事件的版本号为N。当第N个事件产生时,它所基于的聚合根的状态必须是N-1。当某个版本号为N的事件尝试持久化到EventStore时,若是EventStore中已经存在了一个版本号为N的事件,则认为出现并发冲突,须要告诉上层应用当前事件持久化遇到并发冲突了,而后上层应用须要获取该聚合根的最新状态,而后再重试当前命令,而后再产生新的版本号的事件,再持久化到EventStore。数据结构
但愿能自动检测命令是否重复处理
CQRS架构,任何聚合根的修改都是经过命令来完成的。命令就是一个DTO,当咱们要修改一个聚合根的状态时,就发送一个命令到分布式MQ便可,而后MQ的消费者处理该命令。可是你们都知道任何分布式MQ通常都只能作到至少投递一次(At Least Once)的消息投递语义。也就是说,一个命令可能会被消费者重复处理。在有些状况下,某个聚合根若是重复处理某个命令,会致使聚合根的最终状态不正确,好比重复扣款会致使帐号余额不正确。因此,咱们但愿在框架层面能支持命令的重复处理的检测。那最理想的检测位置在哪里呢?若是是传统的DB,咱们会在数据库层面经过创建惟一索引保证命令绝对不会重复执行。那对应到咱们的EventStore,天然也应该在EventStore内部检测。多线程
核心问题分析
经过上面的问题描述,咱们知道,其实一个EventStore须要解决的问题就两点:1)以文件的形式持久化事件;2)持久化以前判断事件的版本号是否冲突、事件的命令是否重复。架构
关于第一点,天然是经过顺序写文件来实现,机械硬盘在顺序写文件的状况下,性能也是很是高的。写文件的思路很是简单,咱们能够固定单个文件的大小,好比512MB。而后先写第一个文件,写满后新建一个文件,再写第二个,后面以此类推。并发
关于第二点,本质上是两个索引的需求:a. 聚合根ID+事件版本号惟一(固然,这里不只要保证惟一,还要判断是不是连续递增);b. 聚合根ID + 命令ID惟一,即针对同一个聚合根的命令不能重复处理;那如何实现这两个索引的需求呢?第一个索引的实现成本相对较低,咱们只须要在内存维护每一个聚合根的当前版本号,而后当一个事件过来时,判断事件的版本号是不是当前版本号的下一个版本号便可,若是不是,则认为版本号非法;第二个索引的事件成本比较高,咱们必须维护每一个聚合根的全部产生的事件对应的命令的ID,而后在某个事件过来时,判断该事件对应的命令ID是否和已经产生的任何一个事件的命令ID重复,若是有,则认为出现重复。因此,归根结底,当须要持久化某个聚合根的事件时,咱们须要加载该聚合根的全部已产生的事件的版本号以及事件对应的命令ID到内存,而后在内存进行判断,从而检查当前事件是否知足这两个索引需求。框架
好了,上面是基本的也是最直接的解决问题的思路了。可是咱们不难发现,要实现上面这两个问题并不容易。由于:首先咱们的机器的内存大小是有限的,也就是说,没法把全部的聚合根的事件的索引信息都放在内存。那么当某个聚合根的事件要持久化时,发现内存中并没有这个聚合根的事件索引时,必然要从磁盘中加载该聚合根的事件索引。但问题是,咱们的事件因为为了追求高性能的写入到文件,老是只是简单的Append追加到最后一个文件的末尾。这样必然致使某个聚合根的事件可能分散在多个文件中,这样就给咱们查找这个聚合根的全部相关事件带来了极大的困难。那该如何权衡的去设计这两个需求呢?
我以为设计是一种权衡,咱们老是应该根据咱们的实际业务场景去有侧重点的进行设计,优先解决主要问题,而后次要问题尽可能去解决。就像leveldb在设计时,也是侧重于写入时很是简单快速,而读取时,可能会比较迂回曲折。EventStore,是很是典型的高频写入但不多读取的系统。但写入时须要保证上述的两个索引需求,因此,应该说这个写入的要求比leveldb的写入要求还要高一些。那咱们该如何去权衡呢?
EventStore核心设计思路
- 在内存中维护每一个聚合根的版本索引eventVersion,eventVersion中维护了当前聚合根的全部的版本、每一个版本对应的cmdId,以及每一个版本的事件在event文件中的物理位置;当一个事件过来时,经过这个eventVersion来判断version,cmdId是否合法(version必须是currentVersion+1,cmdId必须惟一);
- 当写入一个事件时,只写入一个文件,event.file文件;假设一个文件的大小为512MB,一个事件的大小为1KB,则一个文件大概存储52W个事件;
- 一个event.file文件写满后:
- 完成当前event.file文件,而后新建一个新的event.file文件,接下来的事件写入新的event.file文件;
- 启动一个后台线程,在内存中对当前完成的event.file文件中的event按照聚合根ID和事件版本号进行排序;
- 排序完成后,咱们就知道了该文件中的事件涉及到哪些聚合根,他们的顺序,以及最大最小聚合根ID分别是什么;
- 新建一个和event.file文件同样大小的临时文件;
- 在临时文件的header中记录当前event.file已排序过;
- 在临时文件的数据区域将排好序的事件顺序写入文件;
- 临时文件写入完成后,将临时文件替换当前已完成的event.file文件;
- 为event.file文件新建一个对应的事件索引文件eventIndex.file;
- 将event.file文件中的最大和最小聚合根ID写入到eventIndex.file索引文件的header;每一个event.file的最大最小的聚合根ID的关系,会在EventStore启动时自动加载并缓存到内存中,这样能够方便咱们快速知道某个聚合根在某个event.file中是否存在事件,由于直接在内存中判断便可。这个缓存我暂时命名为aggregateIdRangeCache吧,以便下面更方便的进一步说明如何使用它。
- 将event.file文件中的每一个聚合根的每一个事件的索引信息写入eventIndex.file文件,事件索引信息包括:聚合根ID+事件版本号+事件的命令ID+事件在event.file文件中的物理位置这4个信息;有了这些索引信息,咱们就能够只须要访问事件索引文件就能获取某个聚合根的全部版本信息(就是上面说的eventVersion)了;
- 但仅仅在事件索引文件中记录最大最小聚合根ID以及每一个事件的索引信息还不是不够的。缘由是,当咱们要查找某个聚合根的全部版本信息时,虽然能够先根据内存中缓存的每一个event.file文件的最大最小聚合根ID快速定位该聚合根在哪些event.file中存在事件(也就是明确了在哪些对应的事件索引文件中存在版本信息),可是当咱们要从这些事件索引文件中找出该聚合根的事件索引到底具体在文件的哪一个位置时,只能从文件的起始位置顺序扫描文件才能知道,这样的顺序扫描无疑是不高效的。假设一个event.file文件的大小固定为512MB,一个事件的大小为1KB,则一个event.file文件大概存储52W个事件,每一个事件索引的大小大概为:24 + 4 + 24 + 8 = 60个字节。因此,这52W个事件的索引信息大概占用30MB,也就是最终一个事件索引文件的大小大概为30MB多一点。当咱们要获取某个聚合根的全部版本信息时,若是每次访问某个事件索引文件时,老是要顺序扫描30MB的文件数据,那无疑效率不高。因此,我还须要进一步想办法优化,由于事件索引文件里的事件索引信息都是按照聚合根ID和事件版本号排序的,假设如今有52W个事件索引,则咱们能够将这52W个事件索引记录均等切分为100个点,而后把每一个点对应的事件索引的聚合根ID都记录到事件索引文件的header中,一个聚合根ID的长度为24个字节,则100个也就2.4KB左右。这样一来,当咱们想要知道某个聚合根的事件索引大概在事件索引文件的哪一个位置时,咱们能够先经过访问header里的信息,快速知道应该从哪一个位置去扫描。这样一来,原本对于一个事件索引文件咱们要扫描30MB的数据,如今变为只须要扫描百分之一的数据,即300KB,这样扫描的速度就快不少了。这一段写的有点啰嗦,但一切都是为了尽可能详细的描述个人设计思路,不知道各位看官是否看懂了。
- 除了记录记录最大最小聚合根ID以及记录100个等分的切割点外,还有一点能够优化来提升获取聚合根的版本信息的性能,就是:若是内存足够,当某个eventIndex.file被读取一次后,EventStore能够自动将这个eventIndex.file文件缓存到非托管内存中;这样下次就能够直接在非托管内存访问这个eventIndex.file了,减小了磁盘IO的读取;
- 由于内存大小有限,因此eventVersion不可能所有缓存在内存;因此,当某个聚合根的eventVersion不在内存中时,须要从磁盘加载。加载的思路是:扫描aggregateIdRangeCache,快速找出该聚合根的事件在哪些event.file文件中存在;而后经过上面提到的查找算法快速查找这些event.file文件对应的eventIndex.file文件,这样就能快速获取该聚合根的eventVersion信息了;
- 另外,EventStore启动时,最好须要预加载一些热门聚合根的eventVersion信息到缓存。那该预加载哪些聚合根呢?咱们能够在内存中维护一个固定大小(N)的环形数组,环形数组中维护了最近修改的聚合根的ID;当某个聚合根有事件产生,则将该聚合根ID的hashcode取摸N获得环形数组的下标,而后将该聚合根ID放入该下标;定时将该环形数组中的聚合根ID dump到文件preloadAggregateId.file进行存储;这样当EventStore启动时,就能够从preloadAggregateId.file加载指定聚合根的eventVersion;
思路总结:
上面的设计的主要思路是:
- 写入一个事件前先内存中判断是否容许写入,若是容许,则顺序写入event.file文件;
- 对一个已经写入完成的event.file文件,则用一个后台异步线程对文件中的事件按照聚合根ID和事件版本号进行排序,而后将排序后的临时event.file文件替换原event.file文件,同时将排序后获得的事件索引信息写入eventIndex.file文件;
- 写入一个事件时,若是当前聚合根的版本信息不在内存,则须要从相关的eventIndex.file文件加载到内存;
- 因为加载版本信息可能须要访问多个eventIndex.file文件,会有屡次读磁盘的IO,对性能影响较大,因此,咱们老是应该尽可能在内存缓存聚合根的版本信息;
- 整个EventStore的性能瓶颈在于内存中能缓存多少聚合根版本信息,若是可以缓存百分百的聚合根版本信息,且能作到没有GC的问题(尽可能避免),那咱们就能够作到写入事件很是快速;因此,如何设计一个支持大容量缓存(好比缓存几十个GB的数据),且没有GC问题的高性能缓存服务,就变得很关键了;
- 因为有了事件索引信息以及这么多的缓存机制,因此,当要查询某个聚合根的全部事件,也就很是简单了;
如何解决多线程并发写的时候的CPU占用高的问题?
到这里,咱们分析了如何存储数据,如何写入数据,还有如何查询聚合根的全部事件,应该说核心功能的实现思路已经想好了。若是如今是单线程访问EventStore,我相信性能应该不会很低了。可是,实际的状况是N多客户端会同时并发的访问EventStore。这个时候就会致使EventStore服务器会有不少线程要求同时写入事件到数据文件,可是你们知道写文件必须是单线程的,若是是多线程,那也要用锁的机制,保证同一个时刻只能有一个线程在写文件。最简单的办法就是写文件时用一个lock搞定。可是通过测试发现简单的使用lock,在多线程的状况下,会致使CPU很高。由于每一个线程在处理当前事件时,因为要写文件或读文件,都是IO操做,因此锁的占用时间比较长,致使不少线程都在阻塞等待。
为了解决这个问题,我作了一些调研,最后决定使用双缓冲队列的技术来解决。大体思路是:
设计两个队列,将要写入的事件先放入队列1,而后当前要真正处理的事件放在队列2。这样就作到了把接收数据和处理数据这两个过程在物理上分离,先快速接收数据并放在队列1,而后处理时把队列1里的数据放入队列2,而后队列2里的数据单线程线性处理。这里的一个关键问题是,如何把队列1里的数据传给队列2呢?是一个个拷贝吗?不是。这种作法过低效。更好的办法是用交换两个队列的引用的方式。具体思路这里我不展开了,你们能够网上找一下双缓冲队列的概念。这个设计我以为最大的好处是,能够有效的下降多线程写入数据时对锁的占用时间,原本一次锁占用后要直接处理当前事件的,而如今只须要把事件放入队列便可。双缓冲队列能够在不少场景下被使用,我认为,只要是多个消息生产者并发产生消息,而后单个消费者单线程消费消息的场景,均可以使用。并且这个设计还有一个好处,就是咱们能够有机会单线程批量处理队列2里的数据,进一步提升处理数据的吞吐能力。
如何缓存大量事件索引信息?
最简单的办法是使用支持并发访问的字典,如ConcurrentDictionary<T,K>,Java中就是ConcurrentHashmap。可是通过测试发现ConcurrentDictionary在key增长到3000多万的时候就会很是慢,因此我本身实现了一个简单的缓存服务,初步测试下来,基本知足要求。具体的设计思路本文先不介绍了,总之咱们但愿实现一个进程内的,支持缓存大量key/value的一个字典,支持并发操做,不要由于内存占用越多而致使缓存能力的降低,尽可能不要有GC的问题,能知足这些需求就OK。
如何扩容?
咱们再来看一下最后一个我认为比较重要的问题,就是如何扩容。
虽然咱们单台EventStore机器只要硬盘够大,就能够存储至关多的事件。可是硬盘再大也有上限,因此扩容的需求老是有的。因此如何扩容(将数据迁移到其余服务器上)呢?经过上面的设计咱们了解到,EventStore中最核心的文件就是event.file,其他文件均可以经过event.file文件来生成。因此,咱们扩容时只须要迁移event.file文件便可。
那如何扩容呢?假设如今有4台EventStore机器,要扩容到8台。
有两个办法:
- 土豪的作法:准备8台全新的机器,而后把原来4台机器的所有数据分散到新准备的8台机器上,而后再把老机器上的数据所有删除;
- 屌丝的作法:准备4台全新的机器,而后把原来4台机器的一半数据分散到新准备的4台机器上,而后再把老机器上的那一半数据删除;
对比之下,能够很容易发现土豪的作法比较简单,由于只须要考虑如何迁移数据到新机器便可,不须要考虑迁移后把已经迁移过去的数据还要删除。大致的思路是:
- 采用拉的方式,新的8台目标机器都在向老的4台源机器拖事件数据;目标机器记录当前拖到哪里了,以便若是遇到意外中断中止后,下次重启能继续从该位置继续拖;
- 每台源机器都扫描全部的事件数据文件,一个个事件进行扫描,扫描的起始位置由当前要拖数据的目标机器给出;
- 每台目标机器该拖哪些事件数据?预先在源机器上配置好此次扩容的目标机器的全部惟一标识,如IP;而后当某一台目标机器过来拖数据时,告知本身的机器的IP。而后源机器根据IP就能知道该目标机器在全部目标机器中排第几,而后源机器就能知道应该把哪些事件数据同步给该目标机器了。举个例子:假设当前目标机器的IP在全部IP中排名第3,则针对每一个事件,获取事件的聚合根ID,而后将聚合根ID hashcode取摸8,若是余数为3,则认为该事件须要同步给该目标机器,不然就跳过该事件;经过这样的思路,咱们能够保证同一个聚合根的全部事件都最终同步到了同一台新的目标机器。只要咱们的聚合根ID够均匀,那最终必定是均匀的把全部聚合根的事件均匀的同步到目标机器上。
- 当目标机器上同步完整了一个event.file后,就自动异步生成其对应的eventIndex.file文件;
扩容过程的数据同步迁移的思路差很少了。可是扩容过程不只仅只有数据迁移,还有客户端路由切换等。那如客户端何动态切换路由信息呢?或者说如何作到不停机动态扩容呢?呵呵。这个实际上是一个外围的技术。只要数据迁移的速度跟得上数据写入的速度,而后再配合动态推送新的路由配置信息到全部的客户端。最终就能实现动态库容了。这个问题我这里先不深刻了,搞过数据库动态扩容的朋友应该都了解原理。无非就是一个全量数据迁移、增量数据迁移、数据校验、短暂中止写服务,切换路由配置信息这几个关键的步骤。我上面介绍的是最核心的数据迁移的思路。
结束语
本文介绍了我以前一直想作的一个基于文件版本的EventStore的关键设计思路,但愿经过这篇文章把本身的思路系统地整理出来。一方面经过写文章能够进一步确信本身的思路是否OK,由于若是你文章写不出来,其实思路必定是哪里有问题,写文章的过程就是大脑整理思绪的过程。因此,写文章也是检查本身设计的一种好方法。另外一方面,也能够经过本身的原创分享,但愿和你们交流,但愿你们能给我一些意见或建议。这样也许能够在我动手写代码前能及时纠正一些设计上的错误。最后再补充一点,语言不重要,重要的是架构设计、数据结构,以及算法。谁说C#语言作不出好东西呢?呵呵。