经过案例对SparkStreaming透彻理解-3

本期内容:多线程

  1. 解密Spark Streaming Job架构和运行机制架构

  2. 解密Spark Streaming 容错架构和运行机制框架

 

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,并且发展前景广阔,加之Spark的生态系统,Streaming能够方便调用其余的诸如SQL,MLlib等强大框架,它必将一统天下。socket

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。若是能够掌握Spark streaming这个复杂的应用程序,那么其余的再复杂的应用程序都不在话下了。这里选择Spark Streaming做为版本定制的切入点也是大势所趋。分布式

    本节课经过从job和容错的总体架构上来考察Spark Streaming的运行机制。ide

用以前已有的最简单的例子:oop

// Socket来源的单词计数
// YY课堂:天天20:00现场授课频道68917580
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()

 

跟踪源码能够发现:ui

在初始化 StreamingContext时,建立了以下几个对象:this

// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)

 

而JobScheduler在初始化的时候,会初始化jobGenerator,且包含receiverTracker。spa

// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // 56

 

再看建立DStream的部分

// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}

 

 

// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // 这个方法是关键
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

 

再看 ssc.start

// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

 

第610行,调用了scheduler.start,scheduler就是以前初始化是产生的JobScheduler。

// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

请看80行,将receiverTracker初始化:

// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )

 

调用receiverTracker.start和jobGenerator.star

// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

 

launchReceivers()

// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver() // 这个就是SocketInputDStream.getReceiver(),本例中是SocketReceiver ,见SocketInputDStream.scala line 34
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}

 

看看StartAllReceivers是如何被消费的?

// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // 尽可能负载均匀
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors) // 启动接收器,再也不进一步深究,有兴趣的能够继续查看源码
  }

 

再回到JobScheduler.scala line 83,jobGenerator.start

// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

 

至此消息接收和Job生成器已启动。

 

在StreamingContext调用start方法的内部实际上是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,而且调用JobGenerator和ReceiverTacker的start方法

 

  1.JobGenerator启动后会不断的根据batchDuration生成一个个的Job

 

  2.ReceiverTracker启动后首先在Spark Cluster中启动Receiver(实际上是在Executor中先启动ReceiverSupervisor),在Receiver收到数据后会经过ReceiverSupervisor存储到Executor而且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会经过ReceivedBlockTracker来管理接受到的元数据信息

 

  每一个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,至关于Runnable接口实例,此时要想运行Job须要提交给JobScheduler,在JobScheduler中经过线程池的方式找到一个单独的线程来提交Job到集群运行(实际上是在线程中基于RDD的Action触发真正的做业的运行)。

 

  为何使用线程池呢?

 

   1.做业不断生成,因此为了提高效率,咱们须要线程池;这和在Executor中经过线程池执行Task有殊途同归之妙;

 

   2.有可能设置了Job的FAIR公平调度的方式,这个时候也须要多线程的支持。

 

  第二部分:从容错架构的角度透视Spark Streaming

 

  咱们知道DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操做就是在固定时间上操做RDD。因此从某种意义上而言,Spark Streaming的基于DStream的容错机制,实际上就是划分到每一次造成的RDD的容错机制,这也是Spark Streaming的高明之处。

 

  RDD做为 分布式弹性数据集,它的弹性主要体如今:

 

  1.自动的分配内存和硬盘,优先基于内存

 

  2.基于lineage容错机制

 

  3.task会指定次数的重试

 

  4.stage失败会自动重试

 

  5.checkpoint和persist 复用

 

  6.数据调度弹性:DAG,TASK和资源管理无关。

 

  7.数据分片的高度弹性

 

  基于RDD的特性,它的容错机制主要就是两种:一是checkpoint,二是基于lineage(血统)的容错。通常而言,spark选择血统容错,由于对于大规模的数据集,作检查点的成本很高。可是有的状况下,不如说lineage链条过于复杂和冗长,这时候就须要作checkpoint。

 

  考虑到RDD的依赖关系,每一个stage内部都是窄依赖,此时通常基于lineage容错,方便高效。在stage之间,是宽依赖,产生了shuffle操做,这种状况下,作检查点则更好。总结来讲,stage内部作lineage,stage之间作checkpoint。

后续的会有什么更深的内幕?且听下回分解。

相关文章
相关标签/搜索