First, the example source code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // collect a new batch of data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999)             // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce by key
    words.print()          // print the result
    ssc.start()            // start the streaming context
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
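To actually drive this example you would typically start a text source on the listening port first, for example nc -lk 9999 on the host passed to socketTextStream, and then submit the application with spark-submit; every five seconds the words typed into that socket are counted and printed.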
The previous article traced, through the source code, how the Receivers are submitted to Spark as an RDD; after the high-level scheduler (DAGScheduler) handles the JobSubmitted event, a LaunchTask message is sent to the Executor. This article focuses on how the Receiver is launched and run on the Executor.
Readers familiar with Spark Core will know that an Executor runs on the node hosting the Worker; the concrete process is named CoarseGrainedExecutorBackend. This is, of course, for Standalone cluster mode.
Start with CoarseGrainedSchedulerBackend, where the LaunchTask message is sent to the Executor:
// CoarseGrainedSchedulerBackend.scala line 244
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
The message is handled by the receive method of CoarseGrainedExecutorBackend:
// CoarseGrainedExecutorBackend.scala line 89
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
launchTask does two things:
Instantiates a TaskRunner, which implements Runnable;
Submits it to the thread pool for execution.
// Executor.scala line 118
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
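Mechanically this is nothing more than the standard thread-pool pattern: wrap the work in a Runnable, remember it in a map keyed by task id, and hand it to the pool. A minimal self-contained sketch of that pattern (all names here are illustrative, not Spark's):

import java.util.concurrent.{ConcurrentHashMap, Executors}

// Minimal sketch of the pattern used by Executor.launchTask: track the task,
// then let a thread pool run it. Names are illustrative, not Spark's.
object LaunchTaskSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launch(taskId: Long, work: () => Unit): Unit = {
    val runner: Runnable = new Runnable {
      override def run(): Unit =
        try work()
        finally runningTasks.remove(taskId) // mirrors the finally block in TaskRunner.run
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launch(1L, () => println("task 1 running"))
    threadPool.shutdown()
  }
}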
TaskRunner extends Runnable.
Its run() method:
deserializes the Task (line 193);
runs the Task (line 213).
// Executor.scala line 152
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {

  // ... member variable initialization

  // line 180
  override def run(): Unit = {
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    val deserializeStartTime = System.currentTimeMillis()
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      updateDependencies(taskFiles, taskJars)
      task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // line 193
      task.setTaskMemoryManager(taskMemoryManager)

      // ... some code elided

      logDebug("Task " + taskId + "'s epoch is " + task.epoch)
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      var threwException = true
      val (value, accumUpdates) = try {
        val res = task.run( // line 213
          taskAttemptId = taskId,
          attemptNumber = attemptNumber,
          metricsSystem = env.metricsSystem)
        threwException = false
        res
      } finally {
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
            throw new SparkException(errMsg)
          } else {
            logError(errMsg)
          }
        }
      }
      val taskFinish = System.currentTimeMillis()

      // ... some code elided

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      // ... some code elided

      val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
      // ... some code elided
    } finally {
      runningTasks.remove(taskId)
    }
  }
}
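The tail end of run() decides how to ship the result back to the driver: small results go back inline, medium ones go through the BlockManager, and anything over spark.driver.maxResultSize is dropped and only a reference is returned. A simplified sketch of just that branching (types and names here are mine, not Spark's):

// Simplified version of the decision at the end of TaskRunner.run.
// The thresholds correspond to spark.driver.maxResultSize and the RPC frame size.
object ResultShippingSketch {
  sealed trait ShippedResult
  case class Direct(bytes: Array[Byte]) extends ShippedResult                   // small: send inline
  case class ViaBlockManager(blockId: String, size: Long) extends ShippedResult // medium: store, return a reference
  case class Dropped(size: Long) extends ShippedResult                          // too large: return only a reference, never the data

  def shipResult(resultSize: Long,
                 maxResultSize: Long,
                 frameSize: Long,
                 bytes: Array[Byte],
                 blockId: String): ShippedResult =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped(resultSize)
    else if (resultSize >= frameSize) ViaBlockManager(blockId, resultSize)
    else Direct(bytes)

  def main(args: Array[String]): Unit =
    println(shipResult(resultSize = 10, maxResultSize = 1024, frameSize = 128,
      bytes = Array[Byte](1, 2, 3), blockId = "taskresult_1"))
}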
Task.run then calls runTask; in this case the implementation is the one in ResultTask.scala.
// Task.scala line 67
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    (runTask(context), context.collectAccumulators()) // line 89
  } finally {
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
        // Notify any tasks waiting for execution memory to be freed to wake up and try to
        // acquire memory again. This makes impossible the scenario where a task sleeps forever
        // because there are no other tasks left to notify it. Since this is safe to do but may
        // not be strictly necessary, we should revisit whether we can remove this in the future.
        val memoryManager = SparkEnv.get.memoryManager
        memoryManager.synchronized { memoryManager.notifyAll() }
      }
    } finally {
      TaskContext.unset()
    }
  }
}
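Note the shape of this method: install a thread-local TaskContext, run the work, and unconditionally tear it down again. The same set-run-unset pattern, stripped of everything Spark-specific, looks like this (a sketch; Ctx below is just a stand-in for TaskContext):

// The set / run / unset pattern from Task.run, reduced to its essence.
object ContextSketch {
  private val current = new ThreadLocal[String]() // stand-in for the TaskContext holder

  def withContext[T](ctx: String)(body: => T): T = {
    current.set(ctx)          // TaskContext.setTaskContext(context)
    try body                  // runTask(context)
    finally current.remove()  // TaskContext.unset() in the outer finally
  }

  def main(args: Array[String]): Unit =
    println(withContext("task-0") { s"running inside ${current.get()}" })
}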
ResultTask.runTask
Deserializes the function func and the RDD (line 61);
invokes that function (line 66). Here func is the closure created at SparkContext.scala line 1992: (context: TaskContext, iter: Iterator[T]) => cleanF(iter).
It is passed the TaskContext and an Iterator[Receiver], so the call ends up invoking cleanF(iter).
// ResultTask.scala line 57
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
And cleanF(iter) is exactly the startReceiverFunc defined at ReceiverTracker.scala line 564.
Recall the definition of startReceiverFunc: it is a function that takes an iterator over Receivers and returns Unit.
// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
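For context, the previous article showed that the ReceiverTracker hands this function to SparkContext.submitJob together with an RDD that contains the Receiver, so that the function runs as an ordinary one-partition job on an executor. Below is a minimal, self-contained sketch of that same mechanism, using a plain function in place of startReceiverFunc (the object name and the local master are assumptions for the demo):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch: submitJob runs an arbitrary Iterator[T] => U function as a job on an
// executor. In spirit, this is how startReceiverFunc ends up inside a ResultTask.
object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SubmitJobSketch"))
    val rdd = sc.makeRDD(Seq("the-one-element"), numSlices = 1) // one partition, like the single Receiver

    val runOnExecutor: Iterator[String] => Unit =
      iter => iter.foreach(x => println(s"running on an executor with $x"))

    val future = sc.submitJob[String, Unit, Unit](rdd, runOnExecutor, Seq(0), (_, _) => (), ())
    Await.result(future, 1.minute) // only for the demo; a real receiver job never finishes on its own
    sc.stop()
  }
}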
At this point the Receiver is pulled out of the iterator (exactly one per task), a ReceiverSupervisorImpl is instantiated around it, and then start() is called.
Only now is the Receiver actually running in an Executor on a Worker.
This is just the first step of stream processing: the component that receives data has been started.
The next article will trace through the source code how the data is actually received.