本文由 GodPan 发表在 ScalaCool 团队博客。node
上一篇文章咱们讲了Akka Remote,理解了Akka中的远程通讯,其实Akka Cluster能够当作Akka Remote的扩展,由原来的两点变成由多点组成的通讯网络,这种模式相信你们都很了解,就是集群,它的优点主要有两点:系统伸缩性高,容错性更好。git
不少人很容易把分布式和集群的概念搞错,包括我也是,我一开始也觉得它们两个是同样的概念,只是叫法不一样而已,但其实否则,虽然它们在实际场景中都是部署在不一样的机器上,但它们所提供的功能并非同样的。举个简单的例子来看看它们之间的不一样:github
为了保持整个系列连续性,我又以抽奖为基础举一个例子:算法
假定咱们如今抽奖流程包括,抽奖分配奖品和用户根据连接领取指定奖品,用户先抽奖而后获取奖品连接,点击连接填写相应信息领取奖品。数据库
咱们如今把抽奖分配奖品和用户根据连接领取指定奖品分别部署在两台机器上,忽然有一天很不幸,抽奖活动进行到一半,抽奖分配奖品那台机子所在的区域停电了,很显然,后续的用户参与抽奖就不能进行了,由于咱们只有一台抽奖分配奖品的机子,但因为咱们将领取奖品的业务部署在另外一台机器上,因此前面那些中奖的用户仍是能够正常的领取奖品,具体相关定义可参考《分布式系统概念与设计》中对分布式系统的定义。bash
如今咱们仍是有两台机器,可是咱们在两个机器上都部署了抽奖分配奖品和用户根据连接领取指定奖品的业务逻辑,忽然有一天,有一台所在的区域停电了,但这时咱们并担忧,由于另外一台服务器仍是能够正常的运行处理用户的全部请求。服务器
它们的各自特色:网络
总的来讲: 分布式是以分离任务缩短期来提升效率,而集群是在单位时间内处理更多的任务来提升效率。负载均衡
在前面的文章Akka Actor的工做方式,咱们能够将一个任务分解成一个个小任务,而后分配给它的子Actor执行,其实这就能够当作一个小的分布式系统,那么在Akka中,集群又是一种怎样的概念呢?frontend
其实往简单里说,就是一些相同的ActorSystem的组合,它们具备着相同的功能,咱们须要执行的任务能够随机的分配到目前可用的ActorSystem上,这点跟Nginx的负载均衡很相似,根据算法和配置将请求转发给运行正常的服务器去,Akka集群的表现形式也是这样,固然它背后的理论基础是基于gossip协议的,目前不少分布式的数据库的数据同步都采用这个协议,有兴趣的同窗能够本身去研究研究,只是我也是只知其一;不知其二,这里就不写了,怕误导了你们。
下面我来说讲Akka Cluster中比较重要的几个概念:
Seed Nodes能够看过是种子节点或者原始节点,它的一个主要做用用于能够自动接收新加入集群的节点的信息,并与之通讯,使用方式能够用配置文件或者运行时指定,推荐使用配置文件方式,好比:
akka.cluster.seed-nodes = [
"akka.tcp://ClusterSystem@host1:2552",
"akka.tcp://ClusterSystem@host2:2552"]复制代码
seed-nodes列表中的第一个节点会集群启动的时候初始化,而其余节点则是在有须要时再初始化。
固然你也能够不指定seed nodes,但你能够须要手动或者在程序中写相关逻辑让相应的节点加入集群,具体使用方式可参考官方文档。
Cluster Events字面意思是集群事件,那么这是什么意思呢?其实它表明着是一个节点的各类状态和操做,举个例子,假设你在打一局王者5v5的游戏,那么你能够把十我的当作一个集群,咱们每一个人都是一个节点,咱们的任何操做和状态都能被整个系统捕获到,好比A杀了B、A超神了,A离开了游戏,A从新链接了游戏等等,这些状态和操做在Cluster Events中就至关于节点之于集群,那么它具体是怎么使用的呢?
首先咱们必须将节点注册到集群中,或者说节点订阅了某个集群,咱们能够这么作:
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])复制代码
具体代码相关的使用我会再下面写一个demo例子,来讲明是如何具体使用它们的。
从上面的代码咱们能够看到有一个MemberEvent的概念,这个其实就是每一个成员所可能拥有的events,那么一个成员在它的生命周期中有如下的events
状态说明:
虽然上面说到集群中的各个节点的功能是同样的,其实并不必定,好比咱们将分布式和集群融合到一块儿,集群中的一部分节点负责接收请求,一部分用于计算,一部分用于数据存储等等,因此Akka Cluster提供了一种Roles的概念,用来表示该节点的功能特性,咱们能够在配置文件中指定,好比:
akka.cluster.roles = request
akka.cluster.roles = compute
akka.cluster.roles = store复制代码
ClusterClient是一个集群客户端,主要用于集群外部系统与集群通讯,使用它很是方便,咱们只须要将集群中的任意指定一个节点做为集群客户端,而后将其注册为一个该集群的接待员,最后咱们就能够在外部系统直接与之通讯了,使用ClusterClient须要作相应的配置:
akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]复制代码
假设咱们如今我一个接待的Actor,叫作frontend,咱们就能够这样作:
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
ClusterClientReceptionist(system).registerService(frontend)复制代码
上面讲了集群概念和Akka Cluster中相对重要的概念,下面咱们就来写一个Akka Cluster的demo,
demo需求:
线假设须要执行一些相同任务,频率为2s一个,如今咱们须要将这些任务分配给Akka集群中的不一样节点去执行,这里使用ClusterClient做为集群与外部的通讯接口。
首先咱们先来定义一些命令:
package sample.cluster.transformation
final case class TransformationJob(text: String) // 任务内容
final case class TransformationResult(text: String) // 执行任务结果
final case class JobFailed(reason: String, job: TransformationJob) //任务失败相应缘由
case object BackendRegistration // 后台具体执行任务节点注册事件复制代码
而后咱们实现具体执行任务逻辑的后台节点:
class TransformationBackend extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) //在启动Actor时将该节点订阅到集群中
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case TransformationJob(text) => { // 接收任务请求
val result = text.toUpperCase // 任务执行获得结果(将字符串转换为大写)
sender() ! TransformationResult(text.toUpperCase) // 向发送者返回结果
}
case state: CurrentClusterState =>
state.members.filter(_.status == MemberStatus.Up) foreach register // 根据节点状态向集群客户端注册
case MemberUp(m) => register(m) // 将刚处于Up状态的节点向集群客户端注册
}
def register(member: Member): Unit = { //将节点注册到集群客户端
context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
BackendRegistration
}
}复制代码
相应节点的配置文件信息,我这里就不贴了,请从相应的源码demo里获取。源码连接
接着咱们来实现集群客户端:
class TransformationFrontend extends Actor {
var backends = IndexedSeq.empty[ActorRef] //任务后台节点列表
var jobCounter = 0
def receive = {
case job: TransformationJob if backends.isEmpty => //目前暂无执行任务节点可用
sender() ! JobFailed("Service unavailable, try again later", job)
case job: TransformationJob => //执行相应任务
jobCounter += 1
implicit val timeout = Timeout(5 seconds)
val backend = backends(jobCounter % backends.size) //根据相应算法选择执行任务的节点
println(s"the backend is ${backend} and the job is ${job}")
val result = (backend ? job)
.map(x => x.asInstanceOf[TransformationResult]) // 后台节点处理获得结果
result pipeTo sender //向外部系统发送执行结果
case BackendRegistration if !backends.contains(sender()) => // 添加新的后台任务节点
context watch sender() //监控相应的任务节点
backends = backends :+ sender()
case Terminated(a) =>
backends = backends.filterNot(_ == a) // 移除已经终止运行的节点
}
}复制代码
最后咱们实现与集群客户端交互的逻辑:
class ClientJobTransformationSendingActor extends Actor {
val initialContacts = Set(
ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
val settings = ClusterClientSettings(context.system)
.withInitialContacts(initialContacts)
val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")
def receive = {
case TransformationResult(result) => {
println(s"Client response and the result is ${result}")
}
case Send(counter) => {
val job = TransformationJob("hello-" + counter)
implicit val timeout = Timeout(5 seconds)
val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)
result.onComplete {
case Success(transformationResult) => {
self ! transformationResult
}
case Failure(t) => println("An error has occured: " + t.getMessage)
}
}
}
}复制代码
下面咱们开始运行这个domo:
object DemoClient {
def main(args : Array[String]) {
TransformationFrontendApp.main(Seq("2551").toArray) //启动集群客户端
TransformationBackendApp.main(Seq("8001").toArray) //启动三个后台节点
TransformationBackendApp.main(Seq("8002").toArray)
TransformationBackendApp.main(Seq("8003").toArray)
val system = ActorSystem("OTHERSYSTEM")
val clientJobTransformationSendingActor =
system.actorOf(Props[ClientJobTransformationSendingActor],
name = "clientJobTransformationSendingActor")
val counter = new AtomicInteger
import system.dispatcher
system.scheduler.schedule(2.seconds, 2.seconds) { //定时发送任务
clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
}
StdIn.readLine()
system.terminate()
}
}复制代码
运行结果:
从结果能够看到,咱们将任务根据算法分配给不一样的后台节点进行执行,最终返回结果。
本文的demo例子已上传github:源码连接