This article was generated by exporting a mind map and lightly adjusting the result. Mind maps do not render code well, so for readability the source code shown here is pseudocode with non-essential parts removed. For the best experience you can download the mind map and read it alongside this post.
This post is divided into four parts:
Data generation and ingestion breaks down into the following five parts:
The ReceiverTracker, the driver-side coordinator of all Receivers, dispatches multiple jobs (each job containing exactly 1 task) to multiple executors, each of which starts a ReceiverSupervisor instance.
Get the Receivers from the ReceiverInputDStreams and send them to all the worker nodes:
```scala
class ReceiverTracker {
  var endpoint: RpcEndpointRef = _

  private def launchReceivers() {
    // inputStreams is a field of DStreamGraph
    val receivers = inputStreams.map { nis =>
      // rcvr defines how data is received from Kafka, sockets, etc.
      val rcvr = nis.getReceiver()
      rcvr
    }
    // send to the workers
    endpoint.send(StartAllReceivers(receivers))
  }
}
```
Destination selection covers two cases: the initial scheduling and rescheduling after a failure/restart.
```scala
class ReceiverTracker {
  // computes the dispatch destinations
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive = {
    // first start
    case StartAllReceivers(receivers) => ...
    // restart after failure
    case RestartReceiver(receiver) => ...
  }
}
```
1. Choose the optimal executor locations
2. Iterate over the receivers and build the final executors to dispatch to
```scala
class ReceiverTracker {
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive = {
    // first start
    case StartAllReceivers(receivers) =>
      // 1. choose the optimal executor locations
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      // 2. iterate and dispatch each receiver to its executors
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        startReceiver(receiver, executors)
      }
    // restart after failure
    case RestartReceiver(receiver) => ...
  }
}
```
1. Get the previously scheduled executors
2. Compute new executor locations
2.1 If the previous executors are still available, reuse them
2.2 Otherwise recompute the locations
3. Send to the worker to restart the receiver
```scala
class ReceiverTracker {
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive = {
    // first start
    case StartAllReceivers(receivers) => ...
    // restart after failure
    case RestartReceiver(receiver) =>
      // 1. get the previously scheduled executors
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // 2. compute the new executor locations
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // 2.1 the previous executors are still usable, so reuse them
        oldScheduledExecutors
      } else {
        // 2.2 otherwise recompute the locations
        schedulingPolicy.rescheduleReceiver()
      }
      // 3. send to the worker to restart the receiver
      startReceiver(receiver, scheduledLocations)
  }
}
```
The scheduling policy is implemented by ReceiverSchedulingPolicy, and the default policy is round-robin. Before version 1.5, dispatch relied on Spark Core's generic TaskScheduler,
which could distribute Receivers unevenly across executors and ultimately make the Job fail:
If a Task fails more than spark.task.maxFailures (default = 4) times, the whole Job fails. When a Task fails once, Spark Core's TaskScheduler redeploys it to another executor and reruns it; in a long-running Spark Streaming application, a few executor failures can be enough to push a Task over that limit. The deeper problem is that the executor chosen for the rerun is simply the one running fewer Tasks at that moment, which is not necessarily the placement that keeps the Receivers best balanced.
Policy code (a simplified round-robin sketch follows these calls):
```scala
val scheduledLocations = ReceiverSchedulingPolicy.scheduleReceivers(receivers, executors)
val scheduledLocations = ReceiverSchedulingPolicy.rescheduleReceiver(receiver, ...)
```
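For intuition only, here is a minimal round-robin assignment sketch. It is not the actual ReceiverSchedulingPolicy, which additionally weighs each receiver's preferredLocation and the current load of each executor; the function and names below are purely illustrative.

```scala
// Illustrative round-robin assignment of receiver ids to executor names.
// The real ReceiverSchedulingPolicy also honors preferredLocation and load balance.
def roundRobinAssign(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] = {
  require(executors.nonEmpty, "no executors available for scheduling")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> executors(i % executors.size)
  }.toMap
}

// roundRobinAssign(Seq(0, 1, 2), Seq("exec-1", "exec-2"))
// == Map(0 -> "exec-1", 1 -> "exec-2", 2 -> "exec-1")
```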
Convert the receiver list into an RDD
```scala
class ReceiverTracker {
  def receive = {
    ...
    startReceiver(receiver, executors)
  }

  def startReceiver(
      receiver: Receiver[_],
      scheduledLocations: Seq[TaskLocation]) {
    ...
    val receiverRDD: RDD[Receiver[_]] =
      if (scheduledLocations.isEmpty) {
        ssc.sc.makeRDD(Seq(receiver), 1)
      } else {
        val preferredLocations = scheduledLocations.map(_.toString).distinct
        ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
      }
    receiverRDD.setName(s"Receiver $receiverId")
    ...
  }
}
```
Each receiver, together with the Spark environment, the Hadoop configuration, the checkpoint directory, and so on, is handed to ReceiverSupervisorImpl, the receiving object on the executor.
```scala
class ReceiverTracker {
  def startReceiver( ...
    val startReceiverFunc: Iterator[Receiver[_]] => Unit =
      (iterator: Iterator[Receiver[_]]) => {
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver,
          SparkEnv,
          HadoopConf,
          checkpointDir
        )
        supervisor.start()
        supervisor.awaitTermination()
      }
    ...
  }
}
```
Submit the RDD and function defined in the previous two steps from the driver to the executors
```scala
class ReceiverTracker {
  def startReceiver( ...
    val future = ssc.sparkContext.submitJob(
      receiverRDD,
      startReceiverFunc
    )
    ...
  }
}
```
On the executor side, the startup hooks are declared in the Receiver class, invoked by the ReceiverSupervisor class, and implemented in Receiver subclasses.
The executor needs to start two threads:
- 1. Start the Receiver to receive data
- 2. Start the pushing thread to periodically push data to the driver
```scala
class ReceiverSupervisor(
    receiver: Receiver,
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }
}
```
Start the Receiver and begin receiving data
```scala
class ReceiverSupervisor(
    receiver: Receiver,
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }

  // 1. start the Receiver and begin receiving data
  def startReceiver() {
    receiverState = Started
    receiver.onStart()
  }
}
```
Start the push thread, which periodically pushes data to the driver
```scala
class ReceiverSupervisor(
    receiver: Receiver,
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }

  // 1. start the Receiver and begin receiving data
  def startReceiver() {
    receiverState = Started
    receiver.onStart()
  }

  // 2. start the push thread, which periodically pushes data to the driver
  def onStart() {
    registeredBlockGenerators.asScala.foreach { _.start() }
  }
}

// the implementation of _.start()
class BlockGenerator {
  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
  }
}
```
Start the Receiver instance and keep the current thread blocked.
Before version 1.5, one job contained multiple tasks, and once any task had failed more than 4 times the whole Job failed. Since 1.5, each job contains exactly one task and a retry mechanism was added, which greatly improves the job's resilience.
When Spark Core dispatches a Task it only takes the preferredLocation set by Spark Streaming as a hint (usually, but not always, honored), so the Job that dispatches a Receiver may still end up on an executor other than the one we wanted. Therefore, on its first run the Task first sends a RegisterReceiver message to the ReceiverTracker and only starts the Receiver if it gets a positive reply; otherwise it does nothing, and the Job finishes as a successful no-op. The ReceiverTracker then launches another Job to retry the Receiver dispatch... and so on until it succeeds.
A Receiver-dispatch Job may therefore fail to actually place the Receiver, in which case the ReceiverTracker simply starts another Job to try again. This mechanism guarantees that if a Receiver does not land on the pre-computed executor, there is another chance to dispatch it, giving Spark Streaming much finer control over where Receivers run.
Monitoring and restart mechanism for Receivers
Having seen above that each Receiver gets its own dedicated dispatch Job, we find that Receiver failure and restart are no longer bounded by the spark.task.maxFailures (default = 4) limit.
Receiver retries now happen at the Job level rather than the Task level, and a Receiver failure does not fail the previous Job; instead the previous Job completes successfully and a new Job is launched to dispatch the Receiver again. As a result, no matter how long the Spark Streaming application runs, the Receivers stay alive and do not die with a lost executor.
// Wait until the executor returns the result of the dispatch
```scala
class ReceiverTracker {
  def startReceiver( ...
    future.onComplete {
      case Success(_) =>
        ...
      case Failure(e) =>
        onReceiverJobFinish(receiverId)
        ...
    }(ThreadUtils.sameThread)
    ...
  }
}
```
Right after each ReceiverSupervisor starts, it instantiates the user-provided Receiver implementation — one that can continuously produce or receive external data, e.g. a TwitterReceiver that crawls Twitter data in real time — and then calls Receiver.onStart().
Data reception is implemented by the Receiver on the executor side. Starting and stopping must be implemented by subclasses, while the store methods are implemented in the base class for subclasses to call.
```scala
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  // start and stop are implemented by subclasses
  def onStart()
  def onStop()

  // store a single, small data item
  def store(dataItem: T) { ... }
  // store a block of data as an ArrayBuffer
  def store(dataBuffer: ArrayBuffer[T]) { ... }
  // store a block of data as an iterator
  def store(dataIterator: Iterator[T]) { ... }
  // store a block of data as a ByteBuffer
  def store(bytes: ByteBuffer) { ... }
  ...
}
```
Receiving data through Kafka:
```scala
class KafkaInputDStream(
    _ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends Receiver(storageLevel) {

  def onStart() {
  }
}
```
Assemble the parameters required by the Kafka consumer
```scala
class KafkaInputDStream() {
  def onStart() {
    // 1. collect the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. collect the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. collect the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)

    // 3. listen on all topics
    val topicMessageStreams = consumerConnector.createMessageStreams(topics)
    val executorPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum,
      "KafkaMessageHandler"
    )
    topicMessageStreams.values.foreach(streams =>
      streams.foreach { stream =>
        executorPool.submit(new MessageHandler(stream))
      }
    )
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. collect the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)

    // 3. listen on all topics
    val topicMessageStreams = consumerConnector.createMessageStreams(topics)
    val executorPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum,
      "KafkaMessageHandler"
    )
    topicMessageStreams.values.foreach(streams =>
      streams.foreach { stream =>
        executorPool.submit(new MessageHandler(stream))
      }
    )
  }

  // 4. save the data asynchronously
  class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
    def run() {
      val streamIterator = stream.iterator()
      while (streamIterator.hasNext()) {
        val msgAndMetadata = streamIterator.next()
        store((msgAndMetadata.key, msgAndMetadata.message))
      }
    }
  }
}
```
To write a custom Receiver, you only need to extend the Receiver class, start a new data-receiving thread inside onStart(), and call store() whenever data arrives to hand it over to the Spark Streaming framework, as the sketch below shows.
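As a concrete illustration, here is a minimal socket-based custom Receiver following that pattern. The class name, host and port are made up for the example, and error handling is reduced to restart():

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal custom Receiver that reads lines from a socket (illustrative only).
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Start a separate thread so onStart() returns quickly.
    new Thread("Simple Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the receiving thread exits once isStopped() is true
    // or the socket closes.
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand each record to Spark Streaming
        line = reader.readLine()
      }
      restart("Trying to connect again")
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

Such a receiver is plugged into an application with something like `ssc.receiverStream(new SimpleSocketReceiver("localhost", 9999))`.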
Once onStart() has been called, the Receiver keeps receiving external data and continuously hands it to the ReceiverSupervisor for storage.
When the Receiver calls store(), each overload delegates to a different ReceiverSupervisor method; those methods are implemented by ReceiverSupervisorImpl.
```scala
class Receiver {
  var supervisor: ReceiverSupervisor

  // 1. a single data item
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }
  // 2. a ByteBuffer block
  def store(bytes: ByteBuffer) {
    supervisor.pushBytes(bytes, None, None)
  }
  // 3. an iterator block
  def store(dataIterator: Iterator[T]) {
    supervisor.pushIterator(dataIterator)
  }
  // 4. an ArrayBuffer block
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer)
  }
}
```
Call ReceiverSupervisorImpl's pushSingle method to save single items
```scala
class ReceiverSupervisorImpl {
  val defaultBlockGenerator = new BlockGenerator(
    blockGeneratorListener,
    streamId,
    env.conf
  )

  def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }
}
```
First the receiving rate is checked; by capping the rate you cap the maximum amount of data each batch has to process.
Concretely, before data is appended to the currentBuffer array, rateLimiter checks whether items are being added too fast. If so, the call blocks until the next second before the item is added. The maximum rate is controlled by spark.streaming.receiver.maxRate (default = Long.MaxValue), i.e. the number of records a single Receiver may add per second. Capping this rate caps the maximum amount of data each batch of the whole Spark Streaming application has to process.
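A minimal configuration sketch, assuming a 2-second batch interval and a cap of 10,000 records per second per Receiver; the app name and cap value are arbitrary:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RateLimitDemo {
  def main(args: Array[String]): Unit = {
    // Cap each Receiver at 10,000 records/second (value chosen only for illustration).
    val conf = new SparkConf()
      .setAppName("receiver-rate-limit-demo")
      .set("spark.streaming.receiver.maxRate", "10000")
    val ssc = new StreamingContext(conf, Seconds(2))
    // ... define the input streams, then ssc.start() / ssc.awaitTermination()
  }
}
```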
```scala
class BlockGenerator {
  def addData(data: Any) = {
    // 1. check the receiving rate
    waitToPush()
  }
}

class RateLimiter(conf: SparkConf) {
  val maxRateLimit = conf.getLong(
    "spark.streaming.receiver.maxRate",
    Long.MaxValue
  )
  val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

  def waitToPush() {
    rateLimiter.acquire()
  }
}
```
If the rate check passes and the generator is still active, the data is appended to the buffer; otherwise an exception is thrown.
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  def addData(data: Any) = {
    // 1. check the receiving rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "cannot add data ..."
        )
      }
    }
  }
}
```
3.1 Swap out and reset currentBuffer
3.2 Put the new block into the block queue
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  // timer: periodically rolls currentBuffer into a new block
  val blockIntervalTimer = new RecurringTimer(
    clock,
    blockIntervalMs,
    updateCurrentBuffer,
    "BlockGenerator"
  )

  // size of the block queue, 10 by default
  val queueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

  def addData(data: Any) = {
    // 1. check the receiving rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      currentBuffer += data
    }
  }

  def updateCurrentBuffer(time: Long) {
    var newBlock: Block = null
    synchronized {
      // 3.1 swap out and reset currentBuffer
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      // 3.2 put the new block into the block queue
      newBlock = new Block(id, newBlockBuffer)
      blocksForPushing.put(newBlock)
    }
  }
}
```
When the BlockGenerator is initialized, it starts a thread that keeps running keepPushingBlocks. While blocks are still being generated, it fetches them with a blocking, timed queue.poll; once generation has stopped, it drains whatever remains with queue.take.
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  // timer: periodically rolls currentBuffer into a new block
  val blockIntervalTimer = new RecurringTimer(
    clock,
    blockIntervalMs,
    updateCurrentBuffer,
    "BlockGenerator"
  )

  // size of the block queue, 10 by default
  val queueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

  // the thread that pushes blocks
  val blockPushingThread = new Thread() {
    def run() { keepPushingBlocks() }
  }

  def addData(data: Any) = {
    // 1. check the receiving rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      currentBuffer += data
    }
  }

  def updateCurrentBuffer(time: Long) {
    var newBlock: Block = null
    synchronized {
      // 3.1 swap out and reset currentBuffer
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      // 3.2 put the new block into the block queue
      newBlock = new Block(id, newBlockBuffer)
      blocksForPushing.put(newBlock)
    }
  }

  def keepPushingBlocks() {
    // 4.1 while blocks are still being generated, wait for them
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(waitingTime)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    // 4.2 generation has stopped, drain the remaining blocks
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      pushBlock(block)
    }
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock
    )
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock
    )
    val blockInfo = ReceivedBlockInfo(
      streamId,
      numRecords,
      metadataOption,
      blockStoreResult
    )
    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushBytes(
      bytes: ByteBuffer,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushIterator(
      iterator: Iterator[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }
}
```
ReceivedBlockHandler has two concrete storage-strategy implementations (a configuration sketch for switching between them follows the list):
(a) BlockManagerBasedBlockHandler, which stores blocks directly into the executor's memory or disk
(b) WriteAheadLogBasedBlockHandler, which first writes to a WAL and then stores into the executor's memory or disk
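A minimal configuration sketch for switching to the WAL-based handler: the key is spark.streaming.receiver.writeAheadLog.enable, and because the WAL files live under the checkpoint directory, a checkpoint directory must also be set. The app name and paths below are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-enabled-demo")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // WAL files are written under the checkpoint directory, so one is required.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
  }
}
```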
Storage is delegated to the BlockManager: its putIterator method is called, and the BlockManager takes care of replication across executors and the caching policy.
```scala
class BlockManagerBasedBlockHandler(
    blockManager: BlockManager,
    storageLevel: StorageLevel
  ) extends ReceivedBlockHandler {

  def storeBlock(blockId, block) {
    var numRecords: Option[Long] = None

    val putSucceeded: Boolean = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size)
        blockManager.putIterator(
          blockId,
          arrayBuffer.iterator,
          storageLevel,
          tellMaster = true
        )
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(
          blockId,
          countIterator,
          storageLevel,
          tellMaster = true
        )
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(byteBuffer.duplicate()),
          storageLevel,
          tellMaster = true
        )
    }

    // information reported to the driver: blockId and record count
    BlockManagerBasedStoreResult(blockId, numRecords)
  }
}
// ChunkedByteBuffer: splits a byte buffer into chunks
// byteBuffer.duplicate(): makes a copy of the buffer
```
WriteAheadLogBasedBlockHandler writes both to the reliable WAL and to the executor's BlockManager; only after both writes complete does it report the block's meta information.
The data in the BlockManager is the first choice at computation time; only when an executor fails does Spark go back and read the data previously written to the WAL.
As with WALs in other systems, data is written to the WAL strictly sequentially; the block meta reported afterwards therefore additionally contains the path of the WAL the block was written to, plus the offset and length inside that WAL file.
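For intuition, the handle recorded per WAL write is essentially a (path, offset, length) triple. The class and function below are illustrative, not Spark's own types, but they show how such a handle is enough to re-read one block during recovery:

```scala
import java.io.RandomAccessFile

// Illustrative shape of a WAL record handle: the information needed to re-read one block.
case class WalSegmentHandle(path: String, offset: Long, length: Int)

// Sketch of recovery for a local file: seek to the recorded offset, read exactly `length` bytes.
def readSegment(handle: WalSegmentHandle): Array[Byte] = {
  val file = new RandomAccessFile(handle.path, "r")
  try {
    file.seek(handle.offset)
    val buffer = new Array[Byte](handle.length)
    file.readFully(buffer)
    buffer
  } finally {
    file.close()
  }
}
```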
```scala
class WriteAheadLogBasedBlockHandler(
    blockManager: BlockManager,
    serializerManager: SerializerManager,
    streamId: Int,
    storageLevel: StorageLevel,
    conf: SparkConf,
    hadoopConf: Configuration,
    checkpointDir: String,
    clock: Clock = new SystemClock
  ) extends ReceivedBlockHandler {

  // store timeout
  val blockStoreTimeout =
    conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30).seconds

  // the WAL writer
  val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
    conf,
    checkpointDirToLogDir(checkpointDir, streamId),
    hadoopConf
  )

  def storeBlock() {
    // 1. serialize the block for the BlockManager
    val serializedBlock = block match { ... }

    // 2. store into the BlockManager and the WAL, both asynchronously via Future
    val storeInBlockManagerFuture = Future {
      blockManager.putBytes(... serializedBlock)
    }
    val storeInWriteAheadLogFuture = Future {
      writeAheadLog.write(... serializedBlock)
    }
    val combinedFuture =
      storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
    val walRecordHandle =
      ThreadUtils.awaitResult(combinedFuture, blockStoreTimeout)

    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
  }
}
// future1.zip(future2): combines the futures into one that returns a tuple;
// if either future fails, the combined future fails
```
Each time a block has been stored on the executor, the ReceiverSupervisor promptly reports the block's meta information to the ReceiverTracker on the driver; this meta information includes the block's id, its location, the number of records, its size, and so on.
The ReceiverSupervisor reports the block id, location, record count, size, etc. to the driver
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock
    )
    val blockInfo = ReceivedBlockInfo(
      streamId,
      numRecords,
      metadataOption,
      blockStoreResult
    )
    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
  }
}
```
```scala
// reported to the driver: blockId, record count, and the WAL record handle
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)

// reported to the driver: blockId and record count
BlockManagerBasedStoreResult(blockId, numRecords)
```
On one side, Receivers report block meta information to the ReceiverTracker via AddBlock messages; on the other, at the start of every batch the JobGenerator asks the ReceiverTracker to assign the reported blocks to that batch. The ReceiverTracker thus carries out the management of block meta information.
Concretely, the ReceiverTracker has a member, ReceivedBlockTracker, dedicated to managing the meta information of reported blocks.
ssc.start() implicitly calls ReceiverTracker.start(), and the most important thing ReceiverTracker.start() does is call its own launchReceivers() method to dispatch the Receivers to multiple executors. Then, on each executor, a ReceiverSupervisor starts a Receiver to receive data.
Moreover, since version 1.5.0 the ReceiverSchedulingPolicy has been introduced: the Receiver dispatch destinations are computed inside Spark Streaming itself, rather than relying on Spark Core's generic TaskScheduler as earlier versions did. The new ReceiverSchedulingPolicy has a better semantic understanding of the streaming application and can compute a better placement strategy.
In addition, by giving each Receiver its own dedicated Job, Receivers can be dispatched repeatedly and restarted after failures, effectively keeping them alive forever.
ReceiverTracker:
An RpcEndpoint can be thought of as the server side of an RPC; the underlying communication is provided by Netty, and clients call into it.
The ReceiverTracker's RpcEndpoint address — i.e. the driver's address — is public, so Receivers can connect to it; when a Receiver connects successfully, the ReceiverTracker also gets hold of that Receiver's RpcEndpoint. From then on, the two sides can communicate in both directions by sending messages.
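Spark's RpcEndpoint and RpcCallContext live in a private package, so the traits below are only a schematic sketch of the pattern the ReceiverTracker follows: one handler for one-way messages and one for request/response messages.

```scala
// Schematic of the endpoint pattern (illustrative types, not Spark's private API).
trait RpcCallContextLike {
  def reply(response: Any): Unit
}

trait RpcEndpointLike {
  // One-way messages: no reply expected (cf. ReceiverTracker.receive).
  def receive: PartialFunction[Any, Unit]
  // Request/response messages: reply through the context (cf. ReceiverTracker.receiveAndReply).
  def receiveAndReply(context: RpcCallContextLike): PartialFunction[Any, Unit]
}
```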
receive handles one-way messages with no reply. Except for the error-report message, which is sent by executors, all of them are commands the driver-side tracker sends to itself. All of these messages are handled in ReceiverTracker.receive.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
Sent by the ReceiverTracker to itself right after it starts; it triggers the schedulingPolicy computation and the subsequent dispatch.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(
        receivers,
        getExecutors
      )
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors)
      }
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
Sent by the ReceiverTracker to itself when the initial dispatch landed on the wrong executor or the Receiver has failed; it triggers a re-dispatch of the Receiver.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers => ...
    // restart after failure
    case RestartReceiver(receiver) =>
      // 1. get the previously scheduled executors
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // 2. compute the new executor locations
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // 2.1 the previous executors are still usable, so reuse them
        oldScheduledExecutors
      } else {
        // 2.2 otherwise recompute the locations
        schedulingPolicy.rescheduleReceiver()
      }
      // 3. send to the worker to restart the receiver
      startReceiver(receiver, scheduledLocations)
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
Sent by the ReceiverTracker to itself when block data has been computed and is no longer needed; it forwards this CleanupOldBlocks message to all Receivers.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers => ...
    case RestartReceiver   => ...
    case CleanupOldBlocks  =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
When the ReceiverTracker dynamically computes a new rate limit for some Receiver, it sends an UpdateRateLimit message to that specific Receiver.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers => ...
    case RestartReceiver   => ...
    case CleanupOldBlocks  => ...
    case UpdateReceiverRateLimit =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    case ReportError => ...
  }
}
```
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError =>
      reportError(streamId, message, error)
  }
}
```
receiveAndReply handles messages from executors and sends a reply back once processing is done.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
Sent by a Receiver while it is trying to start; the reply says whether it is allowed to start.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver() =>
      val successful = registerReceiver(
        streamId,
        typ,
        host,
        executorId,
        receiverEndpoint,
        context.senderAddress
      )
      context.reply(successful)
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
The message that actually reports block meta information, sent by a Receiver; the reply indicates success or failure.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver() => ...
    case AddBlock() =>
      context.reply(addBlock(receivedBlockInfo))
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
A local message: during ReceiverTracker.stop() it queries whether there are still active Receivers, returning the ids of all receivers that are still alive.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds =>
      context.reply(
        receiverTrackingInfos.filter(
          _._2.state != ReceiverState.INACTIVE
        ).keys.toSeq
      )
    case GetAllReceiverInfo => ...
    case StopAllReceivers   => ...
  }
}
```
Queries the tracking information of all receivers.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo =>
      context.reply(receiverTrackingInfos.toMap)
    case StopAllReceivers => ...
  }
}
```
Sent at the very beginning of ReceiverTracker.stop() to ask all Receivers to stop; it sends a stop message to every Receiver and replies true.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers =>
      assert(isTrackerStopping || isTrackerStopped)
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach {
        _.send(StopReceiver)
      }
      context.reply(true)
  }
}
```
Sent by a Receiver when it stops; after processing, the reply is always true.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) = {
    case RegisterReceiver() => ...
    case AddBlock()         => ...
    case DeregisterReceiver() =>
      deregisterReceiver(streamId, message, error)
      context.reply(true)
    case AllReceiverIds     => ...
    case GetAllReceiverInfo => ...
    case StopAllReceivers   => ...
  }
}
```
The addBlock(receivedBlockInfo: ReceivedBlockInfo) method receives the block meta information reported by a Receiver and adds it to streamIdToUnallocatedBlockQueues.
```scala
class ReceivedBlockTracker {
  // meta of blocks that have been reported but not yet allocated to a batch
  val streamIdToUnallocatedBlockQueues = new HashMap[Int, ReceivedBlockQueue]

  def addBlock(receivedBlockInfo: ReceivedBlockInfo) {
    // WAL
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        streamIdToUnallocatedBlockQueues.getOrElseUpdate(
          streamId, new ReceivedBlockQueue()
        ) += receivedBlockInfo
      }
    }
  }
}
```
When the JobGenerator kicks off a new batch, the contents of streamIdToUnallocatedBlockQueues are added to timeToAllocatedBlocks under the batchTime passed in as the key, and lastAllocatedBatchTime is updated.
```scala
class ReceivedBlockTracker {
  // meta of reported blocks that have been allocated to a batch,
  // indexed first by batch time and then by receiver id:
  // a HashMap of time -> (streamId -> blocks)
  val timeToAllocatedBlocks =
    new mutable.HashMap[Time, AllocatedBlocks] // AllocatedBlocks wraps Map[Int, Seq[ReceivedBlockInfo]]

  // the most recent batch whose allocation has completed
  var lastAllocatedBatchTime: Time = null

  // collect all unallocated blocks into this batch
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    // the time must be later than the last allocated batch
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      // drain the unallocated queues
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        // move into the allocated map
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        // update the last allocated batch time
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    }
  }
}
```
When the JobGenerator kicks off a new batch and DStreamGraph generates the RDD DAG instances, getBlocksOfBatch(batchTime: Time) is called to look up timeToAllocatedBlocks and obtain the block meta information assigned to this batch, from which the RDDs that process those blocks are generated.
```scala
class ReceivedBlockTracker {
  def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime).map {
      _.streamIdToAllocatedBlocks
    }.getOrElse(Map.empty)
  }
}
```
Called once a batch has finished computing and its tracked block meta can be cleaned up; a job clears from timeToAllocatedBlocks all block meta of batches older than cleanupThreshTime.
```scala
class ReceivedBlockTracker {
  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    val timesToCleanup = timeToAllocatedBlocks.keys.filter {
      _ < cleanupThreshTime
    }.toSeq
    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      // drop the allocated batches
      timeToAllocatedBlocks --= timesToCleanup
      // clean the WAL
      writeAheadLogOption.foreach(
        _.clean(cleanupThreshTime.milliseconds, waitForCompletion)
      )
    }
  }
}
```
Mind map made with reference to: https://github.com/lw-lin/CoolplaySpark
Full mind map: https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png