在《使用Akka构建集群(一)》一文中经过简单集群监听器的例子演示了如何使用Akka搭建一个简单的集群,可是这个例子“也许”离咱们的实际业务场景太远,你基本不太可能去作这样的工做,除非你负责运维、监控相关的工做(但实际上一个合格的程序员在实现功能的同时,也应当考虑监控的问题,至少应当接入一些监控系统或框架)。css
本文将介绍一个相对看来更符合咱们对于集群使用的业务需求的例子——将客户端请求的字符串转换为大写(假如客户端真的没有这个能力的话)。html
本文的Akka配置继续沿用《使用Akka构建集群(一)》一文中所展现的配置,但在正式编码以前咱们须要在配置中增长一个新的配置项akka.cluster.roles指定集群中服务端的角色,从新编辑事后的application.conf以下:前端
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://metadataAkkaSystem@127.0.0.1:2551", "akka.tcp://metadataAkkaSystem@127.0.0.1:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s roles = [backend] # Disable legacy metrics in akka-cluster. metrics.enabled=off } }
你仍然不须要过多产生于集群直接相关的细节。若是你已经阅读了《使用Akka构建集群(一)》一文,本文介绍的内容应该不会花费你太多的时间。java
客户端与服务端通讯须要一些pojo,它们的实现以下:node
public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; public TransformationJob(String text) { this.text = text; } public String getText() { return text; } } public static class TransformationResult implements Serializable { private final String text; public TransformationResult(String text) { this.text = text; } public String getText() { return text; } @Override public String toString() { return "TransformationResult(" + text + ")"; } } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; public JobFailed(String reason, TransformationJob job) { this.reason = reason; this.job = job; } public String getReason() { return reason; } public TransformationJob getJob() { return job; } @Override public String toString() { return "JobFailed(" + reason + ")"; } } public static final String BACKEND_REGISTRATION = "BackendRegistration"; }
TransformationJob表明待转换的任务,其text属性是须要处理的字符串文本;TransformationResult是任务处理的结果,其text属性是转换完成的字符串文本;JobFailed是任务失败,其reason属性表明失败缘由;字符串常量BACKEND_REGISTRATION用于服务端向客户端注册,以便于客户端知道有哪些服务端能够提供服务。程序员
服务端用于将字符串转换为大写的Actor(正如我以前的文章所言,真正的处理应当从Actor中分离出去,只少经过接口解耦)的实现见代码清单1所示。spring
代码清单1后端
TransformationBackend在preStart方法中订阅了集群的MemberUp事件,这样当它发现新注册的集群成员节点的角色是frontend(前端)时,将向此节点发送BACKEND_REGISTRATION消息,后者将会知道前者提供了服务。TransformationBackend所在的节点在刚刚加入集群时,TransformationBackend还会收到CurrentClusterState消息,从中能够解析出集群中的全部前端节点(即roles为frontend的),并向其发送BACKEND_REGISTRATION消息。通过以上两步能够确保集群中的前端节点和后端节点不管启动或加入集群的顺序怎样变化,都不会影响后端节点通知全部的前端节点及前端节点知道哪些后端节点提供了服务。缓存
客户端除了监听端口不一样外,也须要增长akka.cluster.roles配置项,咱们指定为frontend。客户端的配置以下:app
客户端用于处理转换任务的Actor见代码清单2所说。
代码清单2
能够看到TransformationFrontend处理的消息分为如下三种:
初始化服务端TransformationBackend的代码以下:
logger.info("Start transformationBackend"); final ActorRef transformationBackend = actorSystem.actorOf(springExt.props("TransformationBackend"), "transformationBackend"); actorMap.put("transformationBackend", transformationBackend); logger.info("Started transformationBackend");
初始化客户端TransformationFrontend的代码以下:
能够看到咱们在客户端每2秒将发送一个新的消息,这个消息以“hello-”开头,后边是一个不断自增的数字。当收处处理结果后,客户端还会将结果打印出来。
咱们以3个服务端节点(host相同,端口分别为255一、2552及随机)、1个客户端节点(端口随机)组成的集群为例,咱们首先启动第一个种子节点,而后以任意顺序启动其它服务端或者客户端节点(启动顺序问题在《使用Akka构建集群(一)》一文中已介绍,此处再也不赘述),集群成员变化的日志以下图:
从上面展现的日志中能够看到集群的3个服务端节点和1个客户端节点前后加入集群的信息。
咱们再来看看端口为57222的角色为frontend的节点的日志信息,以下图:
从frontend的日志看出,它已经打印了大写得HELLO-3到HELLO-10十条任务处理结果。那么这些任务分别是由集群中的哪些节点负责处理的?咱们首先来看看端口为2551的backend节点,其处理任务的日志以下图:
看来2551节点处理了hello-四、hello-7及hello-10三条任务。咱们再来看看端口为2552的backend节点,其处理任务的日志以下图:
能够看到2552节点处理了hello-二、hello-5及hello-8三条任务。最后看看端口为57211的backend节点,其处理任务的日志以下图:
能够看到从hello-3到hello-10这8条处理任务被均衡的分配给了3个不一样的后端节点处理。奇怪的是hello-1这条消息竟然没有任何显示,那是由于前端节点刚开始处理消息时,backends列表里尚未缓存好任何backend的ActorRef。咱们向上查找frontend节点的日志,在相隔很远的日志中发现了下面的输出:
这也印证了咱们的猜想。
根据本文的例子,你们应当看到使用Akka构建集群,开发人员只须要关注消息的发送与接收,而无需过多涉及集群的细节。不管前端仍是后端节点均可以加入同一个集群,并且多个后端节点处理消息也能达到负载均衡的功效。
其它Akka应用的博文以下: