Cluster (multiple nodes) --> each node (one ShardRegion) --> each ShardRegion (multiple Shards) --> each Shard (multiple Entities)
Entity: an actor managed by sharding
Shard: a group of entities that is managed together as a unit
ShardRegion: the region actor, deployed on every cluster node, that manages the shards on that node
ShardCoordinator: a cluster-singleton that decides which ShardRegion owns a given shard
A ShardRegion is started on each node.
A message carrying an entity ID --> local ShardRegion --> asks the ShardCoordinator for the shard's location --> the coordinator decides which ShardRegion will own the Shard -->
the ShardRegion acknowledges the request and creates a Shard supervisor as a child actor --> the Shard actor creates the entity --> ShardRegion and Shard keep track of where entities live.
Together, all ShardRegions form a distributed sharding layer: messages carrying an entity ID are sent directly to the local ShardRegion, and the sharding layer routes them to the right entity. When creating a ShardRegion you must supply an implementation of ShardRegion.MessageExtractor, i.e. functions that extract the shard ID and the entity ID from a message.
package shard

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.util.*

/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: cluster sharding example
 */

// The entity actor that will be distributed across the cluster
class DogActor : AbstractActor() {

    var log = LoggerFactory.getLogger(DogActor::class.java)

    override fun createReceive(): Receive {
        return ReceiveBuilder.create().matchAny(this::receive).build()
    }

    fun receive(obj: Any) {
        log.info("received message: $obj")
        if (obj is DogMsg) {
            log.info("${obj.id} ${obj.msg}")
        }
    }
}

// Message type (must carry an entity ID so it can be sharded)
data class DogMsg(var id: Int, var msg: String) : Serializable

// Sharding rules
class ShardExtractor : ShardRegion.MessageExtractor {

    // Extract the entity ID, i.e. the ID of the actor the message is addressed to
    override fun entityId(message: Any?): String {
        if (message is DogMsg) {
            return message.id.toString()
        } else {
            throw RuntimeException("unrecognized message type $message")
        }
    }

    // Derive the shard ID from the message / entity ID
    override fun shardId(message: Any?): String {
        if (message is DogMsg) {
            // a simple alternative would be modulo over a fixed shard count:
            // return (message.id % 10).toString()
            return message.id.toString()
        } else {
            throw RuntimeException("unrecognized message type $message")
        }
    }

    // Messages could be unwrapped here; this example passes them through unchanged
    override fun entityMessage(message: Any): Any {
        return message
    }
}

// Message sent to entities when their shard is stopped (hand-off)
object handOffStopMessage

fun createActorSystem(port: Int): ActorSystem {
    val config = ConfigFactory.parseString(
        "akka.remote.artery.canonical.port=$port"
    ).withFallback(
        ConfigFactory.load()
    )
    return ActorSystem.create("custerA", config)
}

fun startShardRegion(port: Int) {
    var actorSystem = createActorSystem(port)
    val settings = ClusterShardingSettings.create(actorSystem) //.withRole("ClusterShardRole")
    val shardReg = ClusterSharding.get(actorSystem).start(
        "dogShard",
        Props.create(DogActor::class.java),
        settings,
        ShardExtractor(),
        ShardCoordinator.LeastShardAllocationStrategy(10, 1),
        handOffStopMessage
    )
    for (i in 1..10) {
        shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender())
        Thread.sleep(3000)
    }
}

fun shardRegProxy() {
    var actorSystem = createActorSystem(2663)
    // startProxy starts the region in proxy-only mode: it hosts no entities itself,
    // but knows how to delegate messages to the right place
    ClusterSharding.get(actorSystem)
        .startProxy("dogShard", Optional.empty(), ShardExtractor())
        .let { println(" shard proxy $it started.") }
    Thread.sleep(3000)
    var shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard")
    for (i in 1..100) {
        shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender())
        Thread.sleep(1500)
    }
}
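Besides hand-writing a ShardRegion.MessageExtractor as above, Akka also ships ShardRegion.HashCodeMessageExtractor, which derives the shard ID from a hash of the entity ID so that only entityId needs to be implemented. A minimal sketch (the name DogMsgExtractor and the shard count of 10 are my own choices, not part of the example above):

// Built-in alternative extractor: the shard ID is computed roughly as
// abs(entityId.hashCode) % maxNumberOfShards, so only entityId() is needed.
class DogMsgExtractor : ShardRegion.HashCodeMessageExtractor(10) {
    override fun entityId(message: Any?): String? =
        if (message is DogMsg) message.id.toString() else null
}

With this extractor the number of shards stays bounded at 10 no matter how many distinct entity IDs exist, instead of creating one shard per entity as the example above does.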
Then start the entry points separately (each main function is a separate program / JVM):
fun main() { startShardRegion(2661) }
fun main() { startShardRegion(2662) }
fun main() {
shardRegProxy()
}
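To see the hierarchy described at the top (region --> shards --> entities) at runtime, you can ask a running ShardRegion for its current state. A rough sketch, assuming one of the startShardRegion nodes above is already up; the helper name, the ask timeout and the use of Patterns.ask are my additions, not part of the original example:

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ShardRegion
import akka.pattern.Patterns
import java.time.Duration

// Ask the local ShardRegion which shards it hosts and which entity IDs live in each of them.
fun printShardRegionState(actorSystem: ActorSystem) {
    val region = ClusterSharding.get(actorSystem).shardRegion("dogShard")
    Patterns.ask(region, ShardRegion.getShardRegionStateInstance(), Duration.ofSeconds(3))
        .thenAccept { reply ->
            val state = reply as ShardRegion.CurrentShardRegionState
            for (shard in state.getShards()) {
                println("shard ${shard.getShardId()} -> entities ${shard.getEntityIds()}")
            }
        }
}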
Configuration file:
akka {
  actor {
    provider = "cluster"
  }
  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }
  cluster {
    seed-nodes = [
      "akka://custerA@127.0.0.1:2661",
      "akka://custerA@127.0.0.1:2662"]
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}
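A related note on cluster formation: the Thread.sleep calls in the example only paper over the time it takes the seed nodes to form the cluster. An arguably cleaner pattern (my own suggestion, not from the original example) is to register a callback that runs once this node's member status becomes Up, and only start sending messages from there:

import akka.actor.ActorSystem
import akka.cluster.Cluster

// Run the given block only after this node has successfully joined the cluster and is Up.
fun onMemberUp(actorSystem: ActorSystem, block: () -> Unit) {
    Cluster.get(actorSystem).registerOnMemberUp(Runnable { block() })
}

For example, the message loop in startShardRegion could be wrapped in onMemberUp(actorSystem) { ... } instead of relying on sleeps.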
The nodes in this example must all belong to the same cluster, otherwise messages will not be forwarded, so make sure the name passed to ActorSystem.create(name, config) is identical on every node! (I spent ages wondering why messages were never delivered, and this turned out to be the cause. Argh!)
Same cluster, same ActorSystem name!
Otherwise you will get this exception:
No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed
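As the message says, the coordinator can only register once the node has actually joined a cluster, either via matching seed-nodes configuration or via a manual join. For completeness, a manual join would look roughly like this (the address below reuses the first seed node from the configuration above; the helper function is my addition, not part of the original example):

import akka.actor.ActorSystem
import akka.actor.Address
import akka.cluster.Cluster

// Join the cluster programmatically instead of (or in addition to) configuring seed-nodes.
fun joinCluster(actorSystem: ActorSystem) {
    val seed = Address("akka", "custerA", "127.0.0.1", 2661)
    Cluster.get(actorSystem).join(seed)
}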