Akka-2.5.12学习系列(第一个Actor)

这里咱们建立第一个 Actor,名字叫 MyWork。web

props()

静态方法 props() 用来构造这个 Actor 的实例,官方推荐每一个 Actor 都配备一个 props() 的方法。并发

消息

接下来 Msg 中定义了这个 Actor 能够接收的消息。这里用 enum 变量来传递,也能够直接传递 String 或者其余类的对象。用 equals() 和 instanceof 对 msg 进行判断和转换就能够了。异步

生命周期

重写的 preStart(),postStop(),preRestart(),postRestart() 方法会在 Actor 生命周期的不一样阶段调用,在其余文章里会介绍。分布式

onReceive(Object msg)

onReceive(Object msg)方法是 Actor 的核心逻辑,Actor 接收到的消息都放在 MailBox 中,从这个 MailBox 中取出来的一条一条消息就经过这个方法进行处理。能够对不一样消息定义不一样的动做。ide

示例代码:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.Option;

public class MyWork extends UntypedAbstractActor {

    LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

    static public Props props() {
        return Props.create(MyWork.class, () -> new MyWork()); } public static enum Msg { WORKING, EXCEPTION, STOP, RESTART, RESUME, BACK, SLEEP } @Override public void preStart() { System.out.println("MyWork preStart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void postStop() { System.out.println("MyWork stopped uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void preRestart(Throwable reason, Option<Object> message) throws Exception { System.out.println("MyWork preRestart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void postRestart(Throwable reason) throws Exception { System.out.println("MyWork postRestart uid=" + getSelf().path().uid() + ", path=" + getSelf().path() + ", object hash=" + this.hashCode()); } @Override public void onReceive(Object msg) throws Exception { if (msg == Msg.WORKING) { logger.info("I am working"); } else if (msg == Msg.EXCEPTION) { throw new Exception("I failed!"); } else if(msg == Msg.RESTART){ logger.info("I will be restarted"); throw new NullPointerException(); } else if (msg == Msg.RESUME) { logger.info("I will be resume"); throw new ArithmeticException(); }else if (msg == Msg.STOP) { logger.info("I am stopped"); getContext().stop(getSelf()); }else if (msg == Msg.BACK) { getSender().tell("I am alive", getSelf()); } else if(msg == Msg.SLEEP) { logger.info("I am going to sleep"); Thread.sleep(3000); getSender().tell("I am awake", getSelf()); } else { unhandled(msg); } } } 

主函数的功能:启动这个Actor并发送消息。svg

最后的输入一个回车结束程序的意义:
以前把输入回车去掉了直接system.terminate()感受很清爽,可是因为消息处理都是异步的,有可能还没处理完消息系统就关掉了,致使输出的日志对不上预期效果,对一些系统运行轨迹研究了好久不得其解,最后发现是system.terminate()的问题,手动回车中止系统就行了。函数

public static void main(String[] args) throws IOException {
        //初始化 ActorSystem
        final ActorSystem system = ActorSystem.create("MySystem");

        //建立第一个 Actor
        final ActorRef firstActor =
                system.actorOf(MyWork.props(), "firstActor");

        System.out.println(firstActor.path());

        //向第一个 Actor 发送消息
        firstActor.tell(Msg.WORKING, ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
            System.in.read();
        } finally {
            system.terminate();
        }
    }

顺便给上pom依赖,没有都用到post

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.12</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-agent_2.12</artifactId>
    <version>2.5.12</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster_2.12</artifactId>
    <version>2.5.12</version>
</dependency>

你们有兴趣的能够关注个人公众号(分布式系统斗者),涉及分布式系统、大数据和我的成长分享,欢迎你们一块儿交流进步大数据

这里写图片描述