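ClusterClient lets a node communicate with a cluster without itself being part of that cluster. It only needs to know the location of one or more nodes, which serve as contact points. It establishes a connection to a ClusterReceptionist and, through it, sends messages to specific actors in the cluster. The actor-ref provider must be set to remote or cluster. The receptionist must be started on all nodes of the cluster or on a group of nodes. It can be started on its own or through the ClusterClientReceptionist extension. The only actors a ClusterClient can talk to are those registered through the ClusterClientReceptionist extension.
At this point you may be tempted to swear: such a simple feature, I could implement it myself. But that is how Akka is. For many features that look very simple, the version the framework provides is more stable and more general, though not necessarily the fastest. Enough talk; let's look at how ClusterClient is actually implemented.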
    /**
     * This actor is intended to be used on an external node that is not member
     * of the cluster. It acts like a gateway for sending messages to actors
     * somewhere in the cluster. From the initial contact points it will establish
     * a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will
     * monitor the connection to the receptionist and establish a new connection if
     * the link goes down. When looking for a new receptionist it uses fresh contact
     * points retrieved from previous establishment, or periodically refreshed
     * contacts, i.e. not necessarily the initial contact points.
     *
     * You can send messages via the `ClusterClient` to any actor in the cluster
     * that is registered in the [[ClusterReceptionist]].
     * Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]]
     * or [[ClusterClient.Publish]].
     *
     * Use the factory method [[ClusterClient#props]]) to create the
     * [[akka.actor.Props]] for the actor.
     *
     * If the receptionist is not currently available, the client will buffer the messages
     * and then deliver them when the connection to the receptionist has been established.
     * The size of the buffer is configurable and it can be disabled by using a buffer size
     * of 0. When the buffer is full old messages will be dropped when new messages are sent
     * via the client.
     *
     * Note that this is a best effort implementation: messages can always be lost due to the distributed
     * nature of the actors involved.
     */
    final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging
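The Scaladoc describes a buffering rule: buffer while no receptionist is available, drop the oldest message when the buffer is full, and use a size of 0 to disable buffering. Here is a minimal Akka-free sketch of that rule. `DropOldestBuffer` is a made-up name. The sketch assumes that when buffering is disabled, the new message itself is dropped.

```scala
import scala.collection.immutable.Queue

// Hypothetical model of ClusterClient's buffering rule (not Akka's actual code).
final case class DropOldestBuffer[A](capacity: Int, items: Queue[A] = Queue.empty[A]) {
  // Returns the updated buffer plus the message that had to be dropped, if any.
  def offer(msg: A): (DropOldestBuffer[A], Option[A]) =
    if (capacity <= 0) (this, Some(msg)) // buffering disabled: the new message itself is dropped
    else if (items.size < capacity) (copy(items = items.enqueue(msg)), None)
    else {
      val (oldest, rest) = items.dequeue // buffer full: evict the oldest message
      (copy(items = rest.enqueue(msg)), Some(oldest))
    }

  // Delivers buffered messages in arrival order, as happens once a receptionist is found.
  def flush(deliver: A => Unit): DropOldestBuffer[A] = {
    items.foreach(deliver)
    copy(items = Queue.empty[A])
  }
}
```

With a capacity of 2, offering 1, 2 and 3 evicts 1 and leaves 2 and 3 to be flushed in order.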
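From ClusterClient's definition and the official Scaladoc, it is just an ordinary actor that can communicate with a specific actor in the cluster (the ClusterReceptionist). It uses the initial contact points, which are really just ActorPaths, to send messages to a ClusterReceptionist in the cluster. It also monitors the connection to that receptionist so it can reconnect if the link goes down. ClusterClient does not override preStart, so let's look at its primary constructor.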
    sendGetContacts()
    scheduleRefreshContactsTick(establishingGetContactsInterval)
    self ! RefreshContactsTick
These are the three calls the constructor makes. Let's go through them one at a time.
    def sendGetContacts(): Unit = {
      val sendTo =
        if (contacts.isEmpty) initialContactsSel
        else if (contacts.size == 1) initialContactsSel union contacts
        else contacts
      if (log.isDebugEnabled)
        log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""")
      sendTo.foreach { _ ! GetContacts }
    }
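sendGetContacts is simple: it sends a GetContacts message to the current contact points. Which points count as current depends on what the client already knows:
- If contacts is empty, it uses the initial contacts.
- If contacts holds a single entry, it uses that entry together with the initial contacts.
- Otherwise it uses contacts alone.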
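The target-selection rule can be written as a plain function. This is a hypothetical sketch: `ContactTargets` and `getContactsTargets` are illustrative names, and strings stand in for ActorSelection values.

```scala
object ContactTargets {
  // Hypothetical, Akka-free version of the target choice in sendGetContacts.
  def getContactsTargets(initial: Set[String], contacts: Set[String]): Set[String] =
    if (contacts.isEmpty) initial                       // nothing learned yet: use the configured contacts
    else if (contacts.size == 1) initial union contacts // only one known contact: keep the initial ones as backup
    else contacts                                       // several known contacts: use those alone
}
```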
    def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
      refreshContactsTask foreach { _.cancel() }
      refreshContactsTask = Some(context.system.scheduler.schedule(
        interval, interval, self, RefreshContactsTick))
    }
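scheduleRefreshContactsTick first cancels any refresh timer that is already running. It then starts a new timer that sends RefreshContactsTick to the actor itself after interval, and again every interval after that.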
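The cancel-then-reschedule step matters because the method is called more than once with different intervals. Below is a minimal sketch. `FakeTask` is a hypothetical stand-in for akka.actor.Cancellable, and the intervals in the usage are illustrative values.

```scala
// Hypothetical stand-in for akka.actor.Cancellable; it only records whether cancel() was called.
final class FakeTask(val interval: Long) {
  private var cancelledFlag = false
  def cancel(): Unit = cancelledFlag = true
  def isCancelled: Boolean = cancelledFlag
}

// Same shape as scheduleRefreshContactsTick: cancel the previous timer, then install a new one.
final class RefreshTimer {
  var refreshContactsTask: Option[FakeTask] = None

  def scheduleRefreshContactsTick(intervalMs: Long): Unit = {
    refreshContactsTask.foreach(_.cancel())
    refreshContactsTask = Some(new FakeTask(intervalMs))
  }
}
```

The old task is cancelled before the new one is installed. So the establishing interval and the later refresh interval never run side by side.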
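The third statement sends RefreshContactsTick to the actor itself. The last two statements feel somewhat redundant. Why not pass 0 as the timer's first argument (the initial delay) and drop the third call?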
    case RefreshContactsTick ⇒ sendGetContacts()
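How is RefreshContactsTick handled while the client is still establishing a connection? It simply calls sendGetContacts again. So why call sendGetContacts directly in the primary constructor at all?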
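This hypothetical sketch lists the times (in ms after start) at which sendGetContacts runs while the client is establishing. It makes the redundancy concrete. It assumes the self-sent RefreshContactsTick is processed at roughly t = 0.

```scala
object StartupTicks {
  // Times (ms after start) at which schedule(initialDelay, interval, ...) fires, first `count` firings.
  def scheduled(initialDelayMs: Long, intervalMs: Long, count: Int): List[Long] =
    List.tabulate(count)(i => initialDelayMs + i * intervalMs)

  // Times at which the constructor path triggers sendGetContacts() while establishing:
  // the direct call, the immediate `self ! RefreshContactsTick` (modelled as t = 0),
  // then the periodic ticks from schedule(interval, interval, ...).
  def getContactsSends(intervalMs: Long, periodicCount: Int): List[Long] =
    0L :: 0L :: scheduled(intervalMs, intervalMs, periodicCount)
}
```

Two GetContacts rounds go out at start-up. Replacing the self-send with a zero initial delay, as suggested above, yields exactly the same sequence.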
    var contactPaths: HashSet[ActorPath] = initialContacts.to[HashSet]
    val initialContactsSel = contactPaths.map(context.actorSelection)
    var contacts = initialContactsSel
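Notice how similar initialContactsSel, contacts, contactPaths and initialContacts are?
The key one is initialContactsSel, which maps initialContacts into ActorSelections. GetContacts is sent to these selections. While establishing the connection, the client also sends Identify messages to its contacts; the ActorIdentity replies are handled below. But these ActorPaths point at remote actors, so how can they be selected? Remember what was said earlier: the provider must be configured as remote or cluster. Why? Take a guess. (Hint: the default local provider cannot resolve paths that belong to another ActorSystem.)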
    case ActorIdentity(_, Some(receptionist)) ⇒
      log.info("Connected to [{}]", receptionist.path)
      scheduleRefreshContactsTick(refreshContactsInterval)
      sendBuffered(receptionist)
      context.become(active(receptionist) orElse contactPointMessages)
      connectTimerCancelable.foreach(_.cancel())
      failureDetector.heartbeat()
      self ! HeartbeatTick // will register us as active client of the selected receptionist
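On receiving ActorIdentity, the client:
- calls scheduleRefreshContactsTick again to reset the timer, now with refreshContactsInterval;
- sends its buffered messages to the receptionist;
- switches its behavior to active;
- cancels the connect timer;
- records a first heartbeat in the failure detector;
- sends itself a HeartbeatTick.

From this point on it can forward messages to specific actors in the cluster through Send, SendToAll and Publish.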
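The switch from establishing to active, together with the rule that the first ActorIdentity wins, can be modelled as a small state machine. This is a hypothetical, Akka-free sketch. `ClientModel`, `Establishing` and `Active` are illustrative names, and strings stand in for messages and ActorRefs.

```scala
object ClientModel {
  sealed trait State
  final case class Establishing(buffered: Vector[String]) extends State
  final case class Active(receptionist: String) extends State

  // Returns the next state plus the messages that must now be delivered to the receptionist.
  def onIdentity(state: State, receptionist: String): (State, Vector[String]) =
    state match {
      case Establishing(buffered) => (Active(receptionist), buffered) // first reply wins: flush the buffer
      case active: Active         => (active, Vector.empty)           // later replies are ignored
    }

  def onSend(state: State, msg: String): (State, Vector[String]) =
    state match {
      case Establishing(buffered) => (Establishing(buffered :+ msg), Vector.empty) // no receptionist yet: buffer
      case active: Active         => (active, Vector(msg))                        // connected: forward at once
    }
}
```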
    def active(receptionist: ActorRef): Actor.Receive = {
      case Send(path, msg, localAffinity) ⇒
        receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity)
      case SendToAll(path, msg) ⇒
        receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
      case Publish(topic, msg) ⇒
        receptionist forward DistributedPubSubMediator.Publish(topic, msg)
      case HeartbeatTick ⇒
        if (!failureDetector.isAvailable) {
          log.info("Lost contact with [{}], reestablishing connection", receptionist)
          reestablish()
        } else
          receptionist ! Heartbeat
      case HeartbeatRsp ⇒
        failureDetector.heartbeat()
      case RefreshContactsTick ⇒
        receptionist ! GetContacts
      case Contacts(contactPoints) ⇒
        // refresh of contacts
        if (contactPoints.nonEmpty) {
          contactPaths = contactPoints.map(ActorPath.fromString).to[HashSet]
          contacts = contactPaths.map(context.actorSelection)
        }
        publishContactPoints()
      case _: ActorIdentity ⇒ // ok, from previous establish, already handled
      case ReceptionistShutdown ⇒
        if (receptionist == sender()) {
          log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist)
          reestablish()
        }
    }
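To sum up ClusterClient's behavior: it uses the configured initialContacts to send messages, through ActorSelections, to remote actors (the ClusterReceptionists in the cluster). Once it receives the first ActorIdentity, it considers itself connected to the cluster. Later ActorIdentity messages are ignored, so in effect whichever receptionist answers fastest becomes the contact. The client then periodically sends GetContacts to that receptionist to obtain the locations of ClusterReceptionist instances. This is a slice of at most numberOfContacts nodes, as the GetContacts handler below shows. So how does it notice that this receptionist has been lost? Did you spot HeartbeatTick?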
    val heartbeatTask = context.system.scheduler.schedule(
      heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
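We skipped over the definition of heartbeatTask earlier. It is in fact a timer that sends HeartbeatTick to the actor itself every heartbeatInterval. Personally I dislike starting side effects like this inside a field definition, because it makes the source harder to follow.
On each HeartbeatTick, the client first asks the failureDetector whether the receptionist is still available. If it is not, the client calls reestablish to set up a new connection. Otherwise it sends a Heartbeat message to the receptionist. When the HeartbeatRsp reply arrives, it updates the failureDetector with the new heartbeat.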
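Here is a minimal sketch of the HeartbeatTick decision. It assumes a simple deadline-style detector, in which the receptionist counts as lost once no heartbeat has been seen for the acceptable pause plus the heartbeat interval. `SimpleDeadlineDetector` is a hypothetical class with an injectable clock so that the logic can be tested. The excerpt does not show which detector implementation ClusterClient actually uses.

```scala
// Simplified, hypothetical deadline-style failure detector with an injectable clock (ms).
final class SimpleDeadlineDetector(acceptablePauseMs: Long, heartbeatIntervalMs: Long, now: () => Long) {
  private var lastHeartbeat: Option[Long] = None

  // Called when HeartbeatRsp arrives (and once when the connection is first established).
  def heartbeat(): Unit = lastHeartbeat = Some(now())

  // Available while the last heartbeat is younger than pause + interval; available before any heartbeat.
  def isAvailable: Boolean =
    lastHeartbeat.forall(t => t + acceptablePauseMs + heartbeatIntervalMs > now())
}

object HeartbeatLogic {
  // Decision taken on every HeartbeatTick in the active behaviour.
  def onHeartbeatTick(fd: SimpleDeadlineDetector): String =
    if (!fd.isAvailable) "reestablish" else "send Heartbeat"
}
```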
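That concludes the ClusterClient source. Next, let's look at the ClusterReceptionist inside the cluster. As mentioned before, it can be started with actorOf or through the ClusterClientReceptionist extension. Naturally, we'll start with the extension.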
    object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
      override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system)

      override def lookup() = ClusterClientReceptionist

      override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist =
        new ClusterClientReceptionist(system)
    }
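Above is the ExtensionId definition. It also extends ExtensionIdProvider, which means the extension can be loaded through configuration alone, with no explicit startup code, for example via akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"].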
    /**
     * Extension that starts [[ClusterReceptionist]] and accompanying [[akka.cluster.pubsub.DistributedPubSubMediator]]
     * with settings defined in config section `akka.cluster.client.receptionist`.
     * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
     */
    final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension
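Notice how clear the official comments are for the important classes? This extension starts a ClusterReceptionist together with a DistributedPubSubMediator. The DistributedPubSubMediator itself is started by the DistributedPubSub extension, which we'll analyze later.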
    /**
     * The [[ClusterReceptionist]] actor
     */
    private val receptionist: ActorRef = {
      if (isTerminated)
        system.deadLetters
      else {
        val name = config.getString("name")
        val dispatcher = config.getString("use-dispatcher") match {
          case "" ⇒ Dispatchers.DefaultDispatcherId
          case id ⇒ id
        }
        // important to use val mediator here to activate it outside of ClusterReceptionist constructor
        val mediator = pubSubMediator
        system.systemActorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config))
          .withDispatcher(dispatcher), name)
      }
    }

    /**
     * Returns the underlying receptionist actor, particularly so that its
     * events can be observed via subscribe/unsubscribe.
     */
    def underlying: ActorRef = receptionist
    /**
     * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
     */
    private def pubSubMediator: ActorRef = DistributedPubSub(system).mediator
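The code above, from the ClusterClientReceptionist definition, is the crucial part: it starts a ClusterReceptionist as a system actor. The rest of the source deals with registering and unregistering services, so we'll skip it for now.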
    /**
     * [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is
     * supposed to be started on all nodes, or all nodes with specified role, in the cluster.
     * The receptionist can be started with the [[ClusterClientReceptionist]] or as an
     * ordinary actor (use the factory method [[ClusterReceptionist#props]]).
     *
     * The receptionist forwards messages from the client to the associated [[akka.cluster.pubsub.DistributedPubSubMediator]],
     * i.e. the client can send messages to any actor in the cluster that is registered in the
     * `DistributedPubSubMediator`. Messages from the client are wrapped in
     * [[akka.cluster.pubsub.DistributedPubSubMediator.Send]], [[akka.cluster.pubsub.DistributedPubSubMediator.SendToAll]]
     * or [[akka.cluster.pubsub.DistributedPubSubMediator.Publish]] with the semantics described in
     * [[akka.cluster.pubsub.DistributedPubSubMediator]].
     *
     * Response messages from the destination actor are tunneled via the receptionist
     * to avoid inbound connections from other cluster nodes to the client, i.e.
     * the `sender()`, as seen by the destination actor, is not the client itself.
     * The `sender()` of the response messages, as seen by the client, is `deadLetters`
     * since the client should normally send subsequent messages via the `ClusterClient`.
     * It is possible to pass the original sender inside the reply messages if
     * the client is supposed to communicate directly to the actor in the cluster.
     *
     */
    final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
      extends Actor with ActorLogging
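Combining our earlier analysis with the official comment, this actor is easy to understand. It is the actor to which ClusterClient sends GetContacts messages, and it runs on all nodes of the cluster or on a group of them. It can be started through the ClusterClientReceptionist extension or as an ordinary actor (actorOf). ClusterReceptionist forwards the messages relayed by ClusterClient to the DistributedPubSubMediator. The mediator delivers them to the actors registered with it, that is, the services we registered. Replies from the destination actor travel back to the client through a "tunnel" set up by the receptionist, which in practice just means the sender is replaced.
The definition also shows that it is a perfectly ordinary actor. Neither its primary constructor nor its preStart contains anything that needs special attention, so let's go straight to receive.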
    case GetContacts ⇒
      // Consistent hashing is used to ensure that the reply to GetContacts
      // is the same from all nodes (most of the time) and it also
      // load balances the client connections among the nodes in the cluster.
      if (numberOfContacts >= nodes.size) {
        val contacts = Contacts(nodes.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
        if (log.isDebugEnabled)
          log.debug("Client [{}] gets contactPoints [{}] (all nodes)", sender().path, contacts.contactPoints.mkString(","))
        sender() ! contacts
      } else {
        // using toStringWithAddress in case the client is local, normally it is not, and
        // toStringWithAddress will use the remote address of the client
        val a = consistentHash.nodeFor(sender().path.toStringWithAddress(cluster.selfAddress))
        val slice = {
          val first = nodes.from(a).tail.take(numberOfContacts)
          if (first.size == numberOfContacts) first
          else first union nodes.take(numberOfContacts - first.size)
        }
        val contacts = Contacts(slice.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
        if (log.isDebugEnabled)
          log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(","))
        sender() ! contacts
      }
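The handling of GetContacts deserves special attention, since this is the message ClusterClient sends to learn where the receptionists in the cluster are. The comment at the top of the handler explains it: consistent hashing ensures that every node (most of the time) gives the same reply to GetContacts, and it also spreads client connections across the nodes of the cluster. If numberOfContacts is at least the number of nodes, the client simply gets all of them. Otherwise the client's path is hashed to a node, and the reply is the numberOfContacts nodes that follow it in the sorted node set, wrapping around to the start when needed.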
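The slice computation can be reproduced without Akka. In this sketch the ring is a simplified stand-in for the `consistentHash` above. It is built from MurmurHash3 with `virtualNodes` points per node, which is an assumption rather than Akka's own hashing. Node addresses are plain strings in a sorted set, and `ContactSlice` and `Ring` are hypothetical names.

```scala
import scala.collection.immutable.{SortedSet, TreeMap}
import scala.util.hashing.MurmurHash3

object ContactSlice {
  // Hypothetical consistent-hash ring: `virtualNodes` hash points per node address.
  final class Ring(nodes: Set[String], virtualNodes: Int) {
    private val ring: TreeMap[Int, String] =
      TreeMap.empty[Int, String] ++ (for {
        node <- nodes.toSeq
        i    <- 0 until virtualNodes
      } yield MurmurHash3.stringHash(s"$node-$i") -> node)

    // First point clockwise from the key's hash, wrapping around to the start of the ring.
    def nodeFor(key: String): String =
      ring.rangeFrom(MurmurHash3.stringHash(key)).headOption.getOrElse(ring.head)._2
  }

  // Mirrors the branches of the GetContacts handler above.
  def contactsFor(client: String, nodes: SortedSet[String], ring: Ring, numberOfContacts: Int): SortedSet[String] =
    if (numberOfContacts >= nodes.size) nodes // small cluster: every client gets all nodes
    else {
      val a = ring.nodeFor(client) // the same client always hashes to the same node
      val first = nodes.rangeFrom(a).tail.take(numberOfContacts)
      if (first.size == numberOfContacts) first
      else first union nodes.take(numberOfContacts - first.size) // wrap around the sorted node set
    }
}
```

Every receptionist builds the same ring from the same node set, so they all answer a given client with the same slice, while different clients land on different parts of the ring. Because of `.tail`, the hashed node itself is never part of the slice.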
    case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
      val tunnel = responseTunnel(sender())
      tunnel ! Ping // keep alive
      pubSubMediator.tell(msg, tunnel)
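This is how Send, SendToAll and Publish are handled. The message is simply passed on to pubSubMediator, but with a tunnel as its sender: this is the "tunneling" mentioned in the comment earlier.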
    def responseTunnel(client: ActorRef): ActorRef = {
      val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8")
      context.child(encName) match {
        case Some(tunnel) ⇒ tunnel
        case None ⇒
          context.actorOf(Props(classOf[ClientResponseTunnel], client, responseTunnelReceiveTimeout), encName)
      }
    }
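This is a hypothetical model of responseTunnel's lookup-or-create logic. A mutable map plays the role of context.child, and strings stand in for ActorRefs. The point it illustrates is that the child name is derived deterministically from the client's path, so repeated requests from the same client reuse one tunnel.

```scala
import java.net.URLEncoder
import scala.collection.mutable

// Hypothetical model of responseTunnel: one tunnel per client, found again by its encoded name.
final class TunnelRegistry {
  private val children = mutable.Map.empty[String, String] // encoded child name -> tunnel id
  private var created = 0

  def responseTunnel(clientPath: String): String = {
    // The full path contains characters such as '/' that are not allowed in an actor name,
    // so it is URL-encoded before being used as the child's name.
    val encName = URLEncoder.encode(clientPath, "utf-8")
    children.getOrElseUpdate(encName, { created += 1; s"tunnel-$created" })
  }

  def tunnelsCreated: Int = created
}
```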
What is it doing here? It creates yet another actor, a ClientResponseTunnel, and uses it as the sender of the messages to the service? And what is the responseTunnelReceiveTimeout for?
    /**
     * Replies are tunneled via this actor, child of the receptionist, to avoid
     * inbound connections from other cluster nodes to the client.
     */
    class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
      context.setReceiveTimeout(timeout)

      private val isAsk = {
        val pathElements = client.path.elements
        pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
      }

      def receive = {
        case Ping ⇒ // keep alive from client
        case ReceiveTimeout ⇒
          log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
          context stop self
        case msg ⇒
          client.tell(msg, Actor.noSender)
          if (isAsk)
            context stop self
      }
    }
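This actor's job is simple: it forwards messages to the client. It also stops itself after a period of inactivity (ReceiveTimeout), and, when the client is a temporary ask actor, right after delivering the first reply. Damn, this is quite roundabout. The receptionist creates a local proxy actor for every client, and every reply goes back through that actor. Why not have the service send replies directly to the client? On reflection, this is both reasonable and necessary. The node hosting the service may have no network route to the client, or direct communication may be forbidden for security reasons. In those cases relaying replies through a proxy is essential, which is exactly the stated goal of avoiding inbound connections from other cluster nodes to the client. Anyway, Akka is always right, and Akka is always good.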
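The tunnel's lifecycle rules can be sketched as follows. `isAsk` copies the check from ClientResponseTunnel. `Tunnel` is a hypothetical model that forwards replies and, for an ask client, stops after the first one. The receive-timeout path is left out.

```scala
object TunnelBehaviour {
  // Same test as ClientResponseTunnel.isAsk, applied to the path elements after the address.
  // Actors created by the ask pattern are temporary actors under /temp whose names start with '$'.
  def isAsk(pathElements: List[String]): Boolean =
    pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")

  // Hypothetical model: forward every reply, but stop after the first one for an ask,
  // because an ask waits for exactly one answer.
  final class Tunnel(clientPathElements: List[String]) {
    private val ask = isAsk(clientPathElements)
    var stopped = false
    val delivered = scala.collection.mutable.ListBuffer.empty[String]

    def receive(msg: String): Unit =
      if (!stopped) {
        delivered += msg // stands in for client.tell(msg, Actor.noSender)
        if (ask) stopped = true
      }
  }
}
```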
    case Heartbeat ⇒
      if (verboseHeartbeat) log.debug("Heartbeat from client [{}]", sender().path)
      sender() ! HeartbeatRsp
      updateClientInteractions(sender())
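Then there is the handling of the Heartbeat messages sent by clients. The logic is simple, but one point deserves attention: every ClusterReceptionist maintains its own list of clients. Frankly, I strongly disagree with this design. Clients may be extremely numerous, and maintaining this list alone can take a lot of memory. The list makes the feature richer, but it can easily lead to an OOM. If in practice the number of clients is small, that only shows Akka has not yet become widely known or adopted by large companies.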
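To make the memory concern concrete, here is one hypothetical way such a table could be bounded: record the time of each client's last heartbeat and periodically evict silent clients. The excerpt only shows updateClientInteractions being called. What that method actually stores, and whether Akka evicts entries, is not shown here, so `ClientTable` is purely illustrative.

```scala
import scala.collection.immutable.HashMap

// Hypothetical client-interaction table: client id -> time of its last heartbeat (ms).
final case class ClientTable(lastSeen: HashMap[String, Long] = HashMap.empty[String, Long]) {
  // What a Heartbeat handler could record: this client is alive now.
  def update(client: String, nowMs: Long): ClientTable =
    copy(lastSeen = lastSeen.updated(client, nowMs))

  // Periodic sweep: drop clients that have been silent for longer than timeoutMs.
  def evictStale(nowMs: Long, timeoutMs: Long): ClientTable =
    copy(lastSeen = lastSeen.filter { case (_, t) => nowMs - t <= timeoutMs })
}
```

The table's size then tracks the number of recently active clients rather than every client ever seen.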
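That's it for ClusterClient. Sharp readers may ask: we still haven't seen how a message gets from ClusterReceptionist to the actual service actor, so how does pubSubMediator.tell(msg, tunnel) route it? True, but don't worry: that will be covered in the next chapter (DistributedPubSubMediator). After all, the official ClusterClient documentation itself recommends DistributedPubSubMediator for similar functionality. I think this is yet another pitfall. If you recommend DistributedPubSubMediator anyway, why provide the ClusterClient module at all? Just deprecate it.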