前面咱们全面介绍了在akka-cluster环境下实现的CQRS写端write-side。简单来讲就是把发生事件描述做为对象严格按发生时间顺序写入数据库。这些事件对象通常是按照二进制binary方式如blob存入数据库的。cassandra-plugin的表结构以下:数据库
CREATE KEYSPACE IF NOT EXISTS akka WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 }; CREATE TABLE IF NOT EXISTS akka.messages ( used boolean static, persistence_id text, partition_nr bigint, sequence_nr bigint, timestamp timeuuid, timebucket text, writer_uuid text, ser_id int, ser_manifest text, event_manifest text, event blob, meta_ser_id int, meta_ser_manifest text, meta blob, message blob, tags set<text>, PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket)) WITH gc_grace_seconds =864000 AND compaction = { 'class' : 'SizeTieredCompactionStrategy', 'enabled' : true, 'tombstone_compaction_interval' : 86400, 'tombstone_threshold' : 0.2, 'unchecked_tombstone_compaction' : false, 'bucket_high' : 1.5, 'bucket_low' : 0.5, 'max_threshold' : 32, 'min_threshold' : 4, 'min_sstable_size' : 50 };
事件对象是存放在event里的,是个blob类型字段。下面是个典型的写动做示范: 安全
val receiveCommand: Receive = { case Cmd(data) => persist(ActionGo) { event => updateState(event) } }
这些事件描述的存写即写这个ActionGo时不会影响到实际业务数据状态。真正发生做用,改变当前业务数据状态的是在读端read-side。也就是说在另外一个线程里有个程序也按时间顺序把这些二进制格式的对象读出来、恢复成某种结构如ActionGo类型、而后按照结构内的操做指令对业务数据进行实际操做处理,这时才会产生对业务数据的影响。作个假设:若是这些事件不会依赖时间顺序的话是否是能够偷偷懒直接用一种pub/sub模式把reader放在订阅subscriber端,以下: app
//写端 import DistributedPubSubMediator.Publish val mediator = DistributedPubSub(context.system).mediator val receiveCommand: Receive = { case Cmd(data) => persist(DataUpdated) { event => updateState(event) mediator ! Publish(persistentId, event,sendOneMessageToEachGroup = true) } } //读端 val mediator = DistributedPubSub(context.system).mediator mediator ! Subscribe(persistentId, self) def receive = { case DataUpdated: Event ⇒ updateDataTables() }
这种pub/sub模式的特色是消息收发双方耦合度很是松散,但同时也存在订阅方sub即reader十分难以控制的问题,并且能够确定的是订阅到达消息没法保证是按发出时间顺序接收的,咱们没法控制akka传递消息的过程。由于业务逻辑中一个动做的发生时间顺序每每会对周围业务数据产生不一样的影响,因此如今只能考虑事件源event-sourcing这种模式了。es方式的CQRS是经过数据库表做为读写间隔实现写端程序和读端程序的分离。写端只管往数据库写数据操做指令,读端从同一数据库位置读出指令进行实质的数据处理操做,因此读写过程当中会产生必定的延迟,读端须要不断从数据库抽取pull事件。而具体pull的时段间隔如何设定也是一个比较棘手的问题。不管如何,akka提供了Persistence-Query做为一种CQRS读端工具。咱们先从一个简单的cassandra-persistence-query用例开始: 负载均衡
// obtain read journal by plugin id val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) // issue query to journal val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue) // materialize stream, consuming events implicit val mat = ActorMaterializer() source.runForeach { pack => updateDatabase(pack.event) }
eventsByPersistenceId(...)构建了一个akka-stream的Source[EventEnvelope,_]。这个EventEnvelope类定义以下: ide
/** * Event wrapper adding meta data for the events in the result stream of * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries. */ final case class EventEnvelope( offset: Offset, persistenceId: String, sequenceNr: Long, event: Any)
上面这个event字段就是从数据库读取的事件对象。EventEnvelope是以流元素的形式从数据库中提出。eventsByPersistenceId(...)启动了一个数据流,而后akka-persistence-query会按refresh-interval时间间隔重复运算这个流stream。refresh-interval能够在配置文件中设置,以下面的cassandra-plugin配置: 函数
cassandra-query-journal { # Implementation class of the Cassandra ReadJournalProvider class = "akka.persistence.cassandra.query.CassandraReadJournalProvider" # Absolute path to the write journal plugin configuration section write-plugin = "cassandra-journal" # New events are retrieved (polled) with this interval. refresh-interval = 3s ... }
以上描述的是一种接近实时的读取模式。通常来说,为了实现高效、安全的事件存写,咱们会尽可能简化事件结构,这样就会高几率出现一个业务操做单位须要多个事件来描述,那么若是在完成一项业务操做单元的全部事件存写后才开始读端的动做不就简单多了吗?并且还比较容易控制。虽然这样会形成某种延迟,但若是以业务操做为衡量单位,这种延迟应该是很正常的,能够接受的。如今每当完成一项业务的全部事件存写后在读端一次性成批把事件读出来而后进行实质的数据操做,应当可取。akka-persistence-query提供了下面这个函数: 工具
/** * Same type of query as `eventsByPersistenceId` but the event stream * is completed immediately when it reaches the end of the "result set". Events that are * stored after the query is completed are not included in the event stream. */ override def currentEventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ...
咱们能够run这个stream把数据读入一个集合里,而后能够在任何一个线程里用这个集合演算业务逻辑(如咱们前面提到的写端状态转变维护过程),能够用router/routee模式来实现一种在集群节点中负载均衡式的分配reader-actor做业节点。post
下一篇准备对应前面的CQRS Writer Actor 示范里的akka-cluster-pos进行rCQRS-Reader-Actor示范。ui