The kernel mechanism of Spark Streaming: a background thread receives the source data stream, the received data is saved to the BlockManager and packaged into batch RDDs at a fixed time interval, and the various operations are then executed on those RDDs.
Each data source provides its own Receiver implementation, which calls the store method to save the data it receives. store delegates the actual saving to ReceiverSupervisorImpl, which in turn calls methods such as pushSingle to append the data to a BlockGenerator. The BlockGenerator starts a RecurringTimer, which guarantees that Spark Streaming's jobs fire at the interval configured by the program; on each tick, all data received so far is taken out of the BlockGenerator's storage and used to construct the batch of RDDs. Throughout this whole pipeline, events are propagated via a BlockGeneratorListener.
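To make this path concrete, here is a minimal sketch of a custom source in the style of Spark's custom-receiver guide (the class name, host, and port are illustrative, not from the text above). Every record handed to store() flows through ReceiverSupervisorImpl and BlockGenerator.pushSingle as described:

import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TextSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive on a background thread so that onStart() returns immediately.
    new Thread("Text Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do: the receiving thread exits when the socket closes
    // or when restart() tears the receiver down.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      for (line <- Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()) {
        store(line) // buffered by BlockGenerator rather than persisted one record at a time
      }
      socket.close()
      restart("Trying to reconnect")
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}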
Each real-time data source also needs to implement its own DStream subclass.
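A receiver-based stream mostly just has to say which Receiver feeds it; the block bookkeeping and the compute method come from the base class. A minimal sketch, reusing the hypothetical TextSocketReceiver above:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class TextSocketInputDStream(ssc: StreamingContext, host: String, port: Int)
  extends ReceiverInputDStream[String](ssc) {

  // The only abstract member: hand back the Receiver that produces the data.
  override def getReceiver(): Receiver[String] = new TextSocketReceiver(host, port)
}

In application code, the shortcut ssc.receiverStream(new TextSocketReceiver(host, port)) achieves the same thing without a dedicated DStream subclass.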
In receiver/BlockGenerator.scala. BlockGenerator is responsible for receiving a DStream's data and saving it into blocksForPushing. For each real-time stream to be received, one BlockGenerator is started; it quietly receives data in the background and saves it to the BlockManager, producing Blocks and appending them to the blocksForPushing queue along the way.
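The queue itself is a plain bounded producer/consumer handoff. A generic, self-contained sketch of the pattern (DemoBlock, the thread name, and the queue size of 10 are illustrative; the real BlockGenerator reads its queue size from configuration):

import java.util.concurrent.ArrayBlockingQueue

case class DemoBlock(id: Long, records: Seq[Any])

// The timer thread produces completed blocks; a background thread drains them.
val blocksForPushing = new ArrayBlockingQueue[DemoBlock](10)

val blockPushingThread = new Thread("demo-block-pushing") {
  override def run(): Unit = {
    try {
      while (true) {
        val block = blocksForPushing.take() // blocks until a block is available
        // In BlockGenerator this is the point where the block is handed to the
        // ReceiverSupervisor, which writes it to the BlockManager.
        println(s"pushing block ${block.id} with ${block.records.size} records")
      }
    } catch {
      case _: InterruptedException => // normal shutdown path
    }
  }
}
blockPushingThread.setDaemon(true)
blockPushingThread.start()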
BlockGenerator's default block interval is 200 ms and can be changed via configuration. The time argument passed to the timer callback is always an integer multiple of the period, and time - period serves as the sequence number of the real-time stream's RDD batch. The timer callback is as follows:

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
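The 200 ms default mentioned above is the spark.streaming.blockInterval setting, so tuning it is a one-line configuration change. Roughly, batch interval divided by block interval gives the number of blocks, and therefore partitions, per batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("block-interval-demo")
  .set("spark.streaming.blockInterval", "500ms") // default is 200ms

// With a 2 s batch and a 500 ms block interval, each batch's BlockRDD
// is built from roughly 2000 / 500 = 4 blocks.
val ssc = new StreamingContext(conf, Seconds(2))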
ReceiverInputDStream is the base class of many receiver-based streams. Its compute method generates the RDD[T] for a batch: it asks the receiverTracker for the data belonging to the batch identified by validTime, which is generally stored in the BlockManager, and wraps it as a BlockRDD[T].
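A rough paraphrase of that flow, not the actual Spark source: BlockRDD is package-private, so this only compiles inside Spark's own tree, and trackerBlocksFor is a hypothetical stand-in for the ReceiverTracker lookup:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StreamBlockId}
import org.apache.spark.streaming.Time

// trackerBlocksFor: asks the tracker which blocks were recorded for this batch.
def computeSketch(sc: SparkContext, validTime: Time,
                  trackerBlocksFor: Time => Seq[StreamBlockId]): Option[RDD[String]] = {
  val blockIds: Array[BlockId] = trackerBlocksFor(validTime).toArray
  // Even an empty batch yields an (empty) BlockRDD, so the batch's job still runs.
  Some(new BlockRDD[String](sc, blockIds))
}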