目前平台使用Kafka + Flume的方式进行实时数据接入,Kafka中的数据由业务方负责写入,这些数据一部分由Spark Streaming进行流式计算;另外一部分数据则经由Flume存储至HDFS,用于数据挖掘或机器学习。HDFS存储数据时目录的最小逻辑单位为“小时”,为了保证数据计算过程当中的数据完整性(计算某个小时目录中的数据时,该目录的数据所有写入完毕,且再也不变化),咱们在Flume中加入了以下策略:
每五分钟关闭一次正在写入的文件,即新建立文件进行数据写入。
这样的方式能够保证,当前小时的第五分钟以后就能够开始计算上一小时目录中的数据,必定程度上提升了离线数据处理的实时性。
随着业务的增长,开始有业务方反馈:“HDFS中实际被分析的数据量很小,可是Spark App的Task数目却至关多,不太正常”,咱们跟进以后,发现问题的根源在于如下三个方面:
(1)Kafka的实时数据写入量比较小;
(2)Flume部署多个实例,同时消费Kafka中的数据并写入HDFS;
(3)Flume每五分钟会从新建立文件写入数据(如上所述);
这样的场景直接致使HDFS中存储着数目众多但单个文件数据量很小的状况,间接影响着Spark App Task的数目。
咱们以Spark WordCount为例进行说明,Spark版本为1.5.1。
假设HDFS目录“/user/yurun/spark/textfile”中存在如下文件:
这个目录下仅三个文件包含少许数据:part-0000五、part-000十、part-00015,数据大小均为6 Byte,其他文件数据大小均为0 Byte,符合小文件的场景。
注意:_SUCCESS至关于一个“隐藏”文件,实际处理时一般会被忽略。
常规实现
咱们使用SparkContext textFile完成数据输入,应用运行完成以后,经过Spark History Server的页面能够看到:应用执行过程当中,会产生一个Job,包含两个Stage,每一个Stage包含16个Task,也就是说,Task的总数目为32,以下图所示:
之因此每一个Stage包含16个Task,是由于目录中存有16个文本文件(_SUCCESS不参与计算)。
优化实现
在这个优化的版本中,咱们使用SparkContext newAPIHadoopFile完成数据输入,须要着重说明一下“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”,这个类能够将多个小文件合并生成一个Split,而一个Split会被一个Task处理,从而减小Task的数目。这个应用的执行过程当中,会产生两个Job,其中Job0包含一个Stage,一个Task;Job1包含两个Stage,每一个Stage包含一个Task,也就是说,Task的总数目为3,以下图所示:
能够看出,经过使用“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”能够很大程度上缓解小文件致使Spark App Task数目过多的问题。