Spark中大量采用事件监听方式,实现driver端的组件之间的通讯。本文就来解释一下Spark中事件监听是如何实现的java
观察者模式和监听器在设计模式中有一个观察者模式,该模式创建一种对象与对象之间的依赖关系,一个对象状态发生改变时当即通知其余对象,其余对象就据此做出相应的反应。其中发生改变的对象称之为观察目标(也有叫主题的),被通知的对象称之为观察者,能够有多个观察者注册到一个观察目标中,这些观察者之间没有联系,其数量能够根据须要增减。编程
Spark-Core内部的事件框架实现了基于事件的异步化编程模式。它的最大好处是能够提高应用程序对物理资源的充分利用,能最大限度的压榨物理资源,提高应用程序的处理效率。缺点比较明显,下降了应用程序的可读性。Spark的基于事件的异步化编程框架由事件框架和异步执行线程池组成,应用程序产生的Event发送给ListenerBus,ListenerBus再把消息广播给全部的Listener,每一个Listener收到Event判断是否本身感兴趣的Event,如果,会在Listener独享的线程池中执行Event所对应的逻辑程序块。下图展现Event、ListenerBus、Listener、Executor的关系,从事件生成、事件传播、事件解释三个方面的视角来看。 设计模式
咱们从线程的视角来看,看异步化处理。异步化处理体如今事件传播、事件解释两个阶段,其中事件解释的异步化实现了咱们的基于事件的异步化编程。app
Spark-Core、Spark-Streaming采用了分类的思路(分而治之)进行管理,每一大类事件都有独自的Event、ListenerBus框架
Spark-Core的核心事件trait是SparkListenerEvent,Spark-Straming的核心事件trait是StreamingListenerEvent异步
下图是各类事件实体类:ide
咱们在定义事件须要注意哪些方面呢?咱们以SparkListenerTaskStart为例,分析一个事件拥有哪些特征。post
Spark-Core的核心监听triat是SparkListener,Spark-Streaming的核心监听triat StreamingListener,二者都表明了一类监听的抽象ui
下图是一些监听实体类:spa
监听器总线对象,Spark程序在运行的过程当中,Driver端的不少功能都依赖于事件的传递和处理,而事件总线在这中间发挥着相当重要的纽带做用。事件总线经过异步线程,提升了Driver执行的效率。Listener注册到ListenerBus对象中,而后经过ListenerBus对象来实现事件监听(相似于计算机与周边设备之间的关系)
其start方法直接启动一个dispatchThread,其核心逻辑就是不停地在一个事件队列eventQueue里取出事件,若是事件合法且LiverListenerBus没有被关停,就将事件通知给全部注册的listener中
其dispatch方法就是向事件队列里添加相应的事件。
ListenerBus用于管理全部的Listener,Spark-Core和Spark-Streaming公用相同的trait ListenerBus, 最终都是使用AsyncEventQueue类对Listener进行管理。
管理全部注册的Listener,为一类Listener建立一个惟一的AsyncEventQueue,广播Event到全部的Listener。默承认提供四类AsyncEventQueue分别为‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。目前Spark-Core并无放开类别设置,意谓着最多只能有上述四类,从设计的严谨上来说分类并非越多越好,每多一个类别,就会多一个AsyncEventQueue实例,每一个实例中会包含一个事件传播的线程,对系统的资源占用仍是比较多的。
private val listenerThread = new Thread(name) { setDaemon(true) //线程自己设为守护线程 override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire()//不断获取信号量,信号量减一,能获取到说明还有事件未处理 self.synchronized { processingEvent = true } try { val event = eventQueue.poll //获取事件, remove() 和 poll() 方法都是从队列中删除第一个元素(head)。 if (event == null) { // 此时说明没有事件,但仍是拿到信号量了,这说明stop方法被调用了 // 跳出while循环,关闭守护进程线程 if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } // 调用ListenerBus的postxToAll(event: E)方法 postxToAll(event) } finally { self.synchronized { processingEvent = false } } } } } }
private val started = new AtomicBoolean(false) private val stopped = new AtomicBoolean(false) //存放事件 private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent] // 表示队列中产生和使用的事件数量的计数器,这个信号量是为了不消费者线程空跑 private val eventLock = new Semaphore(0)
LiveListenerBus在SparkContext的setupAndStartListenerBus中被初始化,并调用start方法启动LiveListenerBus。
def start(): Unit = { if (started.compareAndSet(false, true)) { listenerThread.start() //启动消费者线程 } else { throw new IllegalStateException(s"$name already started!") }
中止LiveListenerBus,它将等待队列事件被处理,但在中止后丢掉全部新的事件。须要注意stop可能会致使长时间的阻塞,执行stop方法的线程会被挂起,直到全部的AsyncEventQueue(默认四个)中的dispatch线程都退出后执行stop主法的线程才会被唤醒。
def stop(): Unit = { if (!started.get()) { throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know // `stop` is called. // 释放一个信号量,但此时是没有事件的,从而listenerThread会拿到一个空事件,从而知道该中止了 eventLock.release() //而后等待消费者线程自动关闭 listenerThread.join() } else { // Keep quiet } }
采用广播的方式事件传播,这个过程很快,主线程只须要把事件传播给AsyncEventQueue便可,最后由AsyncEventQueue再广播给相应的Listener
def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP logError(s"$name has already stopped! Dropping event $event") return } // 在事件队列队尾添加事件 // add()和offer()区别:二者都是往队列尾部插入元素,不一样的时候,当超出队列界限的时候,add()方法是抛出异常让你处理,而offer()方法是直接返回false val eventAdded = eventQueue.offer(event) if (eventAdded) { //若是成功加入队列,则在信号量中加一 eventLock.release() } else { // 若是事件队列超过其容量,则将删除新的事件,这些子类将被通知到删除事件。 onDropEvent(event) droppedEventsCounter.incrementAndGet() } val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently 日志不要太频繁 // 若是上一次,队列满了EVENT_QUEUE_CAPACITY=1000设置的值,就丢掉,而后记录一个时间,若是一直持续丢掉,那么每过60秒记录一第二天志,否则日志会爆满的 if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() // 记录一个warn日志,表示这个事件,被丢弃了 logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
图中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件来源,它们都是经过调用LiveListenerBus的post方法将消息提交给事件队列,每post一个事件,信号量就加一。
listenerThread不停的获取信号量,而后从事件队列中取出事件,取到事件,则调用postForAll把事件分发给已注册的监听器,不然,就是取到空事件,它明白这是事件总线搞的鬼,它调用了stop可是每post事件,从而中止事件总线线程。