Akka系列(三):监管与容错

Akka做为一种成熟的生产环境并发解决方案,必须拥有一套完善的错误异常处理机制,本文主要讲讲Akka中的监管和容错。html

监管

看过我上篇文章的同窗应该对Actor系统的工做流程有了必定的了解Akka系列(二):Akka中的Actor系统,它的很重要的概念就是分而治之,既然咱们把任务分配给Actor去执行,那么咱们必须去监管相应的Actor,当Actor出现了失败,好比系统环境错误,各类异常,能根据咱们制定的相应监管策略进行错误恢复,就是后面咱们会说到的容错。git

监管者

既然有监管这一事件,那必然存在着监管者这么一个角色,那么在ActorSystem中是如何肯定这种角色的呢?github

咱们先来看下ActorSystem中的顶级监管者:并发

图片描述

一个actor系统在其建立过程当中至少要启动三个actor,如上图所示,下面来讲说这三个Actor的功能:app

1./: 根监管者

顾名思义,它是一个老大,它监管着ActorSystem中全部的顶级Actor,顶级Actor有如下几种:ide

  • /user: 是全部由用户建立的顶级actor的监管者;用ActorSystem.actorOf建立的actor在其下。测试

  • /system: 是全部由系统建立的顶级actor的监管者,如日志监听器,或由配置指定在actor系统启动时自动部署的actor。this

  • /deadLetters: 是死信actor,全部发往已经终止或不存在的actor的消息会被重定向到这里。spa

  • /temp:是全部系统建立的短时actor的监管者,例如那些在ActorRef.ask的实现中用到的actor。scala

  • /remote: 是一我的造虚拟路径,用来存放全部其监管者是远程actor引用的actor。

跟咱们日常打交道最多的就是/user,它是咱们在程序中用ActorSystem.actorOf建立的actor的监管者,下面的容错咱们重点关心的就是它下面的失败处理,其余几种顶级Actor具体功能定义已经给出,有兴趣的也能够去了解一下。

根监管者监管着全部顶级Actor,对它们的各类失败状况进行处理,通常来讲若是错误要上升到根监管者,整个系统就会中止。

2./user: 顶级actor监管者

上面已经讲过/user是全部由用户建立的顶级actor的监管者,即用ActorSystem.actorOf建立的actor,咱们能够本身制定相应的监管策略,但因为它是actor系统启动时就产生的,因此咱们须要在相应的配置文件里配置,具体的配置能够参考这里Akka配置

3./system: 系统监管者

/system全部由系统建立的顶级actor的监管者,好比Akka中的日志监听器,由于在Akka中日志自己也是用Actor实现的,/system的监管策略以下:对收到的除ActorInitializationExceptionActorKilledException以外的全部Exception无限地执行重启,固然这也会终止其全部子actor。全部其余Throwable被上升到根监管者,而后整个actor系统将会关闭。

用户建立的普通actor的监管:

上一篇文章介绍了Actor系统的组织结构,它是一种树形结构,其实这种结构对actor的监管是很是有利的,Akka实现的是一种叫“父监管”的形式,每个被建立的actor都由其父亲所监管,这种限制使得actor的监管结构隐式符合其树形结构,因此咱们能够得出一个结论:

一个被建立的Actor确定是一个被监管者,也多是一个监管者,它监管着它的子级Actor

监管策略

上面咱们对ActorSystem中的监管角色有了必定的了解,那么究竟是如何制定相应的监管策略呢?Akka中有如下4种策略:

  • 恢复下属,保持下属当前积累的内部状态

  • 重启下属,清除下属的内部状态

  • 永久地中止下属

  • 升级失败(沿监管树向上传递失败),由此失败本身

这其实很好理解,下面是一个简单例子:

override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume  //恢复
      case _: NullPointerException => Restart //重启
      case _: IllegalArgumentException => Stop //中止
      case _: Exception => Escalate  //向上级传递
    }

咱们能够根据异常的不一样使用不一样监管策略,在后面我会具体给出一个示例程序帮助你们理解。咱们在实现本身的策略时,须要复写Actor中的supervisorStrategy,由于Actor的默认监管策略以下:

final val defaultDecider: Decider = {
    case _: ActorInitializationException ⇒ Stop
    case _: ActorKilledException         ⇒ Stop
    case _: DeathPactException           ⇒ Stop
    case _: Exception                    ⇒ Restart
  }

它对除了它指定的异常进行中止,其余异常都是对下属进行重启。

Akka中有两种类型的监管策略:OneForOneStrategyAllForOneStrategy,它们的主要区别在于:

  • OneForOneStrategy: 该策略只会应用到发生故障的子actor上。

  • AllForOneStrategy: 该策略会应用到全部的子actor上。

咱们通常都使用OneForOneStrategy来进行制定相关监管策略,固然你也能够根据具体需求选择合适的策略。另外咱们能够给咱们的策略配置相应参数,好比上面maxNrOfRetries,withinTimeRange等,这里的含义是每分钟最多进行10次重启,若超出这个界限相应的Actor将会被中止,固然你也可使用策略的默认配置,具体配置信息能够参考源码。

监管容错示例

本示例主要演示Actor在发生错误时,它的监管者会根据相应的监管策略进行不一样的处理。源码连接

由于这个例子比较简单,这里我直接贴上相应代码,后面根据具体的测试用例来解释各类监管策略所进行的响应:

class Supervisor extends Actor {
  //监管下属,根据下属抛出的异常进行相应的处理
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume
      case _: NullPointerException => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception => Escalate
    }
  var childIndex = 0 //用于标示下属Actor的序号

  def receive = {
    case p: Props =>
      childIndex += 1
      //返回一个Child Actor的引用,因此Supervisor Actor是Child Actor的监管者
      sender() ! context.actorOf(p,s"child${childIndex}")
  }
}

class Child extends Actor {
  val log = Logging(context.system, this)
  var state = 0
  def receive = {
    case ex: Exception => throw ex //抛出相应的异常
    case x: Int => state = x //改变自身状态
    case s: Command if s.content == "get" =>
      log.info(s"the ${s.self} state is ${state}")
      sender() ! state //返回自身状态
  }
}

case class Command(  //相应命令
    content: String,
    self: String
)

如今咱们来看看具体的测试用例:
首先咱们先构建一个测试环境:

class GuardianSpec(_system: ActorSystem)
    extends TestKit(_system)
    with WordSpecLike
    with Matchers
    with ImplicitSender {

  def this() = this(ActorSystem("GuardianSpec"))

  "A supervisor" must {

    "apply the chosen strategy for its child" in {
        code here...
        val supervisor = system.actorOf(Props[Supervisor], "supervisor") //建立一个监管者
        supervisor ! Props[Child]
        val child = expectMsgType[ActorRef] // 从 TestKit 的 testActor 中获取回应
    } 
  }
}

1.TestOne:正常运行

child ! 50 // 将状态设为 50
child ! Command("get",child.path.name)
expectMsg(50)

正常运行,测试经过。

2.TestTwo:抛出ArithmeticException

child ! new ArithmeticException // crash it
child ! Command("get",child.path.name)
expectMsg(50)

你们猜这时候测试会经过吗?答案是经过,缘由是根据咱们制定的监管策略,监管者在面对子级Actor抛出ArithmeticException异常时,它会去恢复相应出异常的Actor,并保持该Actor的状态,因此此时Actor的状态值仍是50,测试经过。

3.TestThree:抛出NullPointerException

child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(50)

这种状况下测试还会经过吗?答案是不经过,缘由是根据咱们制定的监管策略,监管者在面对子级Actor抛出NullPointerException异常时,它会去重启相应出异常的Actor,其状态会被清除,因此此时Actor的状态值应该是0,测试不经过。

4.TestFour:抛出IllegalArgumentException

supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
child2 ! 100 // 将状态设为 100
watch(child) // have testActor watch “child”
child ! new IllegalArgumentException // break it
expectMsgPF() {
  case Terminated(`child`) => (println("the child stop"))
}
child2 ! Command("get",child2.path.name)
expectMsg(100)

这里首先咱们又建立了一个Child Actor为child2,并将它的状态置为100,这里咱们监控前面建立的child1,而后给其发送一个IllegalArgumentException的消息,让其抛出该异常,测试结果:

the child stop
测试经过

从结果中咱们能够看出,child在抛出IllegalArgumentException后,会被其监管着中止,但监管者下的其余Actor仍是正常工做。

5.TestFive:抛出一个自定义异常

watch(child2)
 child2 ! Command("get",child2.path.name) // verify it is alive
 expectMsg(100)
 supervisor ! Props[Child] // create new child
 val child3 = expectMsgType[ActorRef]
 child2 ! new Exception("CRASH") // escalate failure
 expectMsgPF() {
    case t @ Terminated(`child2`) if t.existenceConfirmed => (
       println("the child2 stop")
    )
}
child3 ! Command("get",child3.path.name)
expectMsg(0)

这里首先咱们又建立了一个Child Actor为child3,这里咱们监控前面建立的child2,而后给其发送一个Exception("CRASH")的消息,让其抛出该异常,测试结果:

the child2 stop
测试不经过

不少人可能会疑惑为何TestFour能够经过,这里就通不过不了呢?由于这里错误Actor抛出的异常其监管者没法处理,只能将失败上溯传递,而顶级actor的缺省策略是对全部的Exception状况(ActorInitializationException和ActorKilledException例外)进行重启. 因为缺省的重启指令会中止全部的子actor,因此咱们这里的child3也会被中止。致使测试不经过。固然这里你也能够复写默认的重启方法,好比:

override def preRestart(cause: Throwable, msg: Option[Any]) {}

这样重启相应Actor时就不会中止其子级下的全部Actor了。

本文主要介绍了Actor系统中的监管和容错,这一部份内容在Akka中也是很重要的,它与Actor的树形组织结构巧妙结合,本文大量参考了Akka官方文档的相应章节,有兴趣的同窗能够点击这里Akka docs。也能够下载个人示例程序,里面包含了一个官方的提供的容错示例。

相关文章
相关标签/搜索