akka-typed(8) - The CQRS Read/Write Separation Pattern

In earlier posts we covered event sourcing (EventSource) and clustering (cluster); now it's time to discuss CQRS. CQRS is a read/write separation pattern: an independent writer program and reader program, whose principles were covered in a previous post. akka-typed supports CQRS natively, at least on the write side, as EventSourcedBehavior shows. The new EventSourcedBehavior actor greatly simplifies persistentActor development, but it also imposes some restrictions on the programmer. For example, changing state by hand becomes harder, and EventSourcedBehavior does not support nested persist: persisting some particular event and then doing further state processing (with another persist) inside that event's handler is no longer possible. Here is an example from a shopping-cart application: when payment completes, a snapshot must be taken. This is the snapshot code:

    snapshotWhen { (state, evt, seqNr) =>
      CommandHandler.takeSnapshot(state, evt, seqNr)
    }
    ...
    def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = {
      if (evt.isInstanceOf[Events.PaymentMade] ||
          evt.isInstanceOf[Events.VoidVoucher.type] ||
          evt.isInstanceOf[Events.SuspVoucher.type])
        if (state.items.isEmpty) {
          log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")
          true
        } else
          false
      else
        false
    }

Checking the event type is no problem, since it is exactly the current event. But the other condition is that the cart must already be empty, and that is awkward: the emptiness of the cart depends on the result of applying these events, i.e. the next step, yet determining that result requires computing the cart contents first. It looks like a circular dependency. In akka-classic, after inspecting the result of applying an event, we could persist another special event if the state needed changing, and do the state processing in that event's handler. Since EventSourcedBehavior does not support nested persist, there is no choice but to do it like this:

 

      case PaymentMade(acct, dpt, num, ref, amount) =>
        ...
        writerInternal.lastVoucher = Voucher(vchs, vItems)   // save the current state first
        endVoucher(Voucher(vchs, vItems), TXNTYPE.sales)     // run the settlement
        Voucher(vchs.nextVoucher, List())                    // return an empty cart as the new state
        ...

 

All I can do is save the current state first, run the settlement, and then clear the cart; after that the snapshot can proceed normally.
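The shape of this workaround is easiest to see if we model the typed event handler as what it is: a pure (State, Event) => State function, with no hook for persisting a follow-up event from inside it. Below is a minimal plain-Scala sketch of that idea; the Cart/ItemAdded names are hypothetical stand-ins for the real Voucher types, and akka itself is omitted to keep it self-contained:

```scala
// Plain-Scala model of a typed event handler: a pure (State, Event) => State
// function. There is no way to persist a second event mid-handler, so all
// follow-up work must happen in one pass.
case class Cart(items: List[String], settled: Boolean)

sealed trait Event
case class ItemAdded(sku: String) extends Event
case object PaymentMade extends Event

def applyEvent(state: Cart, evt: Event): Cart = evt match {
  case ItemAdded(sku) => state.copy(items = sku :: state.items)
  // Settlement and cart-clearing both happen here, in one step --
  // there is no second persist to defer the state change to.
  case PaymentMade    => Cart(items = Nil, settled = true)
}

val afterPay: Cart = List(ItemAdded("a"), ItemAdded("b"), PaymentMade)
  .foldLeft(Cart(Nil, settled = false))(applyEvent)
// afterPay now has an empty item list and settled = true
```

Because the handler completes in one pass, a snapshotWhen predicate evaluated afterwards already sees the emptied cart.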

Now for the read side: akka's read-side programming is done through PersistenceQuery. The reader's job is to read events back from the database and restore them into concrete data structures. Let's look at how the reader is invoked to understand its implementation in this application:

 

    val readerShard = writerInternal.optSharding.get
    val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
    readerRef ! Messages.PerformRead(pid.shopid, pid.posid,
      writerInternal.lastVoucher.header.num, writerInternal.lastVoucher.header.opr,
      bseq, eseq, txntype,
      writerInternal.expurl, writerInternal.expacct, writerInternal.exppass)

As you can see, the reader is a cluster sharding entity. The idea is that after each completed sale we send a message to an entity; once the entity has finished its reader work it terminates itself, immediately releasing the resources it occupied. The reader actor is defined as follows:

object POSReader extends LogSupport {
  val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader")

  def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = {
    log.stepOn = trace
    implicit var pid: PID = PID("", "")
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        Behaviors.withTimers { timer =>
          implicit val ec = ctx.executionContext
          Behaviors.receiveMessage {
            case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>
              pid = PID(shopid, posid)
              log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))
              val futReadSaveNExport = for {
                txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace,
                              nodeAddress, shopid, posid, txntype)
                _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum,
                       txntype == Events.TXNTYPE.suspend,
                       { if (txntype == Events.TXNTYPE.voidall)
                           txnitems.map(_.copy(txntype = Events.TXNTYPE.voidall))
                         else txnitems },
                       trace)(ctx.system.toClassic, pid)
              } yield ()
              ctx.pipeToSelf(futReadSaveNExport) {
                case Success(_) =>
                  timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
                  StopReader
                case Failure(err) =>
                  log.error(s"POSReader: Error: ${err.getMessage}")
                  timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
                  StopReader
              }
              Behaviors.same
            case StopReader =>
              Behaviors.same
            case ReaderFinish(shopid, posid, vchnum) =>
              Behaviors.stopped(() =>
                log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid))
              )
          }
        }
      }
    ).onFailure(SupervisorStrategy.restart)
  }
}

The reader is just an ordinary actor. Note that a read-side program may be large and complex and will certainly need to be split into multiple modules, so we can cut it up along the processing flow, where downstream modules may need the results produced by upstream ones. Remember: never block a thread inside an actor. Have every module return a Future, then chain them together with a for-comprehension. Above, we used ctx.pipeToSelf to send a ReaderFinish message back to ourselves once the Future completes, telling the actor to stop.
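The chaining pattern itself is plain Scala and worth isolating. Here is a self-contained sketch with hypothetical stage names, using only stdlib Futures (Await appears only for demonstration; inside an actor we would pipeToSelf instead of blocking):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Each stage returns a Future so no actor thread is ever blocked;
// the for-comprehension sequences them, and each stage sees the
// result of the previous one.
def readEvents(): Future[List[Int]]                = Future(List(1, 2, 3))
def buildItems(evts: List[Int]): Future[List[Int]] = Future(evts.map(_ * 10))
def writeToDB(items: List[Int]): Future[Unit]      = Future(())

val pipeline: Future[List[Int]] = for {
  evts  <- readEvents()
  items <- buildItems(evts)
  _     <- writeToDB(items)
} yield items

// Blocking here only to demonstrate the result outside an actor context.
val result: List[Int] = Await.result(pipeline, 3.seconds)
// result == List(10, 20, 30)
```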

In this example the reader's task is split into:

1. Read the events from the database

2. Replay the events once to rebuild the state data (the cart contents)

3. Write the resulting cart contents to the database as transaction line items

4. Post the transaction data to a user-supplied REST API

Event reading is done through the cassandra-persistence-plugin:

    val query = PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) // issue query to journal
    val source: Source[EventEnvelope, NotUsed] = query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq) // materialize stream, consuming events
    val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

This part is straightforward: define a PersistenceQuery, use it to produce a Source, then run the Source to obtain a Future[List[Any]].

Replaying the events to produce the transaction data:

    def buildVoucher(actions: List[Any]): List[TxnItem] = {
      log.step(s"POSReader: read actions: $actions")
      val (voidtxns, onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
      val listOfActions = onlytxns.reverse zip (LazyList from 1)   // zipWithIndex
      listOfActions.foreach {
        case (txn, idx) => txn.asInstanceOf[Action] match {
          case Voided(_) =>
          case ti @ _ =>
            curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action], vchState).copy(opr = cshr)
            if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
              curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr)
              log.step(s"POSReader: voided txnitem: $curTxnItem")
            }
            val vch = EventHandlers.updateState(ti.asInstanceOf[Action], vchState, vchItems, curTxnItem, true)
            vchState = vch.header
            vchItems = vch.txnItems
            log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
        }
      }
      log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
      vchItems.txnitems
    }

Replaying the List[Event] produces a List[TxnItem].
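The index-matching trick in buildVoucher (zip the non-void actions with a 1-based index, then mark the items whose index appears in a Voided action) can be sketched standalone. The Sale/Voided types below are simplified stand-ins for the real Action/TxnItem types:

```scala
// Simplified model: an action is either a sale or a Voided(seq) pointing
// at the 1-based index of the sale it cancels.
sealed trait Act
case class Sale(sku: String) extends Act
case class Voided(seq: Int) extends Act

// Returns each sold sku paired with a flag: true if that line was voided.
def replay(actions: List[Act]): List[(String, Boolean)] = {
  val (voids, sales) = actions.partition(_.isInstanceOf[Voided])
  val voidedSeqs = voids.collect { case Voided(s) => s }.toSet
  sales.zipWithIndex.map { case (Sale(sku), idx) =>
    (sku, voidedSeqs.contains(idx + 1))   // idx is 0-based, seq is 1-based
  }
}

val items = replay(List(Sale("a"), Sale("b"), Voided(2), Sale("c")))
// "b" (the 2nd sale) is marked voided; "a" and "c" are not
```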

Writing the List[TxnItem] to the database:

 

    def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(
        implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???

Note the result type Future[Seq[TxnItem]]. We use a for-comprehension to chain these steps together:

    val txnitems: Future[List[Events.TxnItem]] = for {
      lst1 <- readActions                                  // read list from Source
      lstTxns <- if (lst1.length < (endSeq - startSeq))    // if incomplete, read again
                   readActions
                 else FastFuture.successful(lst1)
      items <- FastFuture.successful(buildVoucher(lstTxns))
      _ <- JournalTxns.writeTxnsToDB(vchnum, txntype, startSeq, endSeq, items)
      _ <- session.close(ec)
    } yield items


Note: this for-comprehension returns Future[List[TxnItem]], which feeds the REST API export step. There, the List[TxnItem] is converted to JSON and embedded in the POST payload.
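The JSON step itself is not shown here. As a rough, dependency-free illustration of the payload shape only (field names are hypothetical; the real code would use a JSON library rather than hand-rolled strings):

```scala
// Hypothetical, minimal TxnItem stand-in -- the real type has more fields.
case class TxnItem(code: String, qty: Int, amount: BigDecimal)

// Hand-rolled JSON for illustration only: serialize a List[TxnItem]
// into the array that becomes the POST body.
def toJson(items: List[TxnItem]): String =
  items.map { it =>
    s"""{"code":"${it.code}","qty":${it.qty},"amount":${it.amount}}"""
  }.mkString("[", ",", "]")

val payload = toJson(List(TxnItem("A01", 2, BigDecimal("3.50"))))
// payload: [{"code":"A01","qty":2,"amount":3.50}]
```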

Now that every subtask returns a Future, we can use another for-comprehension to chain them all together:

    val futReadSaveNExport = for {
      txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace,
                    nodeAddress, shopid, posid, txntype)
      _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum,
             txntype == Events.TXNTYPE.suspend,
             { if (txntype == Events.TXNTYPE.voidall)
                 txnitems.map(_.copy(txntype = Events.TXNTYPE.voidall))
               else txnitems },
             trace)(ctx.system.toClassic, pid)
    } yield ()

Speaking of EventSourcedBehavior: since we use the cassandra-plugin, it is worth mentioning that the configuration differs considerably between the old and new versions. The application.conf now looks like this:

akka {
  loglevel = INFO
  actor {
    provider = cluster
    serialization-bindings {
      "com.datatech.pos.cloud.CborSerializable" = jackson-cbor
    }
  }
  remote {
    artery {
      canonical.hostname = "192.168.11.189"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = ["akka://cloud-pos-server@192.168.11.189:2551"]
    sharding {
      passivate-idle-entity-after = 5 m
    }
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }
}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc2g"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc2g_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}
datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

The akka.persistence.cassandra section lets you set the keyspace names, so the old and new versions of the application can share one Cassandra instance and stay online at the same time.
