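Many applications need exactly one instance of a certain kind of Actor in the system. In a cluster that instance may live on any node, but it is guaranteed to be unique. Akka's Cluster-Singleton supports this Singleton Actor pattern: when the node hosting the instance runs into trouble and has to leave the cluster, an identical Actor is automatically created on another node and control is handed over to it. Because this involves a newly created Actor, its internal state is lost in the process. Typical uses of a Singleton-Actor include an interface to an external program that accepts only one connection, or an aggregator Actor whose internal state accumulates the results computed by many other Actors. If the Singleton-Actor carries internal state, a PersistentActor can be used to restore that state automatically. With that, Cluster-Singleton becomes a practical pattern that applies in many situations.
The uniqueness that defines the Cluster-Singleton pattern also brings risks that deserve attention: the singleton can easily become overloaded, it cannot be guaranteed to stay online, and message delivery to it cannot be guaranteed. The user has to handle these cases explicitly in code.
Let's design an example to get to know Cluster-Singleton, starting with what the Singleton-Actor does: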
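One common way to cope with the missing delivery guarantee is for the sender to remember what it has sent and resend anything that is not acknowledged in time. The following is a minimal plain-Scala sketch of that bookkeeping, with no Akka involved; `AckSketch`, `PendingAcks`, `sent`, `acked` and `due` are hypothetical names introduced only for illustration.

```scala
object AckSketch {
  // Hypothetical helper: tracks messages that were sent but not yet acknowledged.
  // Time is passed in explicitly (milliseconds) so the logic stays pure.
  final case class PendingAcks[A](
      pending: Map[Long, (A, Long)] = Map.empty[Long, (A, Long)],
      nextId: Long = 0L) {

    // Record a message sent at time `now`; returns the new tracker and the id used.
    def sent(msg: A, now: Long): (PendingAcks[A], Long) =
      (copy(pending = pending + (nextId -> ((msg, now))), nextId = nextId + 1), nextId)

    // Forget a message once its acknowledgement has arrived.
    def acked(id: Long): PendingAcks[A] = copy(pending = pending - id)

    // Messages that have waited at least `timeout` ms and should be resent, by id.
    def due(now: Long, timeout: Long): List[(Long, A)] =
      pending.toList
        .collect { case (id, (msg, at)) if now - at >= timeout => (id, msg) }
        .sortBy(_._1)
  }
}
```

Resending means the receiver may see the same message twice, so a design along these lines usually also needs the receiver to tolerate duplicates.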
class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig //notify sender message received
      //runs before the persist handler above, so it logs the state prior to this event
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect => //this node exits cluster. expect switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
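This SingletonActor is an Actor that extends PersistentActor, so it must implement PersistentActor's abstract members. It maintains several pieces of internal state, the running totals freeHoles, freeTrees and ttlMatches. SingletonActor simulates tree planting: on a Dig command it produces an AddHole event that registers a hole and updates the current state; on a Plant command it produces an AddTree event and updates the state. Because the Cluster-Singleton pattern cannot guarantee message delivery, the actor replies with AckDig and AckPlant so that the sender can resend messages when needed. We log cluster.selfAddress to confirm which cluster node currently hosts the singleton.
A ClusterSingletonManager has to be created and deployed on every cluster node that may host the SingletonActor, as follows: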
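The reason persisted events are enough to restore the counters on a new node can be sketched without Akka: if every change is a function of the previous state and one event, replaying the journal from the empty state rebuilds the same totals. In the sketch below `ReplaySketch`, `TreeState`, `updated` and `replay` are hypothetical names; `AddHole` and `AddTree` mirror the events used in the article, and the immutable state is a substitute for the actor's three `var` fields.

```scala
object ReplaySketch {
  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  // Immutable counterpart of the three counters kept by SingletonActor.
  final case class TreeState(freeHoles: Int = 0, freeTrees: Int = 0, ttlMatches: Int = 0) {
    // Same matching rule as updateState: a new hole pairs with a free tree and vice versa.
    def updated(evt: Event): TreeState = evt match {
      case AddHole =>
        if (freeTrees > 0) copy(freeTrees = freeTrees - 1, ttlMatches = ttlMatches + 1)
        else copy(freeHoles = freeHoles + 1)
      case AddTree =>
        if (freeHoles > 0) copy(freeHoles = freeHoles - 1, ttlMatches = ttlMatches + 1)
        else copy(freeTrees = freeTrees + 1)
    }
  }

  // Replay the journal from the empty state, one event at a time.
  def replay(journal: Seq[Event]): TreeState =
    journal.foldLeft(TreeState())((state, evt) => state.updated(evt))
}
```

For example, `ReplaySketch.replay(Seq(AddHole, AddTree, AddTree))` pairs the first tree with the hole and leaves one free tree.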
def create(port: Int) = {
  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
    .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
    .withFallback(ConfigFactory.load())
  val singletonSystem = ActorSystem("SingletonClusterSystem", config)

  startupSharedJournal(singletonSystem, (port == 2551), path =
    ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

  val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
    singletonProps = Props[SingletonActor],
    terminationMessage = CleanUp,
    settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
  ), name = "singletonManager")
}
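As you can see, ClusterSingletonManager is itself an Actor, and ClusterSingletonManager.props configures the SingletonActor it manages. Our main goal is to verify that when the current node fails and has to leave the cluster, the SingletonActor automatically moves to another online node. ClusterSingletonManager works as follows: it is first created and deployed on all selected cluster nodes, and it starts the SingletonActor on the oldest of those nodes (the one that joined first). When that node is no longer available (it leaves the cluster, or is removed after becoming unreachable), the SingletonActor is automatically re-created on the next-oldest node.
ClusterSingletonProxy is also an Actor; it reaches the SingletonActor by exchanging messages with the ClusterSingletonManager. ClusterSingletonProxy keeps track of where the live SingletonActor is running and gives users a single ActorRef through which to reach it. The following code uses it to call the SingletonActor: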
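The selection rule described above (the singleton runs on the oldest eligible node, and the next-oldest takes over) can be modelled in a few lines of plain Scala. This is only a sketch of the rule, not of Akka's hand-over protocol; `OldestNodeSketch`, `Node`, `joinOrder` and `singletonHost` are hypothetical names.

```scala
object OldestNodeSketch {
  // Hypothetical model of a cluster member: joinOrder grows with every node that joins.
  final case class Node(address: String, joinOrder: Int, roles: Set[String])

  // The singleton is hosted by the oldest member that carries the required role.
  def singletonHost(members: Seq[Node], role: String): Option[Node] = {
    val eligible = members.filter(_.roles.contains(role))
    if (eligible.isEmpty) None else Some(eligible.minBy(_.joinOrder))
  }
}
```

Removing the current host from `members` and calling `singletonHost` again yields the next-oldest node with the role, which is the move the demo is meant to show.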
object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send Dig messages every 3 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(0.seconds, 3.second, singletonProxy, SingletonActor.Dig)

    //send Plant messages every 2 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(1.seconds, 2.second, singletonProxy, SingletonActor.Plant)

    //send kill message to hosting node every 15 seconds, starting after 10 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
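Because the proxy tracks a singleton that can move, there are short windows in which it has no target to forward to. As far as the Akka documentation describes it, ClusterSingletonProxy buffers messages during such a window and discards the oldest ones once the buffer is full. The plain-Scala model below illustrates that behaviour only; it is not Akka's implementation, and `ProxyBufferSketch`, `ProxyModel`, `send`, `identified` and `lost` are hypothetical names.

```scala
object ProxyBufferSketch {
  // Hypothetical model: `target` is the current singleton location, if known.
  final case class ProxyModel[A](target: Option[String], buffer: Vector[A], capacity: Int) {

    // Returns the updated proxy and the (destination, message) pairs delivered by this call.
    def send(msg: A): (ProxyModel[A], Vector[(String, A)]) = target match {
      case Some(t) => (this, Vector(t -> msg))
      case None =>
        // Keep at most `capacity` messages, discarding the oldest first.
        (copy(buffer = (buffer :+ msg).takeRight(capacity)), Vector.empty)
    }

    // Singleton located at `address`: flush the buffer in arrival order.
    def identified(address: String): (ProxyModel[A], Vector[(String, A)]) =
      (copy(target = Some(address), buffer = Vector.empty), buffer.map(m => address -> m))

    // Singleton host left the cluster: start buffering again.
    def lost: ProxyModel[A] = copy(target = None)
  }
}
```

Messages dropped from a full buffer are one concrete way delivery can fail, which is why the acknowledgements sent by the SingletonActor matter.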
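We send Dig and Plant messages to the SingletonActor through the ClusterSingletonProxy at different intervals. Starting 10 seconds in, a Disconnect message is sent to the SingletonActor every 15 seconds, telling the node it runs on to start leaving the cluster. The following code runs the example: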
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {

  SingletonActor.create(2551) //seed-node

  SingletonActor.create(0) //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create //ClusterSingletonProxy node

}
The output is as follows:
[INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551]
[INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839]
[INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840]
[INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841]
[INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842]
[INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843]
[INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0
[INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0
...
[INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...
[INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4
[INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4
[INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5
[INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=5
[INFO] [07/09/2017 20:17:43.672] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=1,ttlMatches=6
[INFO] [07/09/2017 20:17:45.665] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=6
[INFO] [07/09/2017 20:17:46.654] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=6
...
[INFO] [07/09/2017 20:17:53.673] [SingletonClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:55839 is leaving cluster ...
[INFO] [07/09/2017 20:17:55.654] [SingletonClusterSystem-akka.actor.default-dispatcher-13] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=4,ttlMatches=9
[INFO] [07/09/2017 20:17:55.664] [SingletonClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=10
[INFO] [07/09/2017 20:17:56.646] [SingletonClusterSystem-akka.actor.default-dispatcher-5] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:57.662] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=4,ttlMatches=10
[INFO] [07/09/2017 20:17:58.652] [SingletonClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=5,ttlMatches=10
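The output shows that as nodes leave the cluster, the SingletonActor automatically moves to another cluster node and keeps running. The counters are not reset by the move: the last state logged on port 2551 shows ttlMatches=5, and the first state logged on port 55839 also shows ttlMatches=5, which is what recovery from the persisted events should produce.
It is worth stressing that such simple code is enough to implement a fairly complex clustered, distributed program, which shows that Akka is a practical programming tool with broad prospects!
Below is the complete source code for this demonstration: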
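The timing of the "is leaving cluster" lines can be checked against the initial delays and intervals passed to scheduler.schedule in SingletonUser. The small sketch below lists the seconds at which a repeating schedule fires; `ScheduleSketch` and `fireTimes` are hypothetical names, and the model ignores scheduler jitter.

```scala
object ScheduleSketch {
  // Seconds (from start) at which a repeating schedule fires, up to and including `until`.
  def fireTimes(initialDelay: Int, interval: Int, until: Int): List[Int] =
    (initialDelay to until by interval).toList
}
```

With the values from the code, `fireTimes(10, 15, 40)` gives 10, 25 and 40 seconds for Disconnect. That is consistent with the log, where the two leave messages appear at 20:17:38 and 20:17:53, about 10 and 25 seconds after remoting started at 20:17:28.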
build.sbt
name := "cluster-singleton"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")
application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://SingletonClusterSystem@127.0.0.1:2551"]
    log-info = off
  }

  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
SingletonActor.scala
package clustersingleton.sa

import akka.actor._
import akka.cluster._
import akka.persistence._
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._
import akka.persistence.journal.leveldb._
import akka.util.Timeout
import akka.pattern._

object SingletonActor {
  sealed trait Command
  case object Dig extends Command
  case object Plant extends Command
  case object AckDig extends Command //acknowledge
  case object AckPlant extends Command //acknowledge

  case object Disconnect extends Command //force node to leave cluster
  case object CleanUp extends Command //clean up when actor ends

  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  case class State(nHoles: Int, nTrees: Int, nMatches: Int)

  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
      .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("SingletonClusterSystem", config)

    startupSharedJournal(singletonSystem, (port == 2551), path =
      ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

    val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[SingletonActor],
      terminationMessage = CleanUp,
      settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
    ), name = "singletonManager")
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }
}

class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig //notify sender message received
      //runs before the persist handler above, so it logs the state prior to this event
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect => //this node exits cluster. expect switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
SingletonUser.scala
package clustersingleton.frontend

import akka.actor._
import clustersingleton.sa.SingletonActor
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._

object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send Dig messages every 3 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(0.seconds, 3.second, singletonProxy, SingletonActor.Dig)

    //send Plant messages every 2 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(1.seconds, 2.second, singletonProxy, SingletonActor.Plant)

    //send kill message to hosting node every 15 seconds, starting after 10 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
ClusterSingletonDemo.scala
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {

  SingletonActor.create(2551) //seed-node

  SingletonActor.create(0) //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create //ClusterSingletonProxy node

}