Akka和μJavaActorsμJavaActors均是java的Actor库,其中Akka提供了叫为完整的Actor开发框架,较为庞大,学习成本很高,μJavaActors 是一个轻量级Actor库,大约仅有1200行代码,比较适合入门。java
Akka是一个至关成熟、强大的库,github上download下的是Akka的源码,应该使用sbt构建的工程,若是没有使用sbt经验,想导出jar还挺不容易的,推荐Akka官网下载akka各个组件的jar去使用,简单介绍一下helloworld 级别Akka的demo。git
1.Akka的主要组件github
akka-actor.jar : Actor核心组件了,定义了Acotr核心类正则表达式
akka-slf4f.jar : SLF4F Logger的支持,一个打log的组件,不用太关注
redis
akka-remote.jar : Actor作远程调用的jar,相似RFC吧
数组
akka-cluster : actor作集群管理组件
框架
akka-camel.jar : 对Apache Camel 集成接口
eclipse
scala-library-2.11.8.jar : akka核心应该是Scala写的,这个组件就是对akka的核心支持ide
Akka还有不少组件,不过对于hello world级的程序简单了解几个就ok了。工程是基于eclipse的,须要包含下面几个基础的组件:学习
编写两个Actor:
package demo02; import akka.actor.UntypedActor; /* * UntypedAcotr是无类型Actor的一个抽象类,继承与核心类Actor */ public class Greeter extends UntypedActor { public static enum Msg{ GREET , DONE; } /** * 每一个Actor必须实现OnReceive,当该Actor收到消息调用该方法 */ @Override public void onReceive(Object msg) throws Throwable { if(msg == Msg.GREET){ System.out.println("Hello world"); /** * 这里吐槽一下Akka对于发消息的设计,发送消息的设计居然是: * receiver.tell(msg , sender) * 也许没理解akka设计的理念,可是正常人设计不该该是: * sender.tell(msg , receiver) * 汗…… */ getSender().tell(Msg.DONE, getSelf()); }else{ unhandled(msg); } } }
package demo02; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; public class HelloWorld extends UntypedActor { @Override public void preStart(){ final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class)); greeter.tell(Greeter.Msg.GREET, getSelf()); } @Override public void onReceive(Object msg) throws Throwable { if(msg == Greeter.Msg.DONE){ getContext().stop(getSelf()); }else{ unhandled(msg); } } }
下面是Main方法:
package demo02; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; public class Main { public static void main(String[] args) { //ActorSystem 至关于ActorManager,管理各类Acor调度、线程管理等 ActorSystem system = ActorSystem.create("hello"); //建立一个HelloWorld 类型的Actor,在Actor启动前,会调preStart(),此时会想Greeter发消息 ActorRef actor = system.actorOf(Props.create(HelloWorld.class)); //添加结束终结Actor,当ActorSystem调Stop时,会向每一个Actor发送Terminated消息 system.actorOf(Props.create(Terminator.class, actor), "terminator"); } public static class Terminator extends UntypedActor{ private final LoggingAdapter log = Logging.getLogger(getContext().system(),this); private ActorRef actorRef = null; public Terminator(ActorRef ref){ System.out.println("Terminator Init !!!"); actorRef = ref; getContext().watch(actorRef); } @Override public void onReceive(Object msg) throws Throwable { if (msg instanceof Terminated) { log.info("{} has terminated, shutting down system", actorRef.path()); getContext().system().terminate(); } else { unhandled(msg); } } } }
上面代码在akka的源码中sample均可以找到的,从上面看Akka对消息的识别是根据类型处理的,在我这种菜鸟看来,并非很合适,当我消息类型较多时,消息类岂不是要爆炸,固然也能够作分级Actor,再加一层转发层解决这个问题哈
μJavaActors 是一个十分轻量级的Actor库,实现核心的Actor调度,不涉及复杂的框架,简单分析一下它的源码吧
1.Actor核心接口
Actor:定义了一个标准的Actor应该具备行为
ActorManager:Actor管理器接口,提供线程管理,Actor调度等
Messager : Actor相互间传递传递的消息接口,固然附带的接口还有MessageEvent和MessageListener
简单引用做者对这个概念的描述:
Actor 是一个执行单元,一次处理一条消息。Actor
具备如下关键行为或特征:
name
,该名称在每一个 ActorManager
中必须是唯一的。 category
;类别是一种向一组 actor 中的一个成员发送消息的方式。一个 actor 一次只能属于一个类别。 ActorManager
能够提供一个执行 actor 的线程,系统就会调用 receive()
。为了保持最高效率,actor 应该迅速处理消息,而不要进入漫长的等待状态(好比等待人为输入)。 willReceive()
容许 actor 过滤潜在的消息主题。 peek()
容许该 actor 和其余 actor 查看是否存在挂起的消息(或许是为了选择主题)。 remove()
容许该 actor 和其余 actor 删除或取消任何还没有处理的消息。 getMessageCount()
容许该 actor 和其余 actor 获取挂起的消息数量。 getMaxMessageCount()
容许 actor 限制支持的挂起消息数量;此方法可用于预防不受控制地发送。大部分程序都有许多 actor,这些 actor 经常具备不一样的类型。actor 可在程序启动时建立或在程序执行时建立(和销毁)。本文中的 actor 包 包含一个名为 AbstractActor
的抽象类,actor 实现基于该类。
ActorManager 是一个 actor 管理器。它负责向 actor 分配线程(进而分配处理器)来处理消息。ActorManager
拥有如下关键行为或特征:
createActor()
建立一个 actor 并将它与此管理器相关联。 startActor()
启动一个 actor。 detachActor()
中止一个 actor 并将它与此管理器断开。 send()/broadcast()
将一条消息发送给一个 actor、一组 actor、一个类别中的任何 actor 或全部 actor。在大部分程序中,只有一个 ActorManager
,但若是您但愿管理多个线程和/或 actor 池,也能够有多个 ActorManager
。此接口的默认实现是 DefaultActorManager
。
消息 是在 actor 之间发送的消息。Message
是 3 个(可选的)值和一些行为的容器:
source
是发送 actor。 subject
是定义消息含义的字符串(也称为命令)。 data
是消息的任何参数数据;一般是一个映射、列表或数组。参数能够是要处理和/或其余 actor 要与之交互的数据。 subjectMatches()
检查消息主题是否与字符串或正则表达式匹配。μJavaActors 包的默认消息类是 DefaultMessage
。
ActorManager其实只要简单浏览一下μJavaActors源码就能够理解Actor设计思路啦,主要分析一下ActorManager中的Actor调度源码:
public class ActorRunnable implements Runnable { public boolean hasThread; public AbstractActor actor; public void run() { // logger.trace("procesNextActor starting"); int delay = 1; while (running) { try { if (!procesNextActor()) { // logger.trace("procesNextActor waiting on actor"); // sleep(delay * 1000); synchronized (actors) { // TOOD: adjust this delay; possible parameter // we want to minizmize overhead (make bigger); // but it has a big impact on message processing // rate (makesmaller) // actors.wait(delay * 1000); actors.wait(100); } delay = Math.max(5, delay + 1); } else { delay = 1; } } catch (InterruptedException e) { } catch (Exception e) { logger.error("procesNextActor exception", e); } } // logger.trace("procesNextActor ended"); } protected boolean procesNextActor() { boolean run = false, wait = false, res = false; actor = null; synchronized (actors) { for (String key : runnables.keySet()) { actor = runnables.remove(key); break; } } if (actor != null) { // first run never started run = true; actor.setHasThread(true); hasThread = true; try { actor.run(); } finally { actor.setHasThread(false); hasThread = false; } } else { synchronized (actors) { for (String key : waiters.keySet()) { actor = waiters.remove(key); break; } } if (actor != null) { // then waiting for responses wait = true; actor.setHasThread(true); hasThread = true; try { res = actor.receive(); if (res) { incDispatchCount(); } } finally { actor.setHasThread(false); hasThread = false; } } } // if (!(!run && wait && !res) && a != null) { // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a); // } return run || res; } }