第2课:经过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

内容:网络

1,解密Spark Streaming运行机制架构

2,解密Spark Streaming架构并发

DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操做,转过来对其内部的RDD操做。socket

纵轴为空间维度:表明的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的。oop

横轴为时间维度:按照特定的时间间隔不断地生成job对象,并在集群上运行。spa

随着时间的推移,基于DStream Graph 不断生成RDD Graph ,也即DAG的方式生成job,并经过Job Scheduler的线程池的方式提交给spark cluster不断的执行。线程

由上可知,RDD    与  DStream的关系以下对象

RDD是物理级别的,而 DStream 是逻辑级别的继承

DStream是RDD的封装类,是RDD进一步的抽象队列

DStream 是RDD的模板。DStream要依赖RDD进行具体的数据计算

注意:纵轴维度须要RDD,DAG的生成模板,须要TimeLine的job控制器

横轴维度(时间维度)包含batch interval,窗口长度,窗口滑动时间等。

3,Spark Streaming源码解析

StreamingContext方法中调用JobSchedulerstart方法

JobGenerator的start方法中,调用startFirstTime方法,来开启定时生成Job的定时器

startFirstTime方法,首先调用DStreamGraph的start方法,而后再调用RecurringTimer的start方法。

timer对象为一个定时器,根据batchInterval时间间隔按期向EventLoop发送GenerateJobs的消息。

接收到GenerateJobs消息后,会回调generateJobs方法。

generateJobs方法再调用DStreamGraph的generateJobs方法生成Job

DStreamGraph的generateJobs方法

DStreamGraph的实例化是在StreamingContext中的

DStreamGraph类中保存了输入流和输出流信息

回到JobGenerator的start方法中receiverTracker.start()

其中ReceiverTrackerEndpoint对象为一个消息循环体

launchReceivers方法中发送StartAllReceivers消息

接收到StartAllReceivers消息后,进行以下处理

StartReceiverFunc方法以下,实例化Receiver监控者,开启并等待退出

supervisor的start方法中调用startReceiver方法

咱们以socketTextStream为例,其启动的是SocketReceiver,内部开启一个线程,来接收数据。

内部调用supervisor的pushSingle方法,将数据汇集后存放在内存中

supervisor的pushSingle方法以下,将数据放入到defaultBlockGenerator中,defaultBlockGenerator为BlockGenerator,保存Socket接收到的数据

BlockGenerator对象中有一个定时器,来更新当前的Buffer

BlockGenerator对象中有一个线程,来从阻塞队列中取出数据

调用ReceiverSupervisorImpl类中的继承BlockGeneratorListener的匿名类中的onPushBlock方法。

receivedBlockHandler对象以下

这里咱们讲解BlockManagerBasedBlockHandler的方式

trackerEndpoint以下

实际上是发送给ReceiverTrackerEndpoint类,

InputInfoTracker类的reportInfo方法只是对数据进行记录统计

其generateJob方法是被DStreamGraph调用

DStreamGraph的generateJobs方法是被JobGenerator类的generateJobs方法调用。

JobGenerator类中有一个定时器,batchInterval发送GenerateJobs消息

总结:

1,当调用StreamingContext的start方法时,启动了JobScheduler

2,当JobScheduler启动后会前后启动ReceiverTracker和JobGenerator

3,ReceiverTracker启动后会建立ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息

4,ReceiverTracker在启动时会给本身发送StartAllReceivers消息,本身接收到消息后,向Spark提交startReceiverFunc的Job

5,startReceiverFunc方法中在Executor上启动Receiver,并实例化ReceiverSupervisorImpl对象,来监控Receiver的运行

6,ReceiverSupervisorImpl对象会调用Receiver的onStart方法,咱们以SocketReceiver为例,启动一个线程,链接Server,读取网络数据先调用ReceiverSupervisorImpl的pushSingle方法,

保存在BlockGenerator对象中,该对象内部有个定时器,放到阻塞队列blocksForPushing,等待内部线程取出数据放到BlockManager中,并发AddBlock消息给ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint为ReceiverTracker的内部类,在接收到addBlock消息后将streamId对应的数据阻塞队列streamIdToUnallocatedBlockQueues中

7,JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器

8,接收到GenerateJobs消息会前后触发ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法

9,ReceiverTracker的allocateBlocksToBatch方法会调用getReceivedBlockQueue方法从阻塞队列streamIdToUnallocatedBlockQueues中根据streamId获取数据

10,DStreamGraph的generateJobs方法,继而调用变量名为outputStreams的DStream集合的generateJob方法

11,继而调用DStream的getOrCompute来调用具体的DStream的compute方法,咱们以ReceiverInputDStream为例,compute方法是从ReceiverTracker中获取数据

相关文章
相关标签/搜索