角色模型对编写并发、分布式系统进行了高度抽象。它减轻了开发者必须对互斥锁与线程管理的负担,更容易编写出正确的并发与并行系统。早在1973 年 Carl Hewitt 发表的论文中定义了角色,但一直流行于Erlang 语言中,随后被爱立信公司应用于创建高并发、可靠通讯系统,取得了巨大成功。 html
Akka 框架里面角色的API 跟Scala 框架里面角色类似,后者一些语法曾经模仿Erlang语言。 java
注意:因为Akka强迫父级监管者监督每个角色和(潜在的子级)监管者,建议你熟悉角色系统、监管、监控,这将可能帮助你阅读角色参考、路径和地址。 spring
在Java里面,角色是经过继承UntypedActor 类及实现onReceive方法来实现的.这个方法将message做为参数。 数据库
这里有个例子: api
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
Props 是一个配置类,它的做用是对建立角色确认选项。把它做为不可变的、所以可自由共享规则对建立一个角色包括相关部署信息(例如:使用调度,详见下文)。下面是如何建立一个Props 实例的一些例子: 安全
1 | import akka.actor.Props; |
2 | import akka.japi.Creator; |
1 | static class MyActorC implements Creator<MyActor> { |
2 | @Override public MyActor create() { |
3 | return new MyActor("..."); |
4 | } |
5 | } |
6 |
7 | Props props1 = Props.create(MyUntypedActor.class); |
8 | Props props2 = Props.create(MyActor.class, "..."); |
9 | Props props3 = Props.create(new MyActorC()); |
第二行显示如何传递构造参数给Actor去建立。在构建Props对象时,存在匹配的构造是被验证的,若是发现不存在或者存在多个匹配构造,会致使一个IllegalArgumentEception。 网络
第三行验证Creator使用。用来验证Props构造的Creator必须是静态。类型参数是用来判断生成角色类的,若是充分擦除,将落回到Actor类,一个参数化工厂例子,能够是: 并发
1 | static class ParametricCreator<T extends MyActor> implements Creator<T> { |
2 | @Override public T create() { |
3 | // ... fabricate actor here |
4 | } |
5 | } |
注意:
因为邮箱要求——如使用双端队列为基础的邮箱使用的隐藏角色——被拾起,在建立以前,角色类型须要已知的,这是Creator类型参数容许的。所以对你用到角色必定尽量使用特定类型。 app
这是个好的主意在UntypedActor类里面提供静态工厂方法,该方法帮助建立尽量接近角色定义的合适Props 类。这也容许使用基于Creator方法,该方法静态验证所使用的构造函数确实存在,而不是只在运行时检查依赖。 负载均衡
01 | public class DemoActor extends UntypedActor { |
02 |
03 | /** |
04 | * Create Props for an actor of this type. |
05 | * @param magicNumber The magic number to be passed to this actor’s constructor. |
06 | * @return a Props for creating this actor, which can then be further configured |
07 | * (e.g. calling `.withDispatcher()` on it) |
08 | */ |
09 | public static Props props(final int magicNumber) { |
10 | return Props.create(new Creator<DemoActor>() { |
11 | private static final long serialVersionUID = 1L; |
12 |
13 | @Override |
14 | public DemoActor create() throws Exception { |
15 | return new DemoActor(magicNumber); |
16 | } |
17 | }); |
18 | } |
19 |
20 | final int magicNumber; |
21 |
22 | public DemoActor(int magicNumber) { |
23 | this.magicNumber = magicNumber; |
24 | } |
25 |
26 | @Override |
27 | public void onReceive(Object msg) { |
28 | // some behavior here |
29 | } |
30 |
31 | } |
32 |
33 | system.actorOf(DemoActor.props(42), "demo"); |
角色经过传入Props实例进入actorOf 工厂方法,该工厂方法在ActorSystem 和ActorContext类中提供使用。
1 | import akka.actor.ActorRef; |
2 | import akka.actor.ActorSystem; |
1 | // ActorSystem is a heavy object: create only one per application |
2 | final ActorSystem system = ActorSystem.create("MySystem"); |
3 | final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class), |
4 | "myactor"); |
使用ActorSystem 将建立顶级角色,由角色系统提供守护的角色监管,同时使用一个角色的上下文将建立一个子角色。
1 | class A extends UntypedActor { |
2 | final ActorRef child = |
3 | getContext().actorOf(Props.create(MyUntypedActor.class), "myChild"); |
4 | // plus some behavior ... |
5 | } |
建议建立一个包含子类、超子类等等的层次结构,使得它适合具备逻辑性故障处理应用程序结构,详见Actor Systems。
actorOf 方法调用返回ActorRef实例。这是对角色实例处理,并与它进行交互的惟一途径。该ActorRef是不可变的,并有一个与它表明的一对一关系角色。该ActorRef是可序列化的和具有网络意识的。这意味着,你能够把它进行序列化,将它经过网络发送,在远程主机上使用它,它仍然表明着在原始的节点上相同的角色,横跨网络。
名称参数是可选的,可是你应该给你的角色起个更好名称,由于这是用在日志消息里面,并肯定角色。该名称不能为空或以$开头,但它可能包含URL编码的字符(例如,%20表明空格)。若是给定的名称已被相同父类中的其余子类使用,那将抛出InvalidActorNameException异常。
角色是自动异步启动当被建立时候。
若是你的未类型化的角色有一个携带参数的构造函数,而后那些须要Prosp的一部分,以及,如上所述。但在某些状况下,必须使用一个工厂方法,例如当实际构造函数参数由一个依赖注入框架决定时。
1 | import akka.actor.Actor; |
2 | import akka.actor.IndirectActorProducer; |
01 | class DependencyInjector implements IndirectActorProducer { |
02 | final Object applicationContext; |
03 | final String beanName; |
04 |
05 | public DependencyInjector(Object applicationContext, String beanName) { |
06 | this.applicationContext = applicationContext; |
07 | this.beanName = beanName; |
08 | } |
09 |
10 | @Override |
11 | public Class<? extends Actor> actorClass() { |
12 | return MyActor.class; |
13 | } |
14 |
15 | @Override |
16 | public MyActor produce() { |
17 | MyActor result; |
18 | // obtain fresh Actor instance from DI framework ... |
19 | return result; |
20 | } |
21 | } |
22 |
23 | final ActorRef myActor = getContext().actorOf( |
24 | Props.create(DependencyInjector.class, applicationContext, "MyActor"), |
25 | "myactor3"); |
警告:
你可能有时会倾向于提供一个IndirectActorProducer它老是返回相同的实例,例如:经过使用一个静态字段。这是不支持的,由于它违背了一个角色重启含义,这是这里所描述的含义:什么从新启动方式。当使用一个依赖注入框架时,角色Beans 必定不能是单例模式范围。
依赖注入和依赖注入框架集成技术更深刻地介绍了使用Akka与依赖注入指导方针和在类型安全的活化剂方面的Akka Java Spring 指导。
当写在角色外面的代码,应与角色进行沟通,在ask模式能够是一个解决方案(见下文),但有两个事情不能作:接收多个回复(例如:经过订阅的ActorRef到通知服务)和监控其余角色的生命周期。为了这些目的这里有个Inbox 类:
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.send(target, "hello"); |
3 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world"); |
send方法包装一个标准的tell和提供一个内部的角色引用做为发送者。在最后一行将容许该回复被接收。监控一个角色同时也十分简单。
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.watch(target); |
3 | target.tell(PoisonPill.getInstance(), ActorRef.noSender()); |
4 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated; |
UntypedActor 类仅仅定义一个抽象方法,就是上面提到onReceive(Object message)方法,该方法实现了角色行为。若是当前角色行为不匹配一个接收信息,建议调用unhandled 方法,该方法默认将发出一个new akka.actor.UnhandledMessage(message, sender, recipient)在系统角色事件流中(设置配置项akka.actor.debug.unhandled 到on 让它们转化为实际调试信息)。另外,它提供:
剩余的可见的方法是用户可重写的生命周期钩子,将在如下描述:
01 | public void preStart() { |
02 | } |
03 |
04 | public void preRestart(Throwable reason, scala.Option<Object> message) { |
05 | for (ActorRef each : getContext().getChildren()) { |
06 | getContext().unwatch(each); |
07 | getContext().stop(each); |
08 | } |
09 | postStop(); |
10 | } |
11 |
12 | public void postRestart(Throwable reason) { |
13 | preStart(); |
14 | } |
15 |
16 | public void postStop() { |
17 | } |
上面显示实现是默认由UntypedActor 类提供。
在角色系统中的路径表明一个“地方”,这可能被一个存活着的的角色占用着。最初,(除了系统初始化角色)的路径是空的。当actorOf()被调用时,指定一个由经过Props 描述给定的路径角色的化身。一个角色化身由路径和一个UID肯定。从新启动仅仅交换Props 定义的Actor 实例,但化身与UID依然是相同的。
当该角色中止时,化身的生命周期也相应结束了。在这一刻时间上相对应的生命周期事件也将被调用和监管角色也被通知终止结束。化身被中止以后,路径也能够重复被经过actorOf() 方法建立的角色使用。在这种状况下,新的化身的名称跟与前一个将是相同的而是UIDs将会有所不一样。
一个ActorRef 老是表明一个化身(路径和UID)而不仅是一个给定的路径。所以,若是一个角色中止,一个新的具备相同名称建立的旧化身的ActorRef不会指向新的。
在另外一方面ActorSelection 指向该路径(或多个路径在使用通配符时),而且是彻底不知道其化身当前占用着它。 因为这个缘由致使ActorSelection 不能被监视到。经过发送识别信息到将被回复包含正确地引用(见经过角色选择集识别角色)的 ActorIdentity 的ActorSelection 来解决当前化身ActorRef 存在该路径之下。这也能够用ActorSelection 类的resolveOne方法来解决,这将返回一个匹配ActorRef 的Future 。
当另外一个角色终止时,为了通知被通知(即永久性地中止,而不是暂时的失败和从新启动),一个角色能够本身注册为接收在终止上层的其余角色发送的终止消息,其余演员出动(请参阅中止演员)。这项服务是由角色系统的临终看护组件提供。
注册一个监视器是很容易的(见第四行,剩下的就是用于展现整个功能):
1 | import akka.actor.Terminated; |
01 | public class WatchActor extends UntypedActor { |
02 | final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); |
03 | { |
04 | this.getContext().watch(child); // <-- the only call needed for registration |
05 | } |
06 | ActorRef lastSender = getContext().system().deadLetters(); |
07 |
08 | @Override |
09 | public void onReceive(Object message) { |
10 | if (message.equals("kill")) { |
11 | getContext().stop(child); |
12 | lastSender = getSender(); |
13 | } else if (message instanceof Terminated) { |
14 | final Terminated t = (Terminated) message; |
15 | if (t.getActor() == child) { |
16 | lastSender.tell("finished", getSelf()); |
17 | } |
18 | } else { |
19 | unhandled(message); |
20 | } |
21 | } |
22 | } |
可是应当注意的是,产生的终止消息独立于注册和终止发生的顺序。特别是,监控角色将接收一个终止信息即便被监控角色已经被终止在注册时候。
注册屡次并没必要然致使对多个消息被产生,但不保证只有一个对应这样的消息被接收:若是被监控角色终止已经发生和发送的消息排队等候着,在另外一个注册完成以前,该消息已经处理完,而后第二消息将会排队,是由于已经结束角色的监控的注册致使终止信息马上产生。
使用getContext().unwatch(target)方法从监控另外一个角色生命活力撤销下来也是有可能的。这个工做即便已终止消息已经排队于邮箱中,在调用unwatch方法后对于那个角色将没有终止消息被处理。
在正确启动角色以后,preStart方法被调用。
1 | @Override |
2 | public void preStart() { |
3 | child = getContext().actorOf(Props.empty()); |
4 | } |
第一次建立角色时,该方法被调用。在从新启动期间,它被postRestart的默认实现调用,这意味着经过重写该方法,你能够选择此方法中初始化代码是否被调用,对这个角色或每次重启仅只调用一次。在一个角色类的实例建立时,角色的构造函数的一部分的初始化代码将每次都被调用,这发生在每次重启时。
全部角色被监督着,即用故障处理策略连接到另外一个角色。当处理一个消息是,抛出一个异常的状况下,演员可能从新启动(见监管与监控)抛出一个异常。这重启涉及上述提到钩子:
1. 旧角色是经过调用preRestart方法进行通知的,这伴随着形成重启的异常与绑定该异常的消息;处理一个消息没有形成这个重启发生,则后者可能也没有发生,例如,当一个监管者不捕获该异常,则由其监管者重启又或者若是因为一个同类的失败,一个角色将被从新启动。若是消息是可用的,那么该消息的发件人也能够经过正常方式访问的(即经过调用getSender())。
这个方法用在这些地方时最好的,例如:清除,准备交到新的角色实例等等。默认它中止全部子实例和调用postStop方法。
2. 来自actorOf方法调用的初始化工厂用来产生新的实例。
3. 新角色的postRestart方法被调用时这引发了重启异常。默认状况下,preStart 是被调用,就如同在正常启动的状况下。
一个角色重启仅替换实际角色的对象;邮箱中的内容是不受重启影响,因此消息的处理将在postRestart钩子返回后恢复。引起异常的消息将不会再接收。当重启时候,发送到角色的任何消息将像日常同样排队到它的邮箱。
注意:要知道,相关用户失败消息的顺序是不肯定的。特别是,一个父类可能会从新启动其子类以前它已经处理了在失败以前子类发送故障的的最后消息。见讨论:消息顺序的详细信息。
终止一个角色以后,其postStop钩子被调用时,其可能用于例如从其余服务注销这个角色。在这个角色的消息队列已禁用以后,这个钩子仍保证运行,即送到已终止角色的信息将被重定向到ActorSystem的deadLetters。
做为角色的引用,路径和地址描述,每一个角色都有一个惟一的逻辑路径,这是由如下的子类到父类直到达到角色系统的根的角色的链获得的,它有一个物理路径,若是监管链包括任何远程监管者,这可能会有所不一样。这些路径是由系统使用来查找角色,如当接收到一个远程的消息和收件人进行搜索,但他们也有更直接用法:角色能够查找其余角色经过指定绝对或相对路径,逻辑或物理,并接收返回的结果的ActorSelection:
1 | // will look up this absolute path |
2 | getContext().actorSelection("/user/serviceA/actor"); |
3 | // will look up sibling beneath same supervisor |
4 | getContext().actorSelection("../joe"); |
其中指定的路径被解释为一个java.net.URI, 它以 / 分隔成路径段. 若是路径以 /开始, 表示一个绝对路径,从根监管者 ( “/user”的父亲)开始查找; 不然是从当前角色开始。若是某一个路径段为 .., 会找到当前所遍历到的角色的上一级, 不然则会向下一级寻找具备该名字的子角色。 必须注意的是角色路径中的.. 老是表示逻辑结构,也就是其监管者。
一个角色选择集的路径元素能够包含通配符,容许消息额广播到该选择集:
1 | // will look all children to serviceB with names starting with worker |
2 | getContext().actorSelection("/user/serviceB/worker*"); |
3 | // will look up all siblings beneath same supervisor |
4 | getContext().actorSelection("../*"); |
信息能够经过ActorSelection发送和当传送的每一个消息时,查找ActorSelection的路径。若是选择集不匹配任何角色的消息将被丢弃。
为了得到一个ActorSelection的ActorRef,你须要发送一个消息到选择集和使用来自橘色的回复的getSender引用。有一个内置的识别信息,即全部角色都理解并自动回复一个包含ActorRef的ActorIdentity消息。此消息由该角色特殊处理,在这个意义上说是穿越的,若是一个具体的名称查找失败(即非通配符路径元素不符合一个存在的角色)而后产生一个消极结果。请注意,这并不意味着传递的答复是有保障的,但它仍然是一个正常的消息。
1 | import akka.actor.ActorIdentity; |
2 | import akka.actor.ActorSelection; |
3 | import akka.actor.Identify; |
01 | public class Follower extends UntypedActor { |
02 | final String identifyId = "1"; |
03 | { |
04 | ActorSelection selection = |
05 | getContext().actorSelection("/user/another"); |
06 | selection.tell(new Identify(identifyId), getSelf()); |
07 | } |
08 | ActorRef another; |
09 |
10 | final ActorRef probe; |
11 | public Follower(ActorRef probe) { |
12 | this.probe = probe; |
13 | } |
14 |
15 | @Override |
16 | public void onReceive(Object message) { |
17 | if (message instanceof ActorIdentity) { |
18 | ActorIdentity identity = (ActorIdentity) message; |
19 | if (identity.correlationId().equals(identifyId)) { |
20 | ActorRef ref = identity.getRef(); |
21 | if (ref == null) |
22 | getContext().stop(getSelf()); |
23 | else { |
24 | another = ref; |
25 | getContext().watch(another); |
26 | probe.tell(ref, getSelf()); |
27 | } |
28 | } |
29 | } else if (message instanceof Terminated) { |
30 | final Terminated t = (Terminated) message; |
31 | if (t.getActor().equals(another)) { |
32 | getContext().stop(getSelf()); |
33 | } |
34 | } else { |
35 | unhandled(message); |
36 | } |
37 | } |
38 | } |
您也能够取得一个ActorSelection的ActorRef经过ActorSelection的resolveOne方法。它返回匹配ActorRef的Future,若是这样一个角色存在。若是没有这样的角色存在或鉴定所提供的超时时间内没有完成,它将已失败了结akka.actor.ActorNotFound。
远程角色地址也能够查找,若是远程被启用:
1 | getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB"); |
一个关于actor查找的示例见 远程查找.
注意:在支持actorSelection,actorFor是被废弃,由于用actorFor得到的角色引用对本地与远程角色表现不一样。在一个本地角色引用的状况下,查找以前命名的演员须要存在,不然所获取的引用将是一个EmptyLocalActorRef。即便在获取角色引用以后,一个真实路径的角色才被建立,这时也是能够获取的。对于远程角色引用经过actorFor来获取的行为不一样的,发送信息到该引用上将在覆盖下经过在远程系统给每个消息发送的路径查找角色。
重要信息:消息能够是任何类型的对象,但必须是不可变的。阿Aka不能强制不变性,因此这必须按照约定。 这里是一个不变的消息的示例:
01 | public class ImmutableMessage { |
02 | private final int sequenceNumber; |
03 | private final List<String> values; |
04 |
05 | public ImmutableMessage(int sequenceNumber, List<String> values) { |
06 | this.sequenceNumber = sequenceNumber; |
07 | this.values = Collections.unmodifiableList(new ArrayList<String>(values)); |
08 | } |
09 |
10 | public int getSequenceNumber() { |
11 | return sequenceNumber; |
12 | } |
13 |
14 | public List<String> getValues() { |
15 | return values; |
16 | } |
17 | } |
向actor发送消息是使用下列方法之一。
每个消息发送者分别保证本身的消息的次序.
注意:使用ask会形成性能影响,由于当超时是,一些事情须要保持追踪。这须要一些东西来将一个Promise链接进入ActorRef,而且须要经过远程链接可到达的。因此老是使用tell更偏向性能,除非必须才用ask.
在全部这些方法你能够传递本身的ActorRef。让它这样作,由于这将容许接收的角色才可以回复您的邮件,由于发件人引用随该信息一块儿发送的。
这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。
1 | // don’t forget to think about who is the sender (2nd argument) |
2 | target.tell(message, getSelf()); |
发送者引用是伴随着消息传递的,在接收角色可用范围内,当处理该消息时,经过getSender方法。在一个角色内部一般是getSelf,这应该为发送者,但也多是这种状况,回复被路由到一些其余角色即该父类的第二个参数tell将是不一样的一个。在角色外部,若是没有回复,第二个参数能够为null;若是在角色外部须要一个回复,你可使用问答模式描,下面描述..
ask 模式既包含actor也包含future, 因此它是做为一种使用模式,而不是ActorRef的方法:
1 | import static akka.pattern.Patterns.ask; |
2 | import static akka.pattern.Patterns.pipe; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.dispatch.Futures; |
6 | import akka.dispatch.Mapper; |
7 | import akka.util.Timeout; |
01 | final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); |
02 |
03 | final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>(); |
04 | futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout |
05 | futures.add(ask(actorB, "another request", t)); // using timeout from |
06 | // above |
07 |
08 | final Future<Iterable<Object>> aggregate = Futures.sequence(futures, |
09 | system.dispatcher()); |
10 |
11 | final Future<Result> transformed = aggregate.map( |
12 | new Mapper<Iterable<Object>, Result>() { |
13 | public Result apply(Iterable<Object> coll) { |
14 | final Iterator<Object> it = coll.iterator(); |
15 | final String x = (String) it.next(); |
16 | final String s = (String) it.next(); |
17 | return new Result(x, s); |
18 | } |
19 | }, system.dispatcher()); |
20 |
21 | pipe(transformed, system.dispatcher()).to(actorC); |
上面的例子展现了将 ask与 future上的 pipe 模式一块儿使用,由于这是一种很是经常使用的组合。 请注意上面全部的调用都是彻底非阻塞和异步的: ask 产生 Future, 两个经过Futures.sequence和map方法组合成一个新的Future,而后用 pipe 在future上安装一个 onComplete-处理器来完成将收集到的 Result 发送到其它actor的动做。
使用 ask 将会像tell 同样发送消息给接收方, 接收方必须经过getSender().tell(reply, getSelf()) 发送回应来为返回的 Future 填充数据。ask 操做包括建立一个内部actor来处理回应,必须为这个内部actor指定一个超时期限,过了超时期限内部actor将被销毁以防止内存泄露。详见下面:
注意:若是要以异常来填充future你须要发送一个 Failure 消息给发送方。这个操做不会在actor处理消息发生异常时自动完成。
1 | try { |
2 | String result = operation(); |
3 | getSender().tell(result, getSelf()); |
4 | } catch (Exception e) { |
5 | getSender().tell(new akka.actor.Status.Failure(e), getSelf()); |
6 | throw e; |
7 | } |
若是一个actor 没有完成future , 它会在超时时限到来时过时, 明确做为一个参数传给ask方法,以 AskTimeoutException来完成Future。
关于如何等待或查询一个future,更多信息请见Futures 。
Future的onComplete, onResult, 或 onTimeout 方法能够用来注册一个回调,以便在Future完成时获得通知。从而提供一种避免阻塞的方法。
在使用future回调时,在角色内部你要当心避免关闭该角色的引用, 即不要在回调中调用该角色的方法或访问其可变状态。这会破坏角色的封装,会引用同步bugbug和race condition, 由于回调会与此角色一同被并发调度。 不幸的是目前尚未一种编译时的方法可以探测到这种非法访问。 参阅: 角色与共享可变状态
你能够将消息从一个角色转发给另外一个。虽然通过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能相似路由器、负载均衡器、备份等的角色时会颇有用。同时你须要传递你的上下文变量。
1 | target.forward(result, getContext()); |
当一个角色收到被传递到onReceive方法的消息,这是在须要被定义的UntypedActor基类的抽象方法。
下面是个例子:
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
除了使用IF-instanceof检查,还有一种方法是使用Apache Commons MethodUtils调用指定的参数类型相匹配的消息类型方法。
若是你须要一个用来发送回应消息的目标,可使用 getSender, 它是一个角色引用。 你能够用 getSender().tell(replyMsg, getSelf())向这个引用发送回应消息。 你也能够将这个ActorRef保存起来未来再做回应。若是没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为 ‘死信’ 角色的引用。
1 | @Override |
2 | public void onReceive(Object msg) { |
3 | Object result = |
4 | // calculate result ... |
5 |
6 | // do not forget the second argument! |
7 | getSender().tell(result, getSelf()); |
8 | } |
在一个ReceiveTimeout消息发送触发以后,该UntypedActorContext setReceiveTimeout定义不活动超时时间。当指定时,接收功能应该可以处理一个akka.actor.ReceiveTimeout消息。 1毫秒为最小支持超时。
请注意,接受超时可能会触发和在另外一条消息是入队后,该ReceiveTimeout消息将重排队;所以,它不能保证在接收到接收超时的必定有预先经过该方法所配置的空闲时段。
一旦设置,接收超时保持有效(即持续重复触发超过不活动时间后)。传递Duration.Undefined关掉此功能。
01 | import akka.actor.ActorRef; |
02 | import akka.actor.ReceiveTimeout; |
03 | import akka.actor.UntypedActor; |
04 | import scala.concurrent.duration.Duration; |
05 |
06 | public class MyReceiveTimeoutUntypedActor extends UntypedActor { |
07 |
08 | public MyReceiveTimeoutUntypedActor() { |
09 | // To set an initial delay |
10 | getContext().setReceiveTimeout(Duration.create("30 seconds")); |
11 | } |
12 |
13 | public void onReceive(Object message) { |
14 | if (message.equals("Hello")) { |
15 | // To set in a response to a message |
16 | getContext().setReceiveTimeout(Duration.create("1 second")); |
17 | } else if (message instanceof ReceiveTimeout) { |
18 | // To turn it off |
19 | getContext().setReceiveTimeout(Duration.Undefined()); |
20 | } else { |
21 | unhandled(message); |
22 | } |
23 | } |
24 | } |
经过调用ActorRefFactory 即 ActorContext 或 ActorSystem 的 stop 方法来终止一个角色, 一般 context 用来终止子角色,而 system 用来终止顶级角色. 实际的终止操做是异步执行的, 即stop 可能在角色被终止以前返回。
若是当前有正在处理的消息,对该消息的处理将在actor被终止以前完成,可是邮箱中的后续消息将不会被处理。缺省状况下这些消息会被送到 ActorSystem 的 死信, 可是这取决于邮箱的实现。
角色的终止分两步: 第一步角色将中止对邮箱的处理,向全部子角色发送终止命令,而后处理来自子角色的终止消息直到全部的子角色都完成终止, 最后终止本身 (调用postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated , 通知其监管者). 这个过程保证角色系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。若是其中某个角色没有响应 (即因为处理消息用了太长时间以致于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管角色会被终止,以上的过程将保证整个系统的正确终止。
postStop hook 是在角色被彻底终止之后调用的。这是为了清理资源:
1 | @Override |
2 | public void postStop() { |
3 | // clean up resources here ... |
4 | } |
注意:因为角色的终止是异步的, 你不能立刻使用你刚刚终止的子角色的名字;这会致使 InvalidActorNameException. 你应该 watch 正在终止的 介绍而在最终到达的 Terminated消息的处理中建立它的替代者。
你也能够向角色发送 akka.actor.PoisonPill 消息, 这个消息处理完成后角色会被终止。 PoisonPill 与普通消息同样被放进队列,所以会在已经入队列的其它消息以后被执行。
像下面使用:
1 | myActor.tell(akka.actor.PoisonPill.getInstance(), sender); |
若是你想等待终止过程的结束,或者组合若干actor的终止次序,可使用gracefulStop:
1 | import static akka.pattern.Patterns.gracefulStop; |
2 | import scala.concurrent.Await; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.pattern.AskTimeoutException; |
1 | try { |
2 | Future<Boolean> stopped = |
3 | gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN); |
4 | Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); |
5 | // the actor has been stopped |
6 | } catch (AskTimeoutException e) { |
7 | // the actor wasn't stopped within 5 seconds |
8 | } |
01 | public class Manager extends UntypedActor { |
02 |
03 | public static final String SHUTDOWN = "shutdown"; |
04 |
05 | ActorRef worker = getContext().watch(getContext().actorOf( |
06 | Props.create(Cruncher.class), "worker")); |
07 |
08 | public void onReceive(Object message) { |
09 | if (message.equals("job")) { |
10 | worker.tell("crunch", getSelf()); |
11 | } else if (message.equals(SHUTDOWN)) { |
12 | worker.tell(PoisonPill.getInstance(), getSelf()); |
13 | getContext().become(shuttingDown); |
14 | } |
15 | } |
16 |
17 | Procedure<Object> shuttingDown = new Procedure<Object>() { |
18 | @Override |
19 | public void apply(Object message) { |
20 | if (message.equals("job")) { |
21 | getSender().tell("service unavailable, shutting down", getSelf()); |
22 | } else if (message instanceof Terminated) { |
23 | getContext().stop(getSelf()); |
24 | } |
25 | } |
26 | }; |
27 | } |
当gracefulStop()成功返回时,角色的postStop()钩子将被执行:存在一个状况,happens-before 边缘在postStop()结尾和gracefulStop()返回之间。
在上面的例子中一个自定义的Manager.SHUTDOWN消息被发送到目标角色为了初始化正在终止角色的处理。您可使用PoisonPill为这一点,但在阻止目标的角色以前,你拥有不多机会与其余角色进行交互。简单的清除任务能够在postStop中处理。
注意:请记住,一个角色终止和它的名字被注销是互相异步发生的独立事件。所以,在gracefulStop()后返回,它多是你会发现名称仍然在使用。为了保证正确的注销,只能重复使用来自你控制监管者内与一个终止的消息的回应的名称,即不属于顶级的角色。
Akka支持在运行时对角色消息循环 (例如它的的实现)进行实时替换: 在角色中调用getContext.become 方法。 热替换的代码被存在一个栈中,能够被pushed(replacing 或 adding 在顶部)和popped。
注意:请注意角色被其监管者重启后将恢复其最初的行为。
热替换角色使用getContext().become:
1 | import akka.japi.Procedure; |
01 | public class HotSwapActor extends UntypedActor { |
02 |
03 | Procedure<Object> angry = new Procedure<Object>() { |
04 | @Override |
05 | public void apply(Object message) { |
06 | if (message.equals("bar")) { |
07 | getSender().tell("I am already angry?", getSelf()); |
08 | } else if (message.equals("foo")) { |
09 | getContext().become(happy); |
10 | } |
11 | } |
12 | }; |
13 |
14 | Procedure<Object> happy = new Procedure<Object>() { |
15 | @Override |
16 | public void apply(Object message) { |
17 | if (message.equals("bar")) { |
18 | getSender().tell("I am already happy :-)", getSelf()); |
19 | } else if (message.equals("foo")) { |
20 | getContext().become(angry); |
21 | } |
22 | } |
23 | }; |
24 |
25 | public void onReceive(Object message) { |
26 | if (message.equals("bar")) { |
27 | getContext().become(angry); |
28 | } else if (message.equals("foo")) { |
29 | getContext().become(happy); |
30 | } else { |
31 | unhandled(message); |
32 | } |
33 | } |
34 | } |
become 方法还有不少其它的用处,一个特别好的例子是用它来实现一个有限状态机(FSM)。这将代替当前行为(即行为栈顶部),这意味着你不用使用unbecome,而是下一个行为将明确被安装。
使用become另外一个方式:不代替而是添加到行为栈顶部。这种状况是必需要保证在长期运行中“pop”操做(即unbecome)数目匹配“push”数目,不然这个数目将致使内存泄露(这就是该行为不是默认缘由)。
01 | public class UntypedActorSwapper { |
02 |
03 | public static class Swap { |
04 | public static Swap SWAP = new Swap(); |
05 |
06 | private Swap() { |
07 | } |
08 | } |
09 |
10 | public static class Swapper extends UntypedActor { |
11 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
12 |
13 | public void onReceive(Object message) { |
14 | if (message == SWAP) { |
15 | log.info("Hi"); |
16 | getContext().become(new Procedure<Object>() { |
17 | @Override |
18 | public void apply(Object message) { |
19 | log.info("Ho"); |
20 | getContext().unbecome(); // resets the latest 'become' |
21 | } |
22 | }, false); // this signals stacking of the new behavior |
23 | } else { |
24 | unhandled(message); |
25 | } |
26 | } |
27 | } |
28 |
29 | public static void main(String... args) { |
30 | ActorSystem system = ActorSystem.create("MySystem"); |
31 | ActorRef swap = system.actorOf(Props.create(Swapper.class)); |
32 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
33 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
34 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
35 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
36 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
37 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
38 | } |
39 |
40 | } |
该UntypedActorWithStash类使一个角色临时藏匿不能或不该该使用角色的当前行为处理的消息。在改变角色的消息处理函数,即调用getContext().become()或getContext().unbecome(),全部藏匿的消息能够“unstashed”,所以前面加上他们角色的邮箱。这样一来,藏消息能够在他们已经收到原先相同的顺序进行处理。扩展UntypedActorWithStash角色将自动得到一个双端队列为基础的邮箱。
注意:抽象类UntypedActorWithStash实现标记接口RequiresMessageQueue这要求系统可以为该角色自动选择基于双端队列的邮箱实现。若是你想更多的控制权邮箱,请见邮箱文档:邮箱
。
这里是UntypedActorWithStash类中操做的示例:
1 | import akka.actor.UntypedActorWithStash; |
01 | public class ActorWithProtocol extends UntypedActorWithStash { |
02 | public void onReceive(Object msg) { |
03 | if (msg.equals("open")) { |
04 | unstashAll(); |
05 | getContext().become(new Procedure<Object>() { |
06 | public void apply(Object msg) throws Exception { |
07 | if (msg.equals("write")) { |
08 | // do writing... |
09 | } else if (msg.equals("close")) { |
10 | unstashAll(); |
11 | getContext().unbecome(); |
12 | } else { |
13 | stash(); |
14 | } |
15 | } |
16 | }, false); // add behavior on top instead of replacing |
17 | } else { |
18 | stash(); |
19 | } |
20 | } |
21 | } |
调用stash()将当前的消息(即角色最后收到的消息)到角色的藏匿处。当在处理默认状况下在角色的消息处理函数来隐藏那些没有被其余案件处理的状况时,这是典型调用。同一消息的两次是非法藏匿;这样作会致使一个IllegalStateException被抛出。藏匿也能够此状况下,调用stath()能会致使容量违规,这致使StashOverflowException。藏匿的容量可使用邮箱的配置的藏匿容量设置(一个Int类型)进行配置。
调用unstashAll()从藏匿到角色的邮箱进入队列消息,直到信箱(若是有的话)已经达到的能力(请注意,从藏匿处的消息前加上邮箱)。若是一个有界的邮箱溢出,一个MessageQueueAppendFailedException被抛出。在调用unstashAll()后,藏匿保证为空。
藏匿由scala.collection.immutable.Vector支持。这样一来,即便是很是大量的消息在不会对性能产生重大影响下被藏匿。
注意,藏匿是短暂的角色状态的一部分,该邮箱不像。所以,应该像具备相同属性的角色状态的其余部分进行管理。该preRestart的UntypedActorWithStash的实现将调用unstashAll(),它一般是所指望的行为。
注意:若是要强制执行,你的角色只能用一个无上限stash进行工做,那么你应该使用UntypedActorWithUnboundedStash类代替。
您能够经过发送一个
kill消息杀一个角色。这将致使角色抛出一个
ActorKilledException,引起故障。角色将暂停运做,其监管这将被要求如何处理失败,这可能意味着恢复的角色,从新启动,或彻底终止它。请见 监管手段以获取更多信息。
使用Kill像下面:
1 | victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); |
在消息被actor处理的过程当中可能会抛出异常,例如数据库异常。
若是消息处理过程当中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。因此若是你但愿重试对消息的处理,你须要本身抓住异常而后在异常处理流程中重试. 请确保你限制重试的次数,由于你不会但愿系统产生活锁 (从而消耗大量CPU而于事无补)。另外一种可能性请查看PeekMailbox pattern
若是消息处理过程当中发生异常,邮箱没有任何变化。若是actor被重启,邮箱会被保留。邮箱中的全部消息不会丢失。
若是角色内代码抛出了异常,那么角色将被暂停,接着监管者处理开始(见监管与监控)。依赖监管者决策角色将被恢复(就像什么事情没发生),重启(擦除内部状态从新开始)或终止。
角色钩子的丰富的生命周期提供了实现各类初始化模式的有用工具。在一个ActorRef的生命周期,一个角色可能会经历屡次从新启动后,当旧的实例替换为新的,对外面观察这是不可见的,仅仅看见ActorRef。
有人可能会想到“化身”的新实例。初始化可能须要一个角色的每个化身,但有时人们须要初始化仅发生在第一个实例诞生时,当ActorRef被建立。如下各节提供的模式为不一样的初始化需求。
使用构造函数初始化有着各类好处。首先,它使得有可能使用的val字段来存储任何状态,这并不在角色实例的生命周期中变化,使得角色实现更加健壮。该构造函数被角色的每个化身调用,因此角色的内部老是能够认为正确的初始化发生。这也是这种方法的缺点,由于有当一我的想避免在从新启动时从新初始化的内部状况。例如,在重启过程,保持整个子角色一般是有用。如下部分提供了这种状况下的模式。
在的第一个实例的初始化过程当中,一个角色的preStart()方法仅仅被直接调用一次,那就是,在ActorRef的建立。在从新启动的状况下,preStart()从postRestart()被调用,所以,若是不重写,preStart()被每个化身调用。然而,覆盖postRestart(),能够禁用此行为,并确保只调用一次preStart()。
这种模式的一个有用的用法是在从新启动时禁止建立子类新的ActorRef。这能够经过覆盖preRestart()来实现:
01 | @Override |
02 | public void preStart() { |
03 | // Initialize children here |
04 | } |
05 |
06 | // Overriding postRestart to disable the call to preStart() |
07 | // after restarts |
08 | @Override |
09 | public void postRestart(Throwable reason) { |
10 | } |
11 |
12 | // The default implementation of preRestart() stops all the children |
13 | // of the actor. To opt-out from stopping the children, we |
14 | // have to override preRestart() |
15 | @Override |
16 | public void preRestart(Throwable reason, Option<Object> message) |
17 | throws Exception { |
18 | // Keep the call to postStop(), but no stopping of children |
19 | postStop(); |
20 | } |
请注意,该子角色还在从新启动,但不会建立新的ActorRef。对子类能够递归应用相同的原则,确保他们的preStart()方法被只在建立本身的引用时调用。
了解更多信息,请参阅What Restarting Means:
有这样的状况,在构造函数中,当它是不可能传递所需的全部角色初始化的信息,例如,在存在循环的依赖关系。在这种状况下,角色应该听一个初始化消息,并利用become()或有限状态机的状态对角色的初始化和未初始化的状态进行编码。
01 | private String initializeMe = null; |
02 |
03 | @Override |
04 | public void onReceive(Object message) throws Exception { |
05 | if (message.equals("init")) { |
06 | initializeMe = "Up and running"; |
07 | getContext().become(new Procedure<Object>() { |
08 | @Override |
09 | public void apply(Object message) throws Exception { |
10 | if (message.equals("U OK?")) |
11 | getSender().tell(initializeMe, getSelf()); |
12 | } |
13 | }); |
14 | } |
15 | } |
若是在初始化以前,角色能够接收消息,一个有用的工具多是Stash,能够保存消息直到初始化完成,在角色初始化以后从新放回。
注意:这个模式应当心使用,而且当上述的模式都不适用才使用。其中一个潜在的问题是,消息可能会在发送给远程角色丢失。另外,在未初始化状态发布一个ActorRef可能致使在其接收用户信息的初始化以前已经完成。