Contents of this session:
Demystifying the Spark Streaming job architecture and execution mechanism
Demystifying the Spark Streaming fault-tolerance architecture and execution mechanism
Any data that cannot be processed as a real-time stream is effectively worthless. In the era of stream processing, Spark Streaming is highly attractive and has broad prospects; on top of that, thanks to the Spark ecosystem, Streaming can conveniently call into other powerful frameworks such as Spark SQL and MLlib, which is why it is poised to dominate the field.
At runtime, Spark Streaming is less a stream-processing framework on top of Spark Core than it is the most complex application running on Spark Core. If you can master an application as complex as Spark Streaming, then no other application, however complex, will pose a problem. That is also why Spark Streaming is the natural entry point for customizing the Spark source code.
This session examines Spark Streaming's runtime mechanism from the overall architecture of jobs and fault tolerance.
We start from the simplest example used before:
// Word count over a socket source
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)   // feed data with: nc -lk 9999
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
ssc.awaitTermination()   // block until the streaming computation is stopped
Tracing through the source code reveals the following:
When the StreamingContext is initialized, it creates several objects:
// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)
When the JobScheduler is constructed, it initializes the jobGenerator; it also declares the receiverTracker field, which is created only when the scheduler starts:
// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // line 56
Next, look at the part that creates the DStream:
// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
// SocketInputDStream.scala line 33
private[streaming] class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // This method is the key
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
Now look at ssc.start:
// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
Line 610 calls scheduler.start; this scheduler is the JobScheduler created during initialization.
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
Look at line 80, where the receiverTracker is instantiated:
// ReceiverTracker.scala line 101
private[streaming] class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
Then receiverTracker.start and jobGenerator.start are called:
// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
launchReceivers()
// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // This is SocketInputDStream.getReceiver(); in this example it returns a SocketReceiver,
    // see SocketInputDStream.scala line 34
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}
Let's see how the StartAllReceivers message is consumed:
// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  // Spread the receivers across executors as evenly as possible
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // Start the receiver; we will not dig any deeper here -- read the source if interested
    startReceiver(receiver, executors)
  }
Back in JobScheduler.scala line 83, jobGenerator.start is called:
// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
At this point, both data receiving and the job generator have been started.
To summarize: StreamingContext.start internally starts JobScheduler.start in a new thread and begins the message loop. Inside JobScheduler.start, the ReceiverTracker is constructed (the JobGenerator was already created when the JobScheduler itself was constructed), and the start methods of both the ReceiverTracker and the JobGenerator are then called:
1. Once started, the JobGenerator keeps generating Jobs, one for every batchDuration (see the timer sketch after this list).
2. Once started, the ReceiverTracker first launches the Receivers in the Spark cluster (strictly speaking, it first launches a ReceiverSupervisor on each Executor). When a Receiver receives data, the ReceiverSupervisor stores it on the Executor and sends the data's metadata to the ReceiverTracker on the Driver, which in turn manages the received metadata through a ReceivedBlockTracker.
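The per-batch trigger behind point 1 is a recurring timer inside the JobGenerator that posts a GenerateJobs event every batchDuration. A rough sketch paraphrased from the Spark 1.6.x source (details may differ between versions):

// JobGenerator.scala (paraphrased): the timer fires once per batchDuration and posts a
// GenerateJobs(time) event; handling that event builds the JobSet for that batch
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")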
Each BatchInterval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is just the DAG of RDDs generated from the DStreamGraph, which from a Java point of view is comparable to an instance of the Runnable interface. To actually run, the Job has to be submitted to the JobScheduler, which uses a thread pool to find a separate thread in which to submit the Job to the cluster (it is the RDD action executed in that thread that triggers the real job execution).
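A rough sketch of that submission path, paraphrased from JobScheduler in the Spark 1.6.x source (line numbers omitted; details may differ between versions):

// JobScheduler (paraphrased): the jobs of each batch are handed to a daemon thread pool
// whose size is controlled by spark.streaming.concurrentJobs (default 1)
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // Each Job is wrapped in a JobHandler (a Runnable) and run on the pool;
    // inside JobHandler, job.run() triggers the RDD action for that batch
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}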
Why use a thread pool?
1. Jobs are generated continuously, so a thread pool is needed for efficiency; this mirrors the way an Executor runs Tasks through a thread pool.
2. The Jobs may be configured to use FAIR scheduling, which also requires multi-thread support (see the configuration sketch below).
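Both points can be made concrete with a small configuration sketch; spark.scheduler.mode and spark.streaming.concurrentJobs are real settings, but the values below are only illustrative (spark.streaming.concurrentJobs is undocumented and defaults to 1):

// Illustrative only: enable FAIR scheduling and allow two batch jobs to run concurrently
val sparkConf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("StreamingWordCountSelfScala")
  .set("spark.scheduler.mode", "FAIR")          // fair scheduling among concurrently running jobs
  .set("spark.streaming.concurrentJobs", "2")   // undocumented; default is 1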
Part 2: Spark Streaming from the perspective of its fault-tolerance architecture
We know that the relationship between DStream and RDD is that a DStream keeps producing RDDs as time passes, and an operation on a DStream is an operation on the RDD of each fixed time interval. In a sense, Spark Streaming's DStream-based fault tolerance therefore reduces to the fault tolerance of the RDD produced at each interval, which is one of Spark Streaming's clever design points.
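This is easiest to see with foreachRDD; a minimal sketch, reusing the word-count DStream named words from the earlier example:

// Each batch interval hands us an ordinary RDD; everything done with it is covered by
// the usual RDD fault-tolerance mechanisms (lineage, retries, checkpointing)
words.foreachRDD { (rdd, time) =>
  println(s"Batch at $time produced an RDD with ${rdd.partitions.length} partitions")
}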
As a resilient distributed dataset, an RDD's resilience shows up mainly in the following ways:
1. Storage is allocated automatically between memory and disk, with memory preferred
2. Lineage-based fault tolerance
3. A failed task is retried a specified number of times
4. A failed stage is automatically retried
5. checkpoint and persist allow intermediate results to be reused (see the sketch after this list)
6. Elastic data scheduling: the DAG and Tasks are independent of resource management
7. Highly elastic data partitioning
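A minimal sketch of point 5 at the RDD level, assuming an existing SparkContext named sc and hypothetical HDFS paths:

import org.apache.spark.storage.StorageLevel

// Cache a derived RDD and also checkpoint it, so that recovering a lost partition does not
// require replaying the whole lineage (paths are hypothetical)
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoint")
val lengths = sc.textFile("hdfs:///input/data.txt").map(_.length)
lengths.persist(StorageLevel.MEMORY_AND_DISK)
lengths.checkpoint()           // lineage is truncated once the checkpoint is materialized
println(lengths.count())       // the first action materializes both the cache and the checkpoint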
Given these characteristics, RDDs have essentially two fault-tolerance mechanisms: checkpointing and lineage-based recovery. In general, Spark relies on lineage, because checkpointing is expensive for large datasets. In some situations, however, such as when the lineage chain becomes too long and complex, checkpointing is the better choice.
Considering RDD dependencies: inside a stage all dependencies are narrow, and lineage-based recovery is convenient and efficient there. Between stages the dependencies are wide and involve a shuffle, and in that case checkpointing works better. In short: rely on lineage within a stage, and checkpoint across stage boundaries.
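The same trade-off appears at the Spark Streaming level when stateful operators keep extending the lineage batch after batch. A minimal sketch, assuming the ssc and the word-count pair DStream named words from the earlier example, with a hypothetical checkpoint path:

// Stateful operators such as updateStateByKey require a checkpoint directory, because
// otherwise their lineage would grow without bound across batches
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
val runningCounts = words.updateStateByKey[Int] { (newValues, running) =>
  Some(newValues.sum + running.getOrElse(0))
}
runningCounts.print()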
What deeper internals come next? Stay tuned for the next installment.