Akka是JVM平台上构建高并发、分布式以及高度容错应用的工具包,其基于Actor模型实现了m:n的线程模式(m大于n,m是actor实例的个数,n是线程数量)。akka程序中每一个actor实例都扮演一种角色或者实现某一个功能,每个actor实例都有对应的消息邮箱,actor从本身的邮箱中消费消息,actor之间经过向对方的邮箱发送消息互相交流。经常遇到这样一种情形:某个actor须要在A和B两种状态下切换,以及分别在每种状态下接收各自对应的信息Messsage-A
类型和Messsage-B
类型的消息实例。因为当处于A状态时,仍然会收到本该处于B状态时处理的Messsage-B
类型的消息实例;当处于B状态时,亦是如此。
解决思路是把当前没法处理的消息暂存(stash
),在切换对应的状态前把全部的暂存的消息添加到消息队列最前方,这些被释放出来的消息仍然按照它们被接收到顺序依次处理。node
实现一个akka集群监控功能(AkkaClusterHealthActor
),等待http请求,在响应请求时向集群中全部其余节点请求健康状态,等待全部节点所有返回健康信息,汇总信息后,再完成http请求,最后回到最初状态:等待http请求。并发
以上问题能够大体分为两种状态:等待http请求、等待其余节点的健康状态,可是在等待其余节点健康状态时,仍会收到http请求,须要暂存http请求,稍后释放作进一步处理。tcp
object SomeApp { ..... //程序初始化 ActorSystem、ExecutorService .... val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") ... def main(args: Array[String]): Unit = { ... val router = path("healthState"){ //akka-http dsl get{ imperativelyComplete { httpCtx => clusterhealthActor ! httpCtx //向该actor发送http请求实例 } } } ... } }
class AkkaClusterHealthActor extends Actor with Stash { val cluster = Cluster(context.system) //获取集群状态信息,包括节点的列表、地址、状态等等 override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化时 override def receive: Receive = { case _ => } def waitingForHttpRequest: Receive = { case httpCtx: ImperativeRequestContext => //等待http请求 val members = cluster.state.members.toList //包含自身节点 members.foreach { member => context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //请求集群的节点健康状态 ,member.address akka.tcp://itoa@127.0.0.1:2551 } context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部变量的产生,members.length 须要等待几个节点的返回,有可能超时 context.setReceiveTimeout(3 seconds) } def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集群个数 用于接收到的信息个数 case nodeStateResult: StateResult => (memberNum - 1) match { case 0 => //完成任务 取消超时设置 context.setReceiveTimeout(Duration.Inf) ctx.complete(nodeStateResult) context.become(waitingForHttpRequest) unstashAll() //完成该请求,处理其余请求 case _ => //仍有部分节点没有返回健康状态结果,继续等待 context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //这里的代码有点相似于 `尾递归` } case ReceiveTimeout => context.setReceiveTimeout(Duration.Inf) //完成任务 取消超时设置 ctx.complete(results) //返回部分结果,完成http请求 context.become(waitingForHttpRequest) unstashAll() //完成该次请求,释放暂存的http请求,处理其余请求 case _ => stash() //处理某个请求中,可是收到了其余http请求,暂存消息 } }
以上代码大体实现了具体逻辑,actor在waitingForHttpRequest
和waitingForAkkaNode
两种状态之间来回切换,让咱们来看看akka中关于该功能的源码。分布式
... trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]
重启以前(并非重启以后)应当释放全部暂存的消息,一样的,并且须要在关闭以后释放暂存的消息。ide
... trait UnrestrictedStash extends Actor with StashSupport { override def preRestart(reason: Throwable, message: Option[Any]): Unit = { try unstashAll() finally super.preRestart(reason, message) } override def postStop(): Unit = try unstashAll() finally super.postStop() }
private[akka] trait StashSupport { private[akka] def context: ActorContext private[akka] def self: ActorRef //基于`scala.collection.immutable.Vector`,当存储的消息数量很大时,也能够得到很好的性能 private var theStash = Vector.empty[Envelope] // ActorContext是ActorCell的子类 private def actorCell = context.asInstanceOf[ActorCell] //可为暂存的消息设置数量限制,经过`stash-capacity`配置,默认为-1,即容量不限 private val capacity: Int = context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox) private[akka] val mailbox: DequeBasedMessageQueueSemantics = { actorCell.mailbox.messageQueue match { case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException } } //1. 获取当前的消息,不能将同一个消息存两次(不能调用两次`stash()`函数,`eq`用于比较对象之间的内存地址) //2. capacity默认-1,存储容量不限 //3. 暂存的消息被添加到`theStash`集合的头部(遍历或者迭代时,从尾向头遍历) def stash(): Unit = { val currMsg = actorCell.currentMessage if (theStash.nonEmpty && (currMsg eq theStash.last)) throw new IllegalStateException(s"Can't stash the same message $currMsg more than once") if (capacity <= 0 || theStash.size < capacity) theStash = theStash :+ currMsg else throw new StashOverflowException } //当`others`很小时,而`theStash`很大,该方法很高效 //从头向尾依次将`others`中元素添加到`theStash`尾 private[akka] def prepend(others: immutable.Seq[Envelope]): Unit = theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s) ... //`reverseIterator`反序输出`theStash`中的每一封邮件,依次添加到邮箱的头部 def unstashAll(): Unit = { try { val i = theStash.reverseIterator while (i.hasNext) enqueueFirst(i.next()) } finally { theStash = Vector.empty[Envelope] } } ... //把该封信放在邮箱的第一个位置 private def enqueueFirst(envelope: Envelope): Unit = { mailbox.enqueueFirst(self, envelope) envelope.message match { case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref) case _ ⇒ } } }
基于scala.collection.immutable.Vector
的实现,在存储数量很大时,仍然能够得到很好的性能。
同一个消息不能被暂存两次,不然程序抛出IllegalStateException
两次。
theStash = theStash :+ currMsg
、theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
和val i = theStash.reverseIterator
这几句关于集合操做与迭代的代码确保了:这些被释放出来的消息依然按照它们被接收到顺序依次处理,先来后到
规则要遵照。函数