Spark Streaming fileStream实现原理

fileStream是Spark Streaming Basic Source的一种,用于“近实时”地分析HDFS(或者与HDFS API兼容的文件系统)指定目录(假设:dataDirectory)中新近写入的文件,dataDirectory中的文件须要知足如下约束条件:
 
(1)这些文件格式必须相同,如:统一为文本文件;
(2)这些文件在目录dataDirectory中的建立形式比较特殊:必须以原子方式被“移动”或“重命名”至目录dataDirectory中;
(3)一旦文件被“移动”或“重命名”至目录dataDirectory中,文件不能够被改变,例如:追加至这些文件的数据可能不会被处理。
 
之因此称之为“近实时”就是基于约束条件(2),文件的数据必须所有写入完成,而且被“移动”或“重命名”至目录dataDirectory中以后,这些文件才能够被处理。
 
调用示例以下:
 
 
directory:指定待分析文件的目录;
filter:用户指定的文件过滤器,用于过滤directory中的文件;
newFilesOnly:应用程序启动时,目录directory中可能已经存在一些文件,若是newFilesOnly值为true,表示忽略这些文件;若是newFilesOnly值为false,表示须要分析这些文件;
conf:用户指定的Hadoop相关的配置属性;
 
注:fileStream有另外两个重载方法,在此再也不赘述。
 
若是分析的文件是文本文件,Spark提供了一个便利的方法:
 
 
fileStream的实现原理是比较简单的:以固定的时间间隔(duration)不断地探测目录(dataDirectory),每次探测时将时间段(now - duration, now]内新写入的文件(即文件的最近修改时间处于时间区间(now - duration, now])封装为RDD交由Spark处理。
 
Spark Streaming有一个核心组件:DStream,fileStream的实现依赖于其中的一个实现类:FileInputDStream。
 
 
而FileInputDStream的核心逻辑就是探测文件、封装RDD,由方法compute(重写至DStream compute)实现,
 
 
compute方法的注释引出了一个很重要的问题:咱们为何须要维护一个最近已分析文件的列表?
 
假设探测目录为dataDirectory,探测时间间隔为duration,当前时间为now,则本次选择的文件须要知足条件:文件的最近修改时间须要处于区间(now - duration, now],此时文件可能有如下几个状态:
 
(1)文件的最后修改时间小于或等于now - duration;
(2)文件的最后修改时间处于区间(now - duration, now);
(3)文件的最后修改时间等于now;
(4)文件的最后修改时间大于now;
 
考虑第(3)种状况,文件的最后修改时间等于now,它有可能在探测以前已被移动至目录dataDirectory,或者在探测时或探测完成以后被移动至目录dataDirectory;若是是后二者,就可能会出现文件“丢失”的状况(即文件不被处理),由于下次探测的时间点为now + duration,探测的时间范围为(now, now + duration],最近修改时间等于now的文件已不处于该区间。为了不或减小文件“丢失”的状况,因此Spark Streaming fileStream容许将探测的时间范围向“前”扩展为(now - n * duration, now],以下所示:
 
 
ignore threshold:now - n * duration
current batch time:now
remember window:n * duration
 
也就是说,每一次探测时,咱们会选择文件的最后修改时间处于区间(ignore threshold, current batch time]的文件,但其中有些文件可能在前几回的探测中已经被分析,为了防止出现重复分析的状况,咱们须要记录时间区间(ignore threshold, current batch time](remember window)内已经被分析过的文件有哪些。
 
下面咱们来分析compute的处理流程:
 
1. 寻找新的文件;
 
 
 
(1)计算ignore threshold;
 
这一步有两个重要的变量须要说明:initialModTimeIgnoreThreshold和durationToRemember。
 
initialModTimeIgnoreThreshold
 
 
它的值与newFilesOnly有关,newFilesOnly表示Spark Streaming App刚刚启动时是否分析目录dataDirectory中已有的文件:
 
newFilesOnly == true:不须要分析目录dataDirectory中已有的文件,所以initialModTimeIgnoreThreshold的值被设置为“当前时间”,表示仅仅分析最近修改时间大于“当前时间”的文件;
 
newFilesOnly == false:须要分析目录dataDirectory中已有的文件,所以initialModTimeIgnoreThreshold的值被设置为0(文件的最近修改时间必大于0)。
 
durationToRemember
 
 
slideDuration:表示探测的时间间隔。
 
 
minRememberDurationS:默认值为60s,能够经过属性spark.streaming.fileStream.minRememberDuration进行修改。
 
 
 
经过上面的代码能够看出,durationToRemember = slideDuration * math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt,durationToRemember就是咱们前面提到的remember window,也就是说这个时间区间内的已分析文件会被记录。
 
 
ignore threshold取initialModTimeIgnoreThreshold、currentTime - durationToRemember.milliseconds的最大值,这也意味着即便newFilesOnly值为false,dataDirectory中的文件也不会被所有分析,只有最近修改时间大于currentTime - durationToRemember.milliseconds的文件才会被分析。
 
(2)建立过滤器实例;
 
 
过滤器实例实际就是Hadoop PathFilter实例,依赖于方法isNewFile构建,顾名思义这个过滤器实例是用来选取新文件的,新文件的标准须要知足如下四个条件:
 
 
a. 文件路径匹配用户指定的过滤器实例;
b. 文件的最近修改时间大于modTimeIgnoreThreshold;
c. 文件的最近修改时间小于或等于currentTime;
d. 文件尚没有被分析过,即文件没有出如今最近已分析文件的列表recentlySelectedFiles。
 
这里须要额外说明一下c,为何文件的最近修改时间不能大于currentTime?这主要是为了防止Spark Streaming应用重启时出现文件被重复分析的状况。
 
假设应用的终止时间为time,重启时间为time + 5 * duration,recentlySelectedFiles仅保存最近一个duration已经被分析过的文件,即保存的时间窗口为duration;应用重启以后,第一次探测的时间为time + duration,若是容许文件的最近修改时间大于currentTime(即time + duration),则最近修改时间处于时间区间(time, +∞)的文件将所有被分析,这些文件被记入recentlySelectedFiles;第二次探测的时间为time + 2 * duration,由于recentlySelectedFiles的时间窗口为duration,此时能够认为它的值已经被清空,若是容许文件的最近修改时间大于currentTime(即time + 2 * duration),则最近修改时间处于时间区间(time + 2 * duration, +∞)的文件将所有被分析,这种状况下能够看出最近修改时间处于时间区间(time + 2 * duration, +∞)的文件被重复分析;此外探测时间为time + 3 * duration、time + 4 * duration、time + 5 * duration也将出现相似文件被重复分析的状况。综上所述,每次探测文件时,文件的最近修改时间不能大于currentTime。
 
(3)获取知足过滤器实例条件的文件路径;
 
 
至此,寻找“新”文件的流程结束。
 
2. 将找到的新文件加入已分析文件列表;
 
 
recentlySelectedFiles中的过时数据是由方法clearMetadata负责清理的。
 
3. 将找到的新文件封装为RDD;
 
 
 
(1)遍历新文件(路径),将每个新文件(路径)经过SparkContext newAPIHadoopFile转换为一个RDD,最后造成一个RDD列表:fileRDDs;
(2)将fileRDDs转换为一个UnionRDD并返回;
 
至此,compute的整个处理流程结束。能够看出,整个流程中最为复杂的部分就是每次探测新文件的过程,特别是时间区间的选取以及最近已分析文件的缓存。
相关文章
相关标签/搜索