InputFormat加载数据

InputFormat是一个抽象类,其定义以下:app

public abstract class InputFormat<K, V> {
	public abstract List<InputSplit> getSplits(JobContext context)
			throws IOException, InterruptedException;
	public abstract 
    	RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

InputFormat会对数据进行两方面的处理:ide

  • 对输入数据进行逻辑切分,造成一个个split
  • 针对每一个split,新建一个RecorReader读取split里面的数据,造成一个个<Key,Value>形式的记录(record),做为Map任务的输入。

运行做业的客户端经过调用getSplits()计算分片,而后将它们发送到application master,application master使用其存储位置信息来调度map任务从而在集群上处理这些分片数据。map任务把输入分片传给InputFormat的createRecordReader()方法来得到这个分片的RecordReader。RecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,而后在传递给map函数。Mapper的run方法以下:函数

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
     	while (context.nextKeyValue()) {
        	map(context.getCurrentKey(), context.getCurrentValue(), context);
     	}
    } finally {
     	cleanup(context);
    }
  }

另外,输入格式的类型也由具体的输入格式进行设置。根据数据源的不一样,InputFormat由几个不一样的子类:oop

  • FileInputFormat,用于读取普通文件
  • DBInputFormat,用于从SQL table中读取输入数据
  • 其余

类结构层次以下学习

这里主要学习FileInputFormat。(关于split的介绍见split简介.md)code

1. FileInputFormat

FileInputFormat,也是一个抽象类。是全部使用文件做为其数据源的InputFormat实现的基类。FileInputFormat提供两个功能:orm

  • 指定做业的输入文件位置
  • 为输入文件生成分片
1. 输入文件路径指定

FileInputFormat类中提供了四种静态方法来设定Job的输入路径:blog

public static void addInputPath(Job job,Path path) throws IOException
public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException
public static void setInputPaths(Job job, Path... inputPaths) throws IOException
public static void setInputPaths(Job job,String commaSeparatedPaths) throws IOException

其中addInputPath()和addInputPaths()方法能够将一个或多个路径加入路径列表。setInputPaths()方法一次设定完整的路径列表(会替换前面设置的全部输入路径)。继承

一条路径能够表示一个文件、一个目录或是一个glob(文件和目录的集合)。路径时目录的话,表示要包含这个目录下的全部文件。但当路径为一个目录时,其内容不会被递归处理:若是目录中包含子目录,这些子目录也会被看成文件进行处理,从而引起错误。要进行过滤,可使用setInputPathFilter方法加过滤。递归

2. 分片

FileInputFormat中实现了父类InputFormat的getSplits方法,用于将文件在逻辑上分片,获取一个个的split。

FileInputFormat只分割大文件(超过HDFS块大小)。分片一般与HDFS块大小同样,这个值能够经过设置不一样的Hadoop属性来改变。计算公式为:

max(minimumSize, min(maximumSize, blockSize))

minimumSize表示一个分片中最少的有效字节数,默认为1。maximumSize表示一个分片中最大的有效字节数,默认为Long.MAX_VALUE。blockSize是HDFS的块大小,Hadoop2.0中默认是128M。所以,默认的分片大小就是128M。(split与dataBlock的区别见InputSplit.md)

3. 读取分片数据

FileInputFormat中并无具体实现父类中的createRecordReader方法,FileInputFormat只定义了数据的来源,以及如何将数据分片,可是对于如何解析分片,如何将split中的数据转换为一条条<Key,Value>形式的记录,并无一个肯定的规则。即createRecordReader方法须要由它的子类来具体实现。经常使用的子类实现是TextInputFormat。

1.1 TextInputFormat

TextInputFormat是一个实体类,继承自FileInputFormat,也是默认(没有显式设置的时候)的输入格式。对于split的处理,它产生的键类型是LongWritable,存储该行在整个文件中的偏移;值类型是Text,保存的是这行数据。这是经过在TextInputFormat内部建立了一个LineRecordReader实现的,以下:

@Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

CombineFileInputFormat

避免切分

1.2 其余FileInputFormat实现

KeyValueTextInputFormat

TextInputFormat的键,即每一行在文件中的字节偏移量,一般并非特别有用。一般状况下,文件中的每一行是一个键值对,并使用某个分界符进行分隔,好比制表符。例如由TextOutputFormat(Hadoop默认的OutputFormat)产生的输出就是这种。要正确处理这类文件,使用KeyValueTextInputFormat比较合适。

能够经过mapreduce.input.keyvaluelinerecordreader.key.value.spearator属性来指定分隔符(默认是制表符)。

NLineInputFormat

经过TextInputFormat和KeyValueTextInputFormat,每一个mapper收到的输入行数不一样(行的长度不一样)。若是但愿mapper收到固定行数的输入,须要将NLineInputFormat做为InputFormat使用。与TextInputFormat同样,键是文件中行的字节偏移量,值是行自己。

本地写的Markdown放上来仍是不方便,格式什么的都要再调整一下,有时间得把这个整一下,或者本身搭一个?

相关文章
相关标签/搜索