Spark Streaming

The core mechanism of Spark Streaming: a background thread receives the stream of source data, saves what it receives to the BlockManager, packages it into batch RDDs at the configured time interval, and then runs the program's operations on those RDDs.
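To make the batching concrete, here is a minimal driver program; the socket source, local master, and 1-second batch interval are illustrative assumptions, not details from the original text:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountStream").setMaster("local[2]")
    // Every 1-second batch of received data becomes one RDD in the DStream.
    val ssc = new StreamingContext(conf, Seconds(1))

    // The socket receiver runs on an executor thread and stores data via BlockManager.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()             // start the receiver and the batch scheduler
    ssc.awaitTermination()  // block until stopped
  }
}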

Each data source provides its own Receiver implementation and calls the store method to save the data it receives. store delegates to ReceiverSupervisorImpl, which actually persists the data; ReceiverSupervisorImpl in turn calls methods such as pushSingle to append records to a BlockGenerator. The BlockGenerator starts a RecurringTimer so that the Spark Streaming job fires at the interval configured by the program, takes all data received so far out of the BlockGenerator's buffer, and constructs the batch's RDDs. Throughout this pipeline, events are propagated via a BlockGeneratorListener. A minimal custom receiver is sketched below.
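The sketch uses the public Receiver API (onStart, onStop, store, isStopped, restart); the line-oriented socket source and its host/port are hypothetical:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical line-oriented socket receiver.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive on a background thread so onStart returns immediately.
    new Thread("LineReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {} // the receiving thread exits once isStopped()

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      // store() hands the record to ReceiverSupervisorImpl, which calls
      // pushSingle() to append it to the BlockGenerator's current buffer.
      store(line)
      line = reader.readLine()
    }
    reader.close()
    socket.close()
    // Ask the supervisor to restart the receiver when the stream ends.
    restart("Socket closed, restarting receiver")
  }
}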

Each real-time data source also needs to implement its own subclass of DStream.

    1. BlockGenerator

Defined in receiver/BlockGenerator.scala, it receives a DStream's incoming records and saves them into blocksForPushing. For each real-time stream being received, one BlockGenerator is started; it quietly collects data in the background and saves it to the BlockManager, while also producing Blocks and appending them to the blocksForPushing queue.

BlockGenerator's default block interval is 200 ms and can be configured (via spark.streaming.blockInterval). The timer callback is shown below:

The time argument passed to the callback is an integer multiple of the block interval, so time - blockIntervalMs identifies the batch of records collected during the interval that just ended.

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          // Swap the buffer out atomically so the receiver thread can keep appending.
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
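The mechanics above (a timer that periodically swaps the buffer out, and a blocking queue handing finished blocks to a pusher thread) can be sketched standalone. Everything below is a simplified illustration of the pattern, not Spark source; names like MiniBlockGenerator and makeBlock are invented (requires Scala 2.12+ for the lambda-to-Runnable conversions):

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object MiniBlockGenerator {
  final case class Block(id: Long, records: Seq[Any])

  private val blockIntervalMs = 200L
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Block](10)

  // Called by the receiving thread for every record.
  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  // Counterpart of updateCurrentBuffer: swap the buffer and enqueue a block.
  private def makeBlock(time: Long): Unit = {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val full = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        newBlock = Block(time - blockIntervalMs, full)
      }
    }
    if (newBlock != null) blocksForPushing.put(newBlock) // blocks when queue is full
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for RecurringTimer: fire every block interval.
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(
      () => makeBlock(System.currentTimeMillis()),
      blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)

    // Pusher thread: in Spark this hands blocks to the BlockManager.
    new Thread(() => while (true) println(blocksForPushing.take())).start()

    (1 to 1000).foreach { i => addData(i); Thread.sleep(5) }
    timer.shutdown()
  }
}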

    2. ReceiverInputDStream

The base class of many receiver-based input streams. Its compute method generates each batch's RDD[T]: it asks the receiverTracker for the blocks received during validTime (the data itself is normally stored in the BlockManager) and wraps their ids in a BlockRDD[T], as sketched below.
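A condensed sketch of that compute path, paraphrased from ReceiverInputDStream rather than copied verbatim (the real implementation also handles write-ahead-log-backed blocks and empty batches):

  // Inside ReceiverInputDStream[T]; BlockRDD comes from org.apache.spark.rdd,
  // BlockId from org.apache.spark.storage. Simplified paraphrase, not exact source.
  override def compute(validTime: Time): Option[RDD[T]] = {
    // Ask the ReceiverTracker for the blocks that arrived during this batch.
    val blockInfos = ssc.scheduler.receiverTracker
      .getBlocksOfBatch(validTime)
      .getOrElse(id, Seq.empty)
    val blockIds = blockInfos.map(_.blockId.asInstanceOf[BlockId]).toArray
    // Wrap the block ids in a BlockRDD; the records themselves stay in BlockManager.
    Some(new BlockRDD[T](ssc.sc, blockIds))
  }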
