为了使一个项目支持集群,本身学习使用了 akka cluster 并在项目中实施了,今后,生活就变得有些痛苦。再配上 apache 作反向代理和负载均衡,debug 起来不要太酸爽。直到如今,我还对 akka cluster 输出的 log 不是很熟悉,目前网络上 akka cluster 的信息还比较少,想深刻了解这东西的话,仍是要本身读 source code。前几天,雪球那帮人说 akka 不推荐使用,有不少坑,这给我提了个醒,目前我对 akka 的理解是远远不够的,须要深刻学习。html
akka cluster sharding 是 akka 的一个 extension。12年左右,有人在 google group 中开始讨论dedicated actor for each entity 这个概念,通过不少讨论,最终由 Patrik Nordwall 实现,以 experimental 的形式加入到 akka contri 库里。我原本不知道有这么一个东西,甚至想过本身实现一个这样玩意。我并无为 cluster sharding 作过 benchmark,也不知道该怎么作,http://dcaoyuan.github.io/papers/rpi_cluster/benchmark.html 作了一个在树莓派上的benchmark,单个节点1000 qps,很像学习下他的 benchmark 的代码。git
第一篇,学习下 cluster sharding 中是如何使用替身模式的。首先,什么是替身模式:一个 actor 收到 request 后可能会作一些比较复杂的操做,典型的操做好比,汇集操做。举个例子,primary 节点 为了知道各个 replica 节点的状态,他会 ping 全部的 replica,收集他们的反馈,记录他们的存活状态,这种场景下,就比较适合新建立一个 actor,它专门作着一件事。这样作有几个优势,首先,primary actor 能够把这部分逻辑放到其余 actor 中,不会搞乱本身自己的逻辑,其实 actor 仅有一个 receive 函数,case 写的多了会很乱的。其次,把这种事情交给其余 actor,这个 actor 即使因异常重启,也不会对系统有太大影响,重作一遍便可。总之,替身模式,就是指建立一个替身actor来单独作一件事。github
在 cluster sharding 中,有两个逻辑使用了 替身模式,一个是 stop cluster。apache
/**
* INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
* them have terminated it replies with `ShardStopped`.
*/
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any)
extends Actor {
import ShardCoordinator.Internal.ShardStopped
entities.foreach { a ⇒
context watch a
a ! stopMessage
}
var remaining = entities
def receive = {
case Terminated(ref) ⇒
remaining -= ref
if (remaining.isEmpty) {
replyTo ! ShardStopped(shard)
context stop self
}
}
}
第二个用法,也是用来 hande off 网络
/** * INTERNAL API. Rebalancing process is performed by this actor. * It sends `BeginHandOff` to all `ShardRegion` actors followed by * `HandOff` to the `ShardRegion` responsible for the shard. * When the handoff is completed it sends [[RebalanceDone]] to its * parent `ShardCoordinator`. If the process takes longer than the * `handOffTimeout` it also sends [[RebalanceDone]]. */private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration, regions: Set[ActorRef]) extends Actor { import Internal._ regions.foreach(_ ! BeginHandOff(shard)) var remaining = regions import context.dispatcher context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout) def receive = { case BeginHandOffAck(`shard`) ⇒ remaining -= sender() if (remaining.isEmpty) { from ! HandOff(shard) context.become(stoppingShard, discardOld = true) } case ReceiveTimeout ⇒ done(ok = false) } def stoppingShard: Receive = { case ShardStopped(shard) ⇒ done(ok = true) case ReceiveTimeout ⇒ done(ok = false) } def done(ok: Boolean): Unit = { context.parent ! RebalanceDone(shard, ok) context.stop(self) }}