在某项目里,有个 actor 须要作一些持久化的操做,这些操做耗时比较久,理应使用异步的代码来写,可是需求又强调每次只能作一个持久化操做,后来的请求应该等待。一个显然的作法是阻塞式的写,这样就能比较简单的实现顺序花操做。api
代码写完之后,我以为在 actor 中 block 不够完美,就想其余的解决方案。实际上,借助 akka actor 的一些函数,能够实如今不阻塞的状况下实现顺序执行请求的功能的。这种办法的核心是使用 become, unbecome 函数:actor 设置两种状态 free 和 busy,当 free 的时,处理消息,当 busy 时,暂时将消息存储起来,处理消息后,给 actor 返回 done 指令,actor 的状态从新返回到 free,准备处理下一个请求。具体的实现又有不少细节能够考虑,好比当 busy 时到来的请求存储到哪里,是 stash 起来仍是在 actor 内部维护一个 queue。请求的处理逻辑是写在 actor 内部,仍是借鉴 cameo 模式,再建立一个 actor。异步
网上已有一种实现,我看了下,以为应该没有问题。只不过 actor 内部维护了一个 queue,这可能会形成 actor 死亡后重启数据丢失的状况。更好的办法应该是 cameo 模式建立新的 actor 来处理可能出现异常(危险)的工做,其次是把 actor 的 mailbox 当作那个 queue,不要本身维护,按照 doc 缩写,actor 重启后,mailbox 的消息不会丢失。函数
package actors import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import akka.actor.{ Actor, ActorRef } import play.api.libs.concurrent.Akka import play.api.Logger import play.api.Play.current trait SequentialActor { this: Actor => import SequentialActor._ // Actor defines type Receive as PartialFunction[Any, Unit] type ReceiveAsync = PartialFunction[Any, Future[_]] private val queue = scala.collection.mutable.Queue[Job]() private def enqueue(job: Job): Unit = queue enqueue job private def dequeue: Option[Job] = if (queue.isEmpty) None else Some(queue.dequeue) private var _senderAsync: ActorRef = _ def senderAsync = _senderAsync def receive: Receive = { case msg => context become busy process(Job(msg, sender)) } def busy: Receive = { case Done => dequeue match { case None => context.unbecome case Some(job) => process(job) } case msg => enqueue(Job(msg, sender)) } def process(job: Job) { _senderAsync = job.sender (receiveAsync orElse fallback)(job.msg).onComplete { _ => self ! Done } } def receiveAsync: ReceiveAsync def fallback: ReceiveAsync = { case msg => Logger.error(s"Unhandled message: $msg") Future.successful{ () } } } object SequentialActor { case object Done case class Job(msg: Any, sender: ActorRef) }