这一章要讲 Spark Streaming,讲以前首先回顾下它的用法,具体用法请参照《Spark Streaming 编程指南》。html
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 得到一个DStream负责链接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 对每一行数据执行Split操做
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 输出结果
wordCounts.print();
ssc.start(); // 开始
ssc.awaitTermination(); // 计算完毕退出
复制代码
一、首先实例化一个 StreamingContext编程
二、调用 StreamingContext 的 socketTextStreambash
三、对得到的 DStream 进行处理网络
四、调用 StreamingContext 是 start 方法,而后等待socket
咱们看 StreamingContext 的 socketTextStream 方法吧。ide
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
复制代码
一、StoageLevel 是 StorageLevel.MEMORY_AND_DISK_SER_2函数
二、使用 SocketReceiver 的 bytesToLines 把输入流转换成可遍历的数据this
继续看 socketStream 方法,它直接 new 了一个spa
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
复制代码
继续深刻挖掘 SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。线程
具体实现 ReceiverInputDStream 的类有好几个,基本上都是从网络端来数据的。
它实现了 ReceiverInputDStream 的 getReceiver 方法,实例化了一个 SocketReceiver 来接收数据。
SocketReceiver 的 onStart 方法里面调用了 receive 方法,处理代码以下:
socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
复制代码
一、new 了一个 Socket 来接收数据,用 bytesToLines 方法把 InputStream 转换成一行一行的字符串。
二、把每一行数据用 store 方法保存起来,store 方法是从 SocketReceiver 的父类 Receiver 继承而来,内部实现是:
def store(dataItem: T) {
executor.pushSingle(dataItem)
}
复制代码
executor 是 ReceiverSupervisor 类型,Receiver 的操做都是由它来处理。这里先不深纠,后面咱们再说这个 pushSingle 的实现。
到这里咱们知道 lines 的类型是 SocketInputDStream,而后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是 RDD 的那种方法,而是 DStream 独有的。
讲到上面这几个方法,咱们开始转入 DStream 了,flatMap、map、reduceByKey、print 方法都涉及到 DStream 的转换,这和 RDD 的转换是相似的。咱们讲一下 reduceByKey 和 print。
reduceByKey 方法和 RDD 同样,调用的 combineByKey 方法实现的,不同的是它直接 new 了一个 ShuffledDStream 了,咱们接着看一下它的实现吧。
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}
复制代码
在 compute 阶段,对经过 Time 得到的 rdd 进行 reduceByKey 操做。接下来的 print 方法也是一个转换:
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
复制代码
打印前十个,超过 10 个打印 "..."。须要注意 register 方法。
ssc.graph.addOutputStream(this)
复制代码
它会把代码插入到当前的 DStream 添加到 outputStreams 里面,后面输出的时候若是没有 outputStream 就不会有输出,这个须要记住哦!
前戏结束以后,ssc.start() 高潮开始了。 start 方法很小,最核心的一句是 JobScheduler 的 start 方法。咱们得转到 JobScheduler 方法上面去。
下面是 start 方法的代码:
def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就处理事件
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
}
复制代码
一、启动了一个 Actor 来处理 JobScheduler 的 JobStarted、JobCompleted、ErrorReported 事件。
二、启动 StreamingListenerBus 做为监听器。
三、启动 ReceiverTracker。
四、启动 JobGenerator。
咱们接下来看看 ReceiverTracker 的 start 方法。
def start() = synchronized {if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
receiverExecutor.start()
}
}
复制代码
一、首先判断了一下 receiverInputStreams 不能为空,那 receiverInputStreams 是怎么时候写入值的呢?答案在 SocketInputDStream 的父类 InputDStream 当中,当实例化 InputDStream 的时候会在 DStreamGraph 里面添加 InputStream。
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
ssc.graph.addInputStream(this)
//....
}
复制代码
二、实例化 ReceiverTrackerActor,它负责 RegisterReceiver(注册 Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销 Receiver)等事件的处理。
三、启动 receiverExecutor(实际类是 ReceiverLauncher,这名字起得。。),它主要负责启动 Receiver,start 方法里面调用了 startReceivers 方法吧。
private def startReceivers() {
   // 对应着上面的那个例子,getReceiver方法得到是SocketReceiver
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// 查看是否全部的receivers都有优先选择机器,这个须要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// 建立一个并行receiver集合的RDD, 把它们分散到各个worker节点上
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// 在worker节点上启动Receiver的方法,遍历全部Receiver,而后启动
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException("Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// 运行这个重复的做业来确保全部的slave都已经注册了,避免全部的receivers都到一个节点上
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// 把receivers分发出去,启动
ssc.sparkContext.runJob(tempRDD, startReceiver)
}
复制代码
一、遍历 receiverInputStreams 获取全部的 Receiver。
二、查看这些 Receiver 是否全都有优先选择机器。
三、把 SparkContext 的 makeRDD 方法把全部 Receiver 包装到 ParallelCollectionRDD 里面,并行度是 Receiver 的数量。
四、发个小任务给确保全部的 slave 节点都已经注册了(这个小任务有点儿莫名其妙,感受怪怪的)。
五、提交做业,启动全部 Receiver。
Spark 写得实在是太巧妙了,竟然能够把 Receiver 包装在 RDD 里面,当作是数据来处理!
启动 Receiver 的时候,new 了一个 ReceiverSupervisorImpl,而后调的 start 方法,主要干了这么三件事情,代码就不贴了。
一、启动 BlockGenerator。
二、调用 Receiver 的 OnStart 方法,开始接受数据,并把数据写入到 ReceiverSupervisor。
三、调用 onReceiverStart 方法,发送 RegisterReceiver 消息给 driver 报告本身启动了。
ok,到了这里,重点落到了 BlockGenerator。前面说到 SocketReceiver 把接受到的数据调用 ReceiverSupervisor 的 pushSingle 方法保存。
// 这是ReceiverSupervisorImpl的方法
def pushSingle(data: Any) {
blockGenerator += (data)
}
// 这是BlockGenerator的方法
def += (data: Any): Unit = synchronized {
currentBuffer += data
}
复制代码
咱们看一下它的 start 方法吧。
def start() {
blockIntervalTimer.start()
blockPushingThread.start()
}
复制代码
它启动了一个定时器 RecurringTimer 和一个线程执行 keepPushingBlocks 方法。
先看 RecurringTimer 的实现:
while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
}
复制代码
每隔一段时间就执行 callback 函数,callback 函数是 new 的时候传进来的,是 BlockGenerator 的 updateCurrentBuffer 方法。
private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.put(newBlock)
}
} catch {case t: Throwable =>
reportError("Error in block updating thread", t)
}
}
复制代码
它 new 了一个 Block 出来,而后添加到 blocksForPushing 这个 ArrayBlockingQueue 队列当中。
提到这里,有两个参数须要你们注意的:
spark.streaming.blockInterval 默认值是200
spark.streaming.blockQueueSize 默认值是10
复制代码
这是前面提到的间隔时间和队列的长度,间隔时间默认是 200 毫秒,队列是最多能容纳 10 个 Block,多了就要阻塞了。
咱们接下来看一下 BlockGenerator 另外启动的那个线程执行的 keepPushingBlocks 方法到底在干什么?
private def keepPushingBlocks() {
    while(!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
   // ...退出以前把剩下的也输出去了
}
复制代码
它在把 blocksForPushing 中的 block 不停的拿出来,调用 pushBlock 方法,这个方法属于在实例化 BlockGenerator 的时候,从 ReceiverSupervisorImpl 传进来的 BlockGeneratorListener 的。
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}, streamId, env.conf)
复制代码
一、reportError,经过 actor 向 driver 发送错误报告消息 ReportError。
二、调用 pushArrayBuffer 保存数据。
下面是 pushArrayBuffer 方法:
def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
复制代码
一、把 Block 保存到 BlockManager 当中,序列化方式为以前提到的 StorageLevel.MEMORY_AND_DISK_SER_2(内存不够就写入到硬盘,而且在 2 个节点上保存的方式)。
二、调用 reportPushedBlock 给 driver 发送 AddBlock 消息,报告新添加的 Block,ReceiverTracker 收到消息以后更新内部的 receivedBlockInfo 映射关系。
前面只讲了数据的接收和保存,那数据是怎么处理的呢?
以前一直讲 ReceiverTracker,而忽略了以前的 JobScheduler 的 start 方法里面最后启动的 JobGenerator。
def start(): Unit = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
复制代码
一、启动一个 actor 处理 JobGeneratorEvent 事件。
二、若是是已经有 CheckPoint 了,就接着上次的记录进行处理,不然就是第一次启动。
咱们先看 startFirstTime 吧,CheckPoint 之后再说吧,有点儿小复杂。
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
}
复制代码
一、timer.getStartTime 计算出来下一个周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间 / 除以间隔时间,再用 math.floor 求出它的上一个整数(即上一个周期的到期时间点),加上 1,再乘以周期就等于下一个周期的到期时间。
二、启动 DStreamGraph,启动时间 = startTime - graph.batchDuration。
三、启动 Timer,咱们看看它的定义:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
复制代码
到这里就清楚了,DStreamGraph 的间隔时间就是 timer 的间隔时间,启动时间要设置成比 Timer 早一个时间间隔,缘由再慢慢探究。
能够看出来每隔一段时间,Timer 给 eventActor 发送 GenerateJobs 消息,咱们直接去看它的处理方法 generateJobs 吧,中间忽略了一步,你们本身看。
private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
复制代码
下面是 generateJobs 方法。
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
复制代码
一、DStreamGraph 生成 jobs。
二、从 stream 那里获取接收到的 Block 信息。
三、调用 submitJobSet 方法提交做业。
四、提交完做业以后,作一个 CheckPoint。
先看 DStreamGraph 是怎么生成的 jobs。
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
jobs
}
复制代码
outputStreams 在这个例子里面是 print 这个方法里面添加的,这个在前面说了,咱们继续看 DStream 的 generateJob。
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
复制代码
一、调用 getOrCompute 方法得到 RDD
二、new 了一个方法去提交这个做业,缺什么都不作
为何呢?这是直接跳转的错误,呵呵,由于这个 outputStream 是 print 方法返回的,它应该是 ForEachDStream,因此咱们应该看的是它里面的 generateJob 方法。
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
复制代码
这里请你们千万要注意,不要在这块被卡住了。
咱们看看它这个 RDD 是怎么出来的吧。
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// 这个RDD已经被生成过了,直接用就是了
case Some(oldRDD) => Some(oldRDD)
// 还没生成过,就调用compte函数生成一个
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
         // 设置保存的级别
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
}
         // 若是如今须要,就作CheckPoint
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
}
         // 添加到generatedRDDs里面去,能够再次利用
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
None
}
} else {
None
}
}
}
}
复制代码
从上面的方法能够看出来它是经过每一个 DStream 本身实现的 compute 函数得出来的 RDD。咱们找到 SocketInputDStream,没有 compute 函数,在父类 ReceiverInputDStream 里面找到了。
override def compute(validTime: Time): Option[RDD[T]] = {
// 若是出现了时间比startTime早的话,就返回一个空的RDD,由于这个极可能是master挂了以后的错误恢复
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
复制代码
经过 DStream 的 id 把 receiverTracker 当中把接收到的 block 信息所有拿出来,记录到 ReceiverInputDStream 自身的receivedBlockInfo 这个 HashMap 里面,就把 RDD 返回了,RDD 里面实际包含的是 Block 的 id 的集合。
如今咱们就能够回到以前 JobGenerator 的 generateJobs 方法,咱们就清楚它这句是提交的什么了。
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
复制代码
JobSet 是记录 Job 的完成状况的,直接看 submitJobSet 方法吧。
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
} else {
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
}
}
复制代码
遍历 jobSet 里面的全部 jobs,经过 jobExecutor 这个线程池提交。咱们看一下 JobHandler 就知道了。
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}
复制代码
一、通知 eventActor 处理 JobStarted 事件。
二、运行 job。
三、通知 eventActor 处理 JobCompleted 事件。
这里的重点是 job.run,事件处理只是更新相关的 job 信息。
def run() {
result = Try(func())
}
复制代码
在遍历 BlockRDD 的时候,在 compute 函数获取该 Block(详细请看 BlockRDD),而后对这个 RDD 的结果进行打印。
到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:
一、能够有多个输入,咱们能够经过 StreamingContext 定义多个输入,好比咱们监听多个(host,ip),能够给它们定义各自的处理逻辑和输出,输出方式不只限于 print 方法,还能够有别的方法,saveAsTextFiles 和 saveAsObjectFiles。这块的设计是支持共享 StreamingContext 的。
二、StreamingContext 启动了 JobScheduler,JobScheduler 启动 ReceiverTracker 和 JobGenerator。
三、ReceiverTracker 是经过把 Receiver 包装成 RDD 的方式,发送到 Executor 端运行起来的,Receiver 起来以后向 ReceiverTracker 发送 RegisterReceiver 消息。
三、Receiver 把接收到的数据,经过 ReceiverSupervisor 保存。
四、ReceiverSupervisorImpl 把数据写入到 BlockGenerator 的一个 ArrayBuffer 当中。
五、BlockGenerator 内部每一个一段时间(默认是 200 毫秒)就把这个 ArrayBuffer 构形成 Block 添加到 blocksForPushing 当中。
六、BlockGenerator 的另一条线程则不断的把加入到 blocksForPushing 当中的 Block 写入到 BlockManager 当中,并向 ReceiverTracker 发送 AddBlock 消息。
七、JobGenerator 内部有个定时器,按期生成 Job,经过 DStream 的 id,把 ReceiverTracker 接收到的 Block 信息从 BlockManager 上抓取下来进行处理,这个间隔时间是咱们在实例化 StreamingContext 的时候传进去的那个时间,在这个例子里面是 Seconds(1)。