一个设计优秀的工具或框架,应该都有一个易用、强大的插件或扩展体系,akka也不例外。html
akka的扩展方法很是简单,由于只涉及到两个组件:Extension、 ExtensionId。其中Extension在每一个ActorSystem中只会加载一次,而后被akka管理。你能够在ActorSystem启动的时候以编程的方式加载,也能够经过配置的方式自动加载。因为Extension是在ActorSystem层面的扩展,因此须要开发者本身处理线程安全的问题。ExtensionId能够理解为Extension的一个惟一标志,ActorSystem会根据它来判断Extension是否被加载过,以确保Extension只能加载一次。java
/** * The basic ActorSystem covers all that is needed for locally running actors, * using futures and so on. In addition, more features can hook into it and * thus become visible to actors et al by registering themselves as extensions. * This is accomplished by providing an extension—which is an object * implementing this trait—to `ActorSystem.registerExtension(...)` or by * specifying the corresponding option in the configuration passed to * ActorSystem, which will then instantiate (without arguments) each FQCN and * register the result. * * The extension itself can be created in any way desired and has full access * to the ActorSystem implementation. * * This trait is only a marker interface to signify an Akka Extension. */ trait Extension
上面是Extension的定义,能够看出它很是简单,简单到就是一个trait,没有任何字段和方法。也就是说咱们实现的对akka的扩展能够是任意形式的类,并且会被保证加载一次,那么是如何保证只会加载一次的呢?ExtensionId也许能够回答这个问题。编程
/** * Identifies an Extension * Lookup of Extensions is done by object identity, so the Id must be the same wherever it's used, * otherwise you'll get the same extension loaded multiple times. */ trait ExtensionId[T <: Extension] { /** * Returns an instance of the extension identified by this ExtensionId instance. */ def apply(system: ActorSystem): T = { java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this) } /** * Returns an instance of the extension identified by this ExtensionId instance. * Java API * For extensions written in Scala that are to be used from Java also, * this method should be overridden to get correct return type. * {{{ * override def get(system: ActorSystem): TheExtension = super.get(system) * }}} * */ def get(system: ActorSystem): T = apply(system) /** * Is used by Akka to instantiate the Extension identified by this ExtensionId, * internal use only. */ def createExtension(system: ExtendedActorSystem): T override final def hashCode: Int = System.identityHashCode(this) override final def equals(other: Any): Boolean = this eq other.asInstanceOf[AnyRef] }
ExtensionId也很简单,首先这是一个trait,且有一个类型变量T,要求T是Extension的子类。其中有一个apply,经过system返回一个T的实例。createExtension没有实现。那须要继续深刻registerExtension的代码。安全
/** * Registers the provided extension and creates its payload, if this extension isn't already registered * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization * of the payload, if is in the process of registration from another Thread of execution */ def registerExtension[T <: Extension](ext: ExtensionId[T]): T
经过registerExtension的定义来看,官方注释写的也很清楚,它就是在注册一个extension,而且建立一个实例。若是这个extension已经注册过,就再也不注册。app
@tailrec final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = { findExtension(ext) match { case null ⇒ //Doesn't already exist, commence registration val inProcessOfRegistration = new CountDownLatch(1) extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process case null ⇒ try { // Signal was successfully sent ext.createExtension(this) match { // Create and initialize the extension case null ⇒ throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]") case instance ⇒ extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension instance //Profit! } } catch { case t: Throwable ⇒ extensions.replace(ext, inProcessOfRegistration, t) //In case shit hits the fan, remove the inProcess signal throw t //Escalate to caller } finally { inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal } case other ⇒ registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry } case existing ⇒ existing.asInstanceOf[T] } }
咱们来看看registerExtension的具体实现,它首先经过findExtension查找对应的ExtensionId是否已经注册,若是已经注册,则直接返回找到的结果,不然就进行建立。在case null分支中,有一个CountDownLatch。咱们有必要简要介绍一下这个类的做用和使用方法。框架
“CountDownLatch典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown()
,当计数器的值变为0时,在CountDownLatch上 await()
的线程就会被唤醒”ide
也就是说registerExtension是会保证线程安全的,以保证Extension只被加载一次。extensions会经过putIfAbsent方法插入ExtensionId与inProcessOfRegistration的键值对,固然了extensions是一个ConcurrentHashMap。若是key不存在,即第一次注册的时候,则把键值对插入并返回null。因此第一次注册会命中case null,而后把当前ActorSystem传给createExtension方法建立Extension实例。若是建立成功,就会替换extensions中ExtensionId对应的value为新建立的Extension实例(替换以前是inProcessOfRegistration这个CountDownLatch),最后执行countDown,计数器变成0。若是建立失败呢?会抛出一个IllegalStateException异常或其余异常,收到异常后,会把ExtensionId对应的value变成对应的Throwable信息。那么若是putIfAbsent插入失败呢,也就是ExtensionId已经有对应的value了,会递归执行registerExtension从新注册,既然有值了为啥还要从新注册呢?由于对应的值有三种状况:Extension实例、Throwable、CountDownLatch。因此须要从新注册。工具
另外CountDownLatch必定会有await,那么啥时候await呢。别急,还有findExtension没有分析呢。ui
/** * Returns any extension registered to the specified Extension or returns null if not registered */ @tailrec private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { case c: CountDownLatch ⇒ c.await(); findExtension(ext) //Registration in process, await completion and retry case t: Throwable ⇒ throw t //Initialization failed, throw same again case other ⇒ other.asInstanceOf[T] //could be a T or null, in which case we return the null as T }
很显然,findExtension会对查询到的结果进行判断,若是是CountDownLatch就调用await进行等待,等待其余线程的registerExtension执行完毕,而后递归调用findExtension;若是其余线程注册完了返回异常,则此处也简单的抛出异常;若是返回其余类型的数据,则把它转化成T的一个实例,也就是咱们自定义的Extension,那若是返回null呢?那就返回null喽。this
至此registerExtension分析完毕,它以线程安全的方式保证Extension被加载一次,也就是createExtension方法只被调用一次。那么如何根据ActorSystem建立咱们自定义的Extension就很是灵活了。
咱们来看一下官网的例子。
class CountExtensionImpl extends Extension { //Since this Extension is a shared instance // per ActorSystem we need to be threadsafe private val counter = new AtomicLong(0) //This is the operation this Extension provides def increment() = counter.incrementAndGet() }
上面是咱们自定义的一个Extension,它很是简单,就是一个计数器,且increment()保证线程安全。
object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider { //The lookup method is required by ExtensionIdProvider, // so we return ourselves here, this allows us // to configure our extension to be loaded when // the ActorSystem starts up override def lookup = CountExtension //This method will be called by Akka // to instantiate our Extension override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl /** * Java API: retrieve the Count extension for the given system. */ override def get(system: ActorSystem): CountExtensionImpl = super.get(system) }
上面是一个ExtensionId,还继承了ExtensionIdProvider,ExtensionIdProvider源码以下,其实就是用来查找ExtensionId的,这样就可以经过配置文件自动加载了。
/** * To be able to load an ExtensionId from the configuration, * a class that implements ExtensionIdProvider must be specified. * The lookup method should return the canonical reference to the extension. */ trait ExtensionIdProvider { /** * Returns the canonical ExtensionId for this Extension */ def lookup(): ExtensionId[_ <: Extension] }
能够看出createExtension就是new了一个CountExtensionImpl,没有把ExtendedActorSystem传给CountExtensionImpl。其实在稍微复杂点的Extension里面是能够接收ExtendedActorSystem参数的,有了对ExtendedActorSystem的引用,咱们就能够调用ExtendedActorSystem的全部公开的方法。若是你要问我ExtendedActorSystem都有哪些公开的方法或者说,有了ExtendedActorSystem能够作什么,我是拒绝回答的。有了ExtendedActorSystem你还不是想干啥就干啥?哈哈。