Akka做为一种成熟的生产环境并发解决方案,必须拥有一套完善的错误异常处理机制,本文主要讲讲Akka中的监管和容错。html
看过我上篇文章的同窗应该对Actor系统的工做流程有了必定的了解Akka系列(二):Akka中的Actor系统,它的很重要的概念就是分而治之,既然咱们把任务分配给Actor去执行,那么咱们必须去监管相应的Actor,当Actor出现了失败,好比系统环境错误,各类异常,能根据咱们制定的相应监管策略进行错误恢复,就是后面咱们会说到的容错。git
既然有监管这一事件,那必然存在着监管者这么一个角色,那么在ActorSystem中是如何肯定这种角色的呢?github
咱们先来看下ActorSystem中的顶级监管者:并发
一个actor系统在其建立过程当中至少要启动三个actor,如上图所示,下面来讲说这三个Actor的功能:app
/
: 根监管者顾名思义,它是一个老大,它监管着ActorSystem中全部的顶级Actor,顶级Actor有如下几种:ide
/user
: 是全部由用户建立的顶级actor的监管者;用ActorSystem.actorOf建立的actor在其下。测试
/system
: 是全部由系统建立的顶级actor的监管者,如日志监听器,或由配置指定在actor系统启动时自动部署的actor。this
/deadLetters
: 是死信actor,全部发往已经终止或不存在的actor的消息会被重定向到这里。spa
/temp
:是全部系统建立的短时actor的监管者,例如那些在ActorRef.ask的实现中用到的actor。scala
/remote
: 是一我的造虚拟路径,用来存放全部其监管者是远程actor引用的actor。
跟咱们日常打交道最多的就是/user
,它是咱们在程序中用ActorSystem.actorOf建立的actor的监管者,下面的容错咱们重点关心的就是它下面的失败处理,其余几种顶级Actor具体功能定义已经给出,有兴趣的也能够去了解一下。
根监管者监管着全部顶级Actor,对它们的各类失败状况进行处理,通常来讲若是错误要上升到根监管者,整个系统就会中止。
/user
: 顶级actor监管者上面已经讲过/user
是全部由用户建立的顶级actor的监管者,即用ActorSystem.actorOf建立的actor,咱们能够本身制定相应的监管策略,但因为它是actor系统启动时就产生的,因此咱们须要在相应的配置文件里配置,具体的配置能够参考这里Akka配置
/system
: 系统监管者/system
全部由系统建立的顶级actor的监管者,好比Akka中的日志监听器,由于在Akka中日志自己也是用Actor实现的,/system
的监管策略以下:对收到的除ActorInitializationException
和ActorKilledException
以外的全部Exception
无限地执行重启,固然这也会终止其全部子actor。全部其余Throwable被上升到根监管者,而后整个actor系统将会关闭。
用户建立的普通actor的监管:
上一篇文章介绍了Actor系统的组织结构,它是一种树形结构,其实这种结构对actor的监管是很是有利的,Akka实现的是一种叫“父监管”的形式,每个被建立的actor都由其父亲所监管,这种限制使得actor的监管结构隐式符合其树形结构,因此咱们能够得出一个结论:
一个被建立的Actor确定是一个被监管者,也多是一个监管者,它监管着它的子级Actor
上面咱们对ActorSystem中的监管角色有了必定的了解,那么究竟是如何制定相应的监管策略呢?Akka中有如下4种策略:
恢复下属,保持下属当前积累的内部状态
重启下属,清除下属的内部状态
永久地中止下属
升级失败(沿监管树向上传递失败),由此失败本身
这其实很好理解,下面是一个简单例子:
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Resume //恢复 case _: NullPointerException => Restart //重启 case _: IllegalArgumentException => Stop //中止 case _: Exception => Escalate //向上级传递 }
咱们能够根据异常的不一样使用不一样监管策略,在后面我会具体给出一个示例程序帮助你们理解。咱们在实现本身的策略时,须要复写Actor中的supervisorStrategy
,由于Actor的默认监管策略以下:
final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart }
它对除了它指定的异常进行中止,其余异常都是对下属进行重启。
Akka中有两种类型的监管策略:OneForOneStrategy
和AllForOneStrategy
,它们的主要区别在于:
OneForOneStrategy
: 该策略只会应用到发生故障的子actor上。
AllForOneStrategy
: 该策略会应用到全部的子actor上。
咱们通常都使用OneForOneStrategy
来进行制定相关监管策略,固然你也能够根据具体需求选择合适的策略。另外咱们能够给咱们的策略配置相应参数,好比上面maxNrOfRetries,withinTimeRange等,这里的含义是每分钟最多进行10次重启,若超出这个界限相应的Actor将会被中止,固然你也可使用策略的默认配置,具体配置信息能够参考源码。
本示例主要演示Actor在发生错误时,它的监管者会根据相应的监管策略进行不一样的处理。源码连接
由于这个例子比较简单,这里我直接贴上相应代码,后面根据具体的测试用例来解释各类监管策略所进行的响应:
class Supervisor extends Actor { //监管下属,根据下属抛出的异常进行相应的处理 override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate } var childIndex = 0 //用于标示下属Actor的序号 def receive = { case p: Props => childIndex += 1 //返回一个Child Actor的引用,因此Supervisor Actor是Child Actor的监管者 sender() ! context.actorOf(p,s"child${childIndex}") } } class Child extends Actor { val log = Logging(context.system, this) var state = 0 def receive = { case ex: Exception => throw ex //抛出相应的异常 case x: Int => state = x //改变自身状态 case s: Command if s.content == "get" => log.info(s"the ${s.self} state is ${state}") sender() ! state //返回自身状态 } } case class Command( //相应命令 content: String, self: String )
如今咱们来看看具体的测试用例:
首先咱们先构建一个测试环境:
class GuardianSpec(_system: ActorSystem) extends TestKit(_system) with WordSpecLike with Matchers with ImplicitSender { def this() = this(ActorSystem("GuardianSpec")) "A supervisor" must { "apply the chosen strategy for its child" in { code here... val supervisor = system.actorOf(Props[Supervisor], "supervisor") //建立一个监管者 supervisor ! Props[Child] val child = expectMsgType[ActorRef] // 从 TestKit 的 testActor 中获取回应 } } }
1.TestOne:正常运行
child ! 50 // 将状态设为 50 child ! Command("get",child.path.name) expectMsg(50)
正常运行,测试经过。
2.TestTwo:抛出ArithmeticException
child ! new ArithmeticException // crash it child ! Command("get",child.path.name) expectMsg(50)
你们猜这时候测试会经过吗?答案是经过,缘由是根据咱们制定的监管策略,监管者在面对子级Actor抛出ArithmeticException
异常时,它会去恢复相应出异常的Actor,并保持该Actor的状态,因此此时Actor的状态值仍是50,测试经过。
3.TestThree:抛出NullPointerException
child ! new NullPointerException // crash it harder child ! "get" expectMsg(50)
这种状况下测试还会经过吗?答案是不经过,缘由是根据咱们制定的监管策略,监管者在面对子级Actor抛出NullPointerException
异常时,它会去重启相应出异常的Actor,其状态会被清除,因此此时Actor的状态值应该是0,测试不经过。
4.TestFour:抛出IllegalArgumentException
supervisor ! Props[Child] // create new child val child2 = expectMsgType[ActorRef] child2 ! 100 // 将状态设为 100 watch(child) // have testActor watch “child” child ! new IllegalArgumentException // break it expectMsgPF() { case Terminated(`child`) => (println("the child stop")) } child2 ! Command("get",child2.path.name) expectMsg(100)
这里首先咱们又建立了一个Child Actor为child2,并将它的状态置为100,这里咱们监控前面建立的child1,而后给其发送一个IllegalArgumentException
的消息,让其抛出该异常,测试结果:
the child stop 测试经过
从结果中咱们能够看出,child在抛出IllegalArgumentException
后,会被其监管着中止,但监管者下的其余Actor仍是正常工做。
5.TestFive:抛出一个自定义异常
watch(child2) child2 ! Command("get",child2.path.name) // verify it is alive expectMsg(100) supervisor ! Props[Child] // create new child val child3 = expectMsgType[ActorRef] child2 ! new Exception("CRASH") // escalate failure expectMsgPF() { case t @ Terminated(`child2`) if t.existenceConfirmed => ( println("the child2 stop") ) } child3 ! Command("get",child3.path.name) expectMsg(0)
这里首先咱们又建立了一个Child Actor为child3,这里咱们监控前面建立的child2,而后给其发送一个Exception("CRASH")
的消息,让其抛出该异常,测试结果:
the child2 stop 测试不经过
不少人可能会疑惑为何TestFour能够经过,这里就通不过不了呢?由于这里错误Actor抛出的异常其监管者没法处理,只能将失败上溯传递,而顶级actor的缺省策略是对全部的Exception状况(ActorInitializationException和ActorKilledException例外)进行重启. 因为缺省的重启指令会中止全部的子actor,因此咱们这里的child3也会被中止。致使测试不经过。固然这里你也能够复写默认的重启方法,好比:
override def preRestart(cause: Throwable, msg: Option[Any]) {}
这样重启相应Actor时就不会中止其子级下的全部Actor了。
本文主要介绍了Actor系统中的监管和容错,这一部份内容在Akka中也是很重要的,它与Actor的树形组织结构巧妙结合,本文大量参考了Akka官方文档的相应章节,有兴趣的同窗能够点击这里Akka docs。也能够下载个人示例程序,里面包含了一个官方的提供的容错示例。