使用Akka的远程调用

概述

  正如其它RPC或者RMI框架那样,Akka也提供了远程调用的能力。服务端在监听的端口上接收客户端的调用。本文将在《Spring与Akka的集成》一文的基础上介绍Akka的remote调用,本文不少代码和例子来源于Akka官网的代码示例,也包含了一些适用于Spring集成的改造,本文旨在介绍Akka的远程调用的开发过程。html

服务端开发java

配置

  Akka的默认配置文件为application.conf,若是不特别指明,Akka System都会默认加载此配置。若是你想自定义符合你习惯的名字,可使用以下代码:spring

final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));  

上述代码中的yourconf不包含文件后缀名,在你的资源路径下实际是yourconf.conf。编程

  我不太想自定义加载的配置文件,而是继续使用application.conf,这里先列出其配置:json

include "common"  
  
akka {  
  # LISTEN on tcp port 2552  
  remote.netty.tcp.port = 2552  
}  

这里的remote.netty.tcp.port配置属性表示使用Netty框架在TCP层的监听端口是2552。include与java里的import或者jsp页面中的include标签的做用相似,表示引用其余配置文件中的配置。因为common.conf中包含了Akka的一些公共配置,因此能够这样引用,common.conf的配置以下:服务器

akka {  
  
  actor {  
    provider = "akka.remote.RemoteActorRefProvider"  
  }  
  
  remote {  
    netty.tcp {  
      hostname = "127.0.0.1"  
    }  
  }  
  
}  

common配置中的provider属性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即远程ActorRef的提供者。这里的hostname属性表示服务器的主机名。从common配置咱们还能够看出Akka的配置有点相似于json,也是一种嵌套结构。此外,Akka还能够采用一种扁平的配置方式,例如:架构

akka.actor.provider = "..."  
akka.remote.netty.tcp.hostname = "127.0.0.1"  

它们所表明的做用是同样的。至于选择扁平仍是嵌套的,一方面依据你的我的习惯,一方面依据配置的多寡——随着配置项的增多,你会发现嵌套会让你的配置文件更加清晰。app

服务端

  因为官网的例子比较简洁并能说明问题,因此本文对Akka官网的例子进行了一些改造来介绍服务端与客户端之间的远程调用。服务端的配置已在上一小节列出,本小节着重介绍服务端的实现。框架

咱们的服务端是一个简单的提供基本的加、减、乘、除的服务的CalculatorActor,这些运算都直接封装在CalculatorActor的实现中(在实际的业务场景中,Actor应该只接收、回复及调用具体的业务接口,这里的加减乘除运算应当由指定的Service接口实现,特别是在J2EE或者与Spring集成后),CalculatorActor的实现见代码清单1。dom

代码清单1

@Named("CalculatorActor")  
@Scope("prototype")  
public class CalculatorActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class);  
  
    @Override  
    public void onReceive(Object message) {  
  
        if (message instanceof Op.Add) {  
            Op.Add add = (Op.Add) message;  
            logger.info("Calculating " + add.getN1() + " + " + add.getN2());  
            Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Subtract) {  
            Op.Subtract subtract = (Op.Subtract) message;  
            logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2());  
            Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(),  
                    subtract.getN1() - subtract.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Multiply) {  
            Op.Multiply multiply = (Op.Multiply) message;  
            logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2());  
            Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),  
                    multiply.getN1() * multiply.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Divide) {  
            Op.Divide divide = (Op.Divide) message;  
            logger.info("Calculating " + divide.getN1() + " / " + divide.getN2());  
            Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(),  
                    divide.getN1() / divide.getN2());  
            getSender().tell(result, getSelf());  
  
        } else {  
            unhandled(message);  
        }  
    }  
}  

Add、Subtract、Multiply、Divide都继承自MathOp,这里只列出MathOp和Add的实现,见代码清单2所示。

代码清单2

public interface MathOp extends Serializable {  
}  
  
public static class Add implements MathOp {  
    private static final long serialVersionUID = 1L;  
    private final int n1;  
    private final int n2;  
  
    public Add(int n1, int n2) {  
        this.n1 = n1;  
        this.n2 = n2;  
    }  
  
    public int getN1() {  
        return n1;  
    }  
  
    public int getN2() {  
        return n2;  
    }  
} 

服务端应当启动CalculatorActor实例,以提供服务,代码以下:

logger.info("Start calculator");  
final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator");  
actorMap.put("calculator", calculator);  
logger.info("Started calculator"); 

客户端

  客户端调用远程CalculatorActor提供的服务后,还要接收其回复信息,所以也须要监听端口。客户端和服务端若是在同一台机器节点上,那么客户端的监听端口不能与服务端冲突,我给出的配置示例以下:

include "common"  
  
akka {  
  remote.netty.tcp.port = 2553  
}  

客户端经过远程Actor的路径得到ActorSelection,而后向远程的Akka System获取远程CalculatorActor的ActorRef,进而经过此引用使用远端CalculatorActor提供的服务。在详细的说明实现细节以前,先来看看LookupActor的实现,见代码清单3所示。

代码清单3

@Named("LookupActor")  
@Scope("prototype")  
public class LookupActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(LookupActor.class);  
  
    private final String path;  
    private ActorRef calculator = null;  
  
    public LookupActor(String path) {  
        this.path = path;  
        sendIdentifyRequest();  
    }  
  
    private void sendIdentifyRequest() {  
        getContext().actorSelection(path).tell(new Identify(path), getSelf());  
        getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(),  
                ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());  
    }  
  
    @Override  
    public void onReceive(Object message) throws Exception {  
        if (message instanceof ActorIdentity) {  
            calculator = ((ActorIdentity) message).getRef();  
            if (calculator == null) {  
                logger.info("Remote actor not available: " + path);  
            } else {  
                getContext().watch(calculator);  
                getContext().become(active, true);  
            }  
  
        } else if (message instanceof ReceiveTimeout) {  
            sendIdentifyRequest();  
  
        } else {  
            logger.info("Not ready yet");  
  
        }  
    }  
  
    Procedure<Object> active = new Procedure<Object>() {  
        @Override  
        public void apply(Object message) {  
            if (message instanceof Op.MathOp) {  
                // send message to server actor  
                calculator.tell(message, getSelf());  
  
            } else if (message instanceof Op.AddResult) {  
                Op.AddResult result = (Op.AddResult) message;  
                logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
  
            } else if (message instanceof Op.SubtractResult) {  
                Op.SubtractResult result = (Op.SubtractResult) message;  
                logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
                  
            } else if (message instanceof Terminated) {  
                logger.info("Calculator terminated");  
                sendIdentifyRequest();  
                getContext().unbecome();  
  
            } else if (message instanceof ReceiveTimeout) {  
                // ignore  
  
            } else {  
                unhandled(message);  
            }  
  
        }  
    };  
}

LookupActor的构造器须要传递远端CalculatorActor的路径,而且调用了sendIdentifyRequest方法,sendIdentifyRequest的做用有两个:

  1. 经过向ActorSelection向远端的Akka System发送Identify消息,并获取远程CalculatorActor的ActorRef;
  2. 启动定时调度,3秒后向CalculatorActor的执行上下文发送ReceiveTimeout消息,而LookupActor处理ReceiveTimeout消息时,再次调用了sendIdentifyRequest方法。
为什么要循环调用sendIdentifyRequest方法呢?因为远端服务有可能由于进程奔溃、系统重启等缘由致使已经得到的ActorRef过时或失效,所以须要一个监测机制。sendIdentifyRequest的循环调用就是一个简单的检测机制。
远端的Akka System在接收到Identify消息后,会给LookupActor回复ActorIdentity消息,LookupActor收到ActorIdentity消息后即可以解析出消息中载有的CalculatorActor的ActorRef,LookupActor而后调用getContext().watch(calculator)实现对子Actor的监管,一旦CalculatorActor重启或终止,LookupActor即可以接收到Terminated消息(有关Actor的监管机制,能够阅读官方文档)。
因为LookupActor的onReceive没法处理加、减、乘、除及Terminated消息,因此这里用到了一个Akka Actor的状态转换,经过使用getContext().become(active, true)。这里的active是一个内部类,其继承了Procedure并重写了apply方法,其中封装了对于对于加减乘除的计算以及结果、Terminated消息的处理。经过getContext().become(active, true),使得active接替onReceive方法处理接收到的消息。正如Akka官网所述——Actor的这一特性很是适合于开发实现FSM(有限状态自动机)。
active的功能主要分为三类:
  • 若是收到MathOp的消息,说明是加减乘除的消息,则将消息进一步告知远端的CalculatorActor并由其进行处理;
  • 若是收到AddResult或者SubtractResult,这说明CalculatorActor已经处理完了加或者减的处理,并回复了处理结果,所以对计算结果进行使用(本例只是简单的打印);
  • 若是收到了Terminated消息,说明远端的CalculatorActor中止或者重启了,所以须要从新调用sendIdentifyRequest获取最新的CalculatorActor的ActorRef。最后还须要取消active,恢复为默认接收消息的状态;
启动客户端的代码示例以下:
logger.info("start lookup");  
final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator";  
final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup");  
final Random r = new Random();  
actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() {  
  
    @Override  
    public void run() {  
        if (r.nextInt(100) % 2 == 0) {  
            lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);  
        } else {  
            lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);  
        }  
  
    }  
}, actorSystem.dispatcher());   
这里的客户端示例以1秒的周期,向LookupActor随机发送Add或者Subtract的消息。

Actor远端调用模型

  不管是本地Actor仍是远端Actor,Actor之因此可以接收消息,是由于每一个Actor都有其自身的邮箱,你能够定制本身的邮箱(能够用java中的各类队列)。本地应用若是想要调用远端的Actor服务并接收返回信息也就必须拥有本身的邮箱,不然邮递员投递信件时因为没法找到你家的邮箱,可能会打回邮件、放在你家的门缝下甚至丢弃。所以Actor的调用不管是本地的仍是远端的都最好遵照Actor的编程模型,就像下图所示。
 

运行结果

  个人客户端和服务端都运行于本地,客户端tcp监听端口是2553,服务端监听端口是2552,因为本例子的代码较为健壮,因此客户端、服务端能够以任意顺序启动。客户端运行后的日志以下图所示:

 

服务端的运行日志以下图所示:

 

总结

  Akka的远端调用是你们在使用时最经常使用的特性之一,掌握起来不是什么难事,如何实现处理多种消息,并考虑其稳定性、健壮性是须要详细考虑的。

后记:通过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
 
售卖连接以下:
相关文章
相关标签/搜索