这里咱们建立第一个 Actor,名字叫 MyWork。web
静态方法 props() 用来构造这个 Actor 的实例,官方推荐每一个 Actor 都配备一个 props() 的方法。并发
接下来 Msg 中定义了这个 Actor 能够接收的消息。这里用 enum 变量来传递,也能够直接传递 String 或者其余类的对象。用 equals() 和 instanceof 对 msg 进行判断和转换就能够了。异步
重写的 preStart(),postStop(),preRestart(),postRestart() 方法会在 Actor 生命周期的不一样阶段调用,在其余文章里会介绍。分布式
onReceive(Object msg)方法是 Actor 的核心逻辑,Actor 接收到的消息都放在 MailBox 中,从这个 MailBox 中取出来的一条一条消息就经过这个方法进行处理。能够对不一样消息定义不一样的动做。ide
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.Option;
public class MyWork extends UntypedAbstractActor {
LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
static public Props props() {
return Props.create(MyWork.class, () -> new MyWork()); } public static enum Msg { WORKING, EXCEPTION, STOP, RESTART, RESUME, BACK, SLEEP } @Override public void preStart() { System.out.println("MyWork preStart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void postStop() { System.out.println("MyWork stopped uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void preRestart(Throwable reason, Option<Object> message) throws Exception { System.out.println("MyWork preRestart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void postRestart(Throwable reason) throws Exception { System.out.println("MyWork postRestart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void onReceive(Object msg) throws Exception { if (msg == Msg.WORKING) { logger.info("I am working"); } else if (msg == Msg.EXCEPTION) { throw new Exception("I failed!"); } else if(msg == Msg.RESTART){ logger.info("I will be restarted"); throw new NullPointerException(); } else if (msg == Msg.RESUME) { logger.info("I will be resume"); throw new ArithmeticException(); }else if (msg == Msg.STOP) { logger.info("I am stopped"); getContext().stop(getSelf()); }else if (msg == Msg.BACK) { getSender().tell("I am alive", getSelf()); } else if(msg == Msg.SLEEP) { logger.info("I am going to sleep"); Thread.sleep(3000); getSender().tell("I am awake", getSelf()); } else { unhandled(msg); } } }
主函数的功能:启动这个Actor并发送消息。svg
最后的输入一个回车结束程序的意义:
以前把输入回车去掉了直接system.terminate()感受很清爽,可是因为消息处理都是异步的,有可能还没处理完消息系统就关掉了,致使输出的日志对不上预期效果,对一些系统运行轨迹研究了好久不得其解,最后发现是system.terminate()的问题,手动回车中止系统就行了。函数
public static void main(String[] args) throws IOException {
//初始化 ActorSystem
final ActorSystem system = ActorSystem.create("MySystem");
//建立第一个 Actor
final ActorRef firstActor =
system.actorOf(MyWork.props(), "firstActor");
System.out.println(firstActor.path());
//向第一个 Actor 发送消息
firstActor.tell(Msg.WORKING, ActorRef.noSender());
System.out.println(">>> Press ENTER to exit <<<");
try {
System.in.read();
} finally {
system.terminate();
}
}
顺便给上pom依赖,没有都用到post
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.12</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-agent_2.12</artifactId>
<version>2.5.12</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.12</artifactId>
<version>2.5.12</version>
</dependency>
你们有兴趣的能够关注个人公众号(分布式系统斗者),涉及分布式系统、大数据和我的成长分享,欢迎你们一块儿交流进步大数据