Message Stash in Akka

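    Akka is a toolkit for building highly concurrent, distributed, and fault-tolerant applications on the JVM. Based on the Actor model, it implements an m:n threading model (m > n, where m is the number of actor instances and n is the number of threads). In an Akka program every actor instance plays a role or implements one function. Each actor has its own mailbox and consumes messages from it, and actors communicate by sending messages to each other's mailboxes. A common situation is an actor that has to switch between two states, A and B, and in each state handles its own message type: Message-A in state A and Message-B in state B. The difficulty is that while the actor is in state A it can still receive Message-B instances that are meant to be handled in state B, and vice versa.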
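    The solution is to temporarily set aside (stash) the messages that cannot be handled right now and, when switching to the matching state, put all stashed messages back at the front of the mailbox. The released messages are still processed in the order in which they were originally received.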
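    To make the stash-and-release idea concrete without pulling in Akka, here is a minimal pure-Scala toy model. It is an illustration under stated assumptions, not Akka's implementation: the names `StashToy`, `MsgA`, `MsgB` and `run` are hypothetical, the mailbox is a `java.util.ArrayDeque`, and the "actor" is a plain loop that handles one message at a time. In state A it only accepts `MsgA`, in state B only `MsgB`; anything else is stashed, and every state switch puts the stash back at the front of the mailbox.

```scala
// Hypothetical toy model, not Akka: one loop plays the actor, a deque plays the mailbox.
object StashToy {
  sealed trait Msg
  final case class MsgA(n: Int) extends Msg
  final case class MsgB(n: Int) extends Msg

  /** Returns the messages in the order they were actually handled. */
  def run(input: Seq[Msg]): List[Msg] = {
    val mailbox = new java.util.ArrayDeque[Msg]()
    input.foreach(m => mailbox.addLast(m))
    var stash = Vector.empty[Msg] // messages that arrived in the "wrong" state
    var inStateA = true
    val handled = List.newBuilder[Msg]

    // Release the stash: newest first to the head, so the oldest ends up first.
    def unstashAll(): Unit = {
      stash.reverseIterator.foreach(m => mailbox.addFirst(m))
      stash = Vector.empty
    }

    while (!mailbox.isEmpty) {
      val msg = mailbox.pollFirst()
      msg match {
        case a: MsgA if inStateA  => handled += a; inStateA = false; unstashAll() // A -> B
        case b: MsgB if !inStateA => handled += b; inStateA = true; unstashAll()  // B -> A
        case other                => stash = stash :+ other // cannot handle it yet
      }
    }
    handled.result()
  }
}
```

    For example, with the input `MsgB(1), MsgB(2), MsgA(1), MsgA(2)` the toy handles `MsgA(1), MsgB(1), MsgA(2), MsgB(2)`: `MsgB(1)` is still handled before `MsgB(2)` even though both spent time in the stash.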

A simple application

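    Implement an Akka cluster monitoring feature (AkkaClusterHealthActor): wait for an HTTP request; when handling it, ask every node in the cluster for its health status, wait until all nodes have replied, aggregate the results, complete the HTTP request, and finally return to the initial state of waiting for an HTTP request.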

Code walkthrough

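    The problem breaks down roughly into two states: waiting for an HTTP request, and waiting for the health status of the other nodes. While the actor is waiting for the nodes' health status it can still receive HTTP requests; these have to be stashed and released later for further processing.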

object SomeApp {
  ..... // initialize the ActorSystem, ExecutorService ....
  val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") 
  ...
  def main(args: Array[String]): Unit = {
    ...
    val router = 
      path("healthState"){ //akka-http dsl
        get{
          imperativelyComplete {
            httpCtx =>
              clusterhealthActor ! httpCtx // send the HTTP request context to the actor
          }
        }
      }
    ...
  }
}
import akka.actor.{Actor, ReceiveTimeout, Stash}
import akka.cluster.Cluster
import scala.concurrent.duration._
// ImperativeRequestContext and StateResult are application-defined types (not shown)

class AkkaClusterHealthActor extends Actor with Stash {

  val cluster = Cluster(context.system) // cluster state: member list, addresses, statuses, etc.

  // start in the "waiting for an HTTP request" state
  override def receive: Receive = waitingForHttpRequest

  def waitingForHttpRequest: Receive = {
    case httpCtx: ImperativeRequestContext => // an HTTP request arrived
      val members = cluster.state.members.toList // includes this node itself
      members.foreach { member =>
        // ask each node for its health state; member.address looks like akka.tcp://itoa@127.0.0.1:2551
        context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState"
      }
      // become() with parameters works as a small FSM and avoids mutable state;
      // members.length is the number of replies to wait for (some may time out)
      context.become(waitingForAkkaNode(httpCtx, members.length, Nil))
      context.setReceiveTimeout(3.seconds)
  }

  // memberNum: number of replies still expected; results: replies received so far
  def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = {
    case nodeStateResult: StateResult =>
      (memberNum - 1) match {
        case 0 => // every node replied: cancel the timeout and complete with all results
          context.setReceiveTimeout(Duration.Undefined)
          ctx.complete(nodeStateResult :: results)
          context.become(waitingForHttpRequest)
          unstashAll() // this request is done; handle the other requests
        case _ => // some nodes have not replied yet: keep waiting
          context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) // similar in spirit to tail recursion
      }
    case ReceiveTimeout =>
      context.setReceiveTimeout(Duration.Undefined) // cancel the timeout
      ctx.complete(results) // complete the HTTP request with the partial results
      context.become(waitingForHttpRequest)
      unstashAll() // release the stashed HTTP requests and handle them next
    case _ =>
      stash() // busy with one request but another HTTP request arrived: stash it
  }
}

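    The code above roughly implements the required logic: the actor switches back and forth between the two states waitingForHttpRequest and waitingForAkkaNode. Next, let's look at the Akka source code behind this feature.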
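    The two behaviors can also be exercised without a cluster or an HTTP server. The sketch below is a hypothetical pure-Scala model: the names `HealthFsm`, `HttpRequest`, `StateResult`, `Timeout` and `run` are made up for illustration, and events are fed from a fixed list instead of arriving over the network. As in the actor, the waiting state carries its counter and accumulated results as parameters rather than mutable fields, a second HTTP request is stashed while one is in flight, and both the last reply and the timeout complete the request and release the stash.

```scala
// Hypothetical model of AkkaClusterHealthActor's two behaviors (no Akka, no HTTP).
object HealthFsm {
  sealed trait Event
  final case class HttpRequest(id: Int) extends Event
  final case class StateResult(node: String) extends Event
  case object Timeout extends Event

  sealed trait State
  case object WaitingForHttpRequest extends State
  // Like waitingForAkkaNode(ctx, memberNum, results): the state carries its data.
  final case class WaitingForAkkaNode(req: Int, remaining: Int, results: List[StateResult]) extends State

  /** Returns (request id, node results) for each completed request, in completion order. */
  def run(events: Seq[Event], memberNum: Int): List[(Int, List[StateResult])] = {
    val mailbox = new java.util.ArrayDeque[Event]()
    events.foreach(e => mailbox.addLast(e))
    var state: State = WaitingForHttpRequest
    var stash = Vector.empty[Event]
    val completed = List.newBuilder[(Int, List[StateResult])]

    // Models "complete the request, become(waitingForHttpRequest), unstashAll()".
    def finish(req: Int, results: List[StateResult]): Unit = {
      completed += ((req, results.reverse)) // results were accumulated newest-first
      state = WaitingForHttpRequest
      stash.reverseIterator.foreach(e => mailbox.addFirst(e))
      stash = Vector.empty
    }

    while (!mailbox.isEmpty) {
      (state, mailbox.pollFirst()) match {
        case (WaitingForHttpRequest, HttpRequest(id)) =>
          state = WaitingForAkkaNode(id, memberNum, Nil) // the real actor sends "FetchState" here
        case (WaitingForHttpRequest, _) =>
          () // late replies or stray timeouts are ignored in this model
        case (WaitingForAkkaNode(req, remaining, results), r: StateResult) =>
          if (remaining - 1 == 0) finish(req, r :: results) // last reply: include it
          else state = WaitingForAkkaNode(req, remaining - 1, r :: results)
        case (WaitingForAkkaNode(req, _, results), Timeout) =>
          finish(req, results) // partial results
        case (_: WaitingForAkkaNode, other) =>
          stash = stash :+ other // another HttpRequest while one is in flight
      }
    }
    completed.result()
  }
}
```

    With `memberNum = 2` and the events `HttpRequest(1), HttpRequest(2), StateResult("n1"), StateResult("n2"), StateResult("n3"), Timeout`, request 1 completes with both replies once `n2` arrives, and only then is request 2 unstashed; it receives `n3` and is completed with that partial result when the timeout fires.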

...
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]

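    All stashed messages are released back into the mailbox before a restart (not after it). Likewise, they are released when the actor stops (in postStop), so that they end up in dead letters instead of being silently lost.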

...
trait UnrestrictedStash extends Actor with StashSupport {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try unstashAll() finally super.preRestart(reason, message)
  }
  override def postStop(): Unit = try unstashAll() finally super.postStop()
}
private[akka] trait StashSupport {

  private[akka] def context: ActorContext

  private[akka] def self: ActorRef

  //Backed by `scala.collection.immutable.Vector`, which still performs well when many messages are stored
  private var theStash = Vector.empty[Envelope]

  // ActorCell is the concrete implementation of ActorContext (ActorCell extends ActorContext), so this cast is safe
  private def actorCell = context.asInstanceOf[ActorCell] 
  
  //An upper bound on the number of stashed messages can be set with the `stash-capacity` setting; the default is -1, i.e. unbounded
  private val capacity: Int =
    context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox)

  private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
    actorCell.mailbox.messageQueue match {
      case queue: DequeBasedMessageQueueSemantics ⇒ queue
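      case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}") // Stash needs a deque-based mailbox (message abbreviated)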
    }
  }
  
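  //1. Take the current message; the same message must not be stashed twice (stash() must not be called twice for one message; `eq` compares object references, i.e. memory addresses)
  //2. capacity defaults to -1, so the stash is unbounded
  //3. The message is appended to the end (tail) of `theStash`, so the stash keeps arrival order; only the most recently stashed message (`theStash.last`) is checked for duplicates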
  def stash(): Unit = {
    val currMsg = actorCell.currentMessage
    if (theStash.nonEmpty && (currMsg eq theStash.last))
      throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
    if (capacity <= 0 || theStash.size < capacity) 
      theStash = theStash :+ currMsg 
    else throw new StashOverflowException(s"Couldn't enqueue message ${currMsg.message.getClass.getName} to stash of $self")
  }

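   //Efficient when `others` is small and `theStash` is large
   //Puts all of `others` in front of `theStash`, keeping their relative order (result: others ++ theStash); foldRight visits `others` from last to first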
  private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
    theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
    ...
  
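  //`reverseIterator` walks `theStash` from the newest to the oldest envelope and puts each one at the head of the mailbox, so the oldest stashed message ends up first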
  def unstashAll(): Unit = {
    try {
      val i = theStash.reverseIterator 
      while (i.hasNext) enqueueFirst(i.next())
    } finally {
      theStash = Vector.empty[Envelope]
    }
  }
  ...
  
  //Put this envelope at the very front of the mailbox
  private def enqueueFirst(envelope: Envelope): Unit = {
    mailbox.enqueueFirst(self, envelope)
    envelope.message match {
      case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref)
      case _               ⇒
    }
  }
}

    The implementation is based on scala.collection.immutable.Vector, which still performs well when a large number of messages is stashed.
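    The same message cannot be stashed twice; doing so throws an IllegalStateException. The check compares the current envelope by reference (eq) with the most recently stashed one.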
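    The collection operations `theStash = theStash :+ currMsg`, `theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)` and `val i = theStash.reverseIterator` together guarantee that released messages are still processed in the order they were received, first come, first served: `:+` appends to the tail so the stash keeps arrival order, `foldRight` with `+:` puts `others` ahead of the stash without reordering them, and walking the stash in reverse while calling `enqueueFirst` leaves the oldest message at the head of the mailbox.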
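    These three operations can be checked in isolation. The following is a simplified, hypothetical re-implementation of the bookkeeping in `StashSupport`, not the Akka class: `Envelope` is a stand-in case class, the current message is passed to `stash` explicitly instead of being read from `actorCell.currentMessage`, `StashOverflow` stands in for Akka's `StashOverflowException`, and the mailbox is a `java.util.Deque` whose `addFirst` plays the role of `enqueueFirst`.

```scala
// Hypothetical, simplified model of StashSupport's bookkeeping; not the Akka class.
object StashMechanics {
  // Stand-in for akka.dispatch.Envelope
  final case class Envelope(message: Any)

  // Stand-in for akka.actor.StashOverflowException
  final class StashOverflow(msg: String) extends RuntimeException(msg)

  final class MiniStash(capacity: Int, mailbox: java.util.Deque[Envelope]) {
    private var theStash = Vector.empty[Envelope]

    def contents: Vector[Envelope] = theStash

    // Append at the tail, so the stash keeps arrival order.
    def stash(currMsg: Envelope): Unit = {
      // Only the most recently stashed envelope is compared, by reference.
      if (theStash.nonEmpty && (currMsg eq theStash.last))
        throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
      if (capacity <= 0 || theStash.size < capacity) theStash = theStash :+ currMsg
      else throw new StashOverflow(s"stash is full (capacity $capacity)")
    }

    // Put `others` in front of the stash: foldRight visits them last-to-first.
    def prepend(others: Seq[Envelope]): Unit =
      theStash = others.foldRight(theStash)((envelope, s) => envelope +: s)

    // Newest-to-oldest, each pushed to the mailbox head: the oldest ends up first.
    def unstashAll(): Unit =
      try {
        val i = theStash.reverseIterator
        while (i.hasNext) mailbox.addFirst(i.next())
      } finally {
        theStash = Vector.empty[Envelope]
      }
  }
}
```

    Stashing `m1`, `m2`, `m3`, prepending `p1`, `p2` and then calling `unstashAll()` on a mailbox that already holds `x` leaves the mailbox as `p1, p2, m1, m2, m3, x`: the released messages come first, in arrival order, ahead of anything that was still waiting.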
