SparkStreaming动态感知kafka某个topic下partition

说明SparkStreaming动态感知kafka某个topic下partition,需要对kafka版本区分来看。

kafka 0.8版本

结论:kafka 0.8版本和Spark Streaming结合的DirectStream这种形式的API里面,是不支持kafka新增分区或者topic检测的
所以如果想读取新的分区中的数据,那么就得重新启动Spark Streaming应用。
原因:DirectKafkaInputDStream#compute生成的KafkaRDD, 其partitions数与spark streaming启动时topic的partitions数一致,topic的partitions和offset保存在currentOffsets map变量中,这个变量在启动时初始化,后续不会根据topic的partition变化进行更新。所以导致kafka新增的partitions数据会丢失。
结合sparkstreaming源码梳理

  • 对于批处理的Spark Streaming任务来说,分区检测应该在每次job生成获取kafkaRDD,来给kafkaRDD确定分区数并且每个分区赋值offset范围的时候有牵扯,而这段代码就在DirectKafkaInputDStream#compute方法中。
  • 首先compute方法的第一行:
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  • 这里面获取的是当前生成KafkaRDD每个分区消费的offset的最大值,那么我们需要进入latestLeaderOffsets进一步去看,可以发现下面一行代码:
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  • 这个是根据currentOffsets信息来获取最大的offset,由此此处继续深入发现,由于它只是根据currentOffsets信息来获取最大的offset,没有去感知新增的分区,所以Spark Streaming与kafka 0.8结合是不能动态感知分区的。

kafka 0.10版本

结论:kafka 0.10版本与SparkStreaming结合支持新增分区检测
类比上面0.8版本,我们也可以直接去看kafka 0.10这块的源码去检查,他是否会动态生成kafka分区。

  • 进入DirectKafkaInputDStream的compute,看到的第一行代码也是:
    val untilOffsets = clamp(latestOffsets())
  • 在latestOffsets里面,有了新的大陆:
    在这里插入图片描述 可以看到对接kafka0.10高版本中,有newPartitions新增进来了。