第15课:Spark Streaming源码解读之No Receivers完全思考

本期内容:缓存

1,Direct Access分布式

2,Kafka函数

 

使用No Receiver有更强的控制度和语义一致性。接下来咱们以Kafka为例,讲解不使用Receiver而直接从Kafka的Broker上读取数据,这种方式成为Direct方式。ui

从Spark Streaming-Kafka包中主要有10个类,咱们先讲解KafkaRDD这个类,KafkaRDD继承RDD,其构造器中须要传入Kafka的配置信息(如Kafka的Broker集合),偏移量信息,每一个Topic的Partition的Leader信息,消息处理函数。this

咱们知道自定义RDD,须要实现RDD的抽象方法,那KafkaRDD是如何实现的呢?spa

**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]对象

 

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]继承

 

getPartitions方法,从传入的偏移量信息中获取到每一个Topic的Partition的Leader信息,host和port,而后实例化KafkaRDDPartition对象。内存

Compute方法,若是偏移量from和util相等,则返回,不等则实例化KafkaRDDIterator对象。资源

 

其中KafkaRDDPartition继承Partition,封装了Partition的信息,如topic,偏移from和until,Broker的host和port信息。

 

KafkaRDDIterator继承NextIterator,具备iterator的特性,有getNext和hasNext方法,能够对数据迭代操做并计算。KafkaRDDIterator内部,以传入的KafkaParams参数构造了一个和Kafka集合交互的KafkaCluster对象,处理Partition的Leader发生变化时的具体处理办法,重写了getNext方法。

 

若是TaskContext中以前没有失败过,即attemptNumber为0,则直接以KafkaRDDPartition中的host和port信息来链接Kafka,返回SimpleConsumer(Kafak的简单消费者,备注Kafka中还有高级消费者的API);若是以前失败过,则找到该Partition新的Leader Broker信息,而后再进行链接。

 

若是迭代器iter为空或者没有数据,则调用consumer发送FetchRequestBuilder给Broker,获取到一批数据。接下来再次判断iter是否有数据,若是没有则表示以读取到指定的untilOffset了。若是iter有数据,也会对offset和untilOffset进行比较,若是当前消费的offset大于untilOffset,则返回。若是当消费的offset小于untilOffset,则更新当前请求的Offset值,并调用messageHandler来处理当前的数据。其中messageHandler就是用户传入的对读取到的数据具体操做的函数。

 

咱们编写Spark Streaming Kafka Direct应用程序时,会调用KafkaUtils的createDirectStream方法,来建立DirectKafkaInputDStream。内部先构造KafkaCluster对象,并调用getFromOffsets方法获取到起始offset信息。

 

getFromOffsets方法先从传入的KafkaParams中获取auto.offset.reset的值,若是没有设置,则以最大的偏移值来获取最新的数据。

 

DirectKafkaInputDStream继承InputDStream,最大重试次数为1,来确保语义一致性。

 

DirectKafkaInputDStream的compute方法生成RDD。

 

 

其中untilOffsets的值为clamp方法。从maxMessagePerPartition中获取。从配置文件中获取spark.streaming.kafka.maxRatePerPartition,比较maxRateLimitPerPartition和(limit / numPartitions)的值,并取最小值。其中sescPerBatch为传入的Duration的值,获得每个BatchDuration处理的最大Offset值,当前的偏移量与容许每一个Partition最大的消息处理量和该Partition当前的Offset值,这两个的最小值,做为untilOffsets。

 

Spark Streaming能够根据输入的数据流量和当前的处理流量进行比较。动态资源分配调整,能够经过spark.streaming.backpressure.enabled来设置。

 

 

总结:使用Kafka Direct方式没有缓存,不存在内存溢出的问题,而使用Kafka Receiver有缓存。Receiver和Executor绑定,不能分布式,而使用Kafka Direct,默认数据就在多个Executor上。采用Receiver方式,数据来不及处理,延迟屡次后Spark Streaming就有可能奔溃。采用Kafka Direct方式,一批数据处理完,再去取一批数据,不会形成Spark Streaming奔溃。Kafka Direct能够办到语义一致性,确保数据消费。

相关文章
相关标签/搜索