第10课:Spark Streaming源码解读之流数据不断接收全生命周期完全研究和思考

  

本期内容:apache

1, 数据接收架构设计模式设计模式

2, 数据接收源码完全研究架构

 

Spark Streaming应用程序有如下特色:ide

1,不断持续接收数据oop

3, Receiver和Driver不在同一节点中性能

 

接收数据,存储数据,汇报数据的metedata给Driverspa

数据接收的过程相似于MVC线程

Mode:Driver架构设计

View:Receiver设计

Control:ReceiverSupervisorImpl

Receiver的启动由ReceiverSupervisorImpl来控制,Receiver接收到数据交给ReceiverSupervisorImpl来存储。

RDD中的元素必需要实现序列化,才能将RDD序列化给Executor端。Receiver就实现了Serializable接口。

// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }

 

@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

 

处理Receiver接收到的数据,存储数据并汇报给Driver,Receiver是一条一条的接收数据的。

/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

经过限定数据存储速度来实现限流

接收数据,合并成buffer,放入block队列

在ReceiverSupervisorImpl启动会调用BlockGenerator对象的start方法。

那么BlockGenerator类是用来干什么的呢?从源码上注释能够说明该类来把一个Receiver接收到的数据合并到一个Block而后写入到BlockManager中。该类内部有两个线程,一个是周期性把数据生成一批对象,而后把先前的一批数据封装成Block。另外一个线程时把Block写入到BlockManager中。

 

BlockGenerator类继承自ReateLimiter类,说明咱们不能限定接收数据的速度,可是能够限定存储数据的速度,转过来就限定流动的速度。

BlockGenerator类有一个定时器(默认每200ms将接收到的数据合并成block)和一个线程(把block写入到BlockManager),200ms会产生一个Block,即1秒钟生成5个Partition。过小则生成的数据片中数据过小,致使一个Task处理的数据少,性能差。实际经验获得不要低于50ms。

那BlockGenerator是怎么被建立的?

 

BlockGenerator类中的定时器会回调updateCurrentBuffer方法。

Receiver不断的接收数据,BlockGenerator类经过一个定时器,把Receiver接收到的数据,把多条合并成Block,再放入到Block队列中。

 

运行在Executor端的ReceiverSupervisorImpl须要与Driver端的ReceiverRacker进行通讯,传递元数据信息metedata,其中ReceiverSupervisorImpl经过RPC的名称获取到ReceiverRacker的远程调用。

在ReceiverTracker调用start方法启动的时候,会以ReceiverTracker的名称建立RPC通讯体。ReceiverSupervisorImpl就是和这个RPC通讯体进行消息交互的。

在ReceiverTrackerEndpoint接收到ReceiverSupervisorImpl发送的注册消息,把其RpcEndpoint保存起来。

对应的Executor端的ReceiverSupervisorImpl也会建立Rpc消息通讯体,来接收来自Driver端ReceiverTacker的消息。

BlockGenerator类中的线程每隔10ms从队列中获取Block,写入到BlockManager中。

相关文章
相关标签/搜索