(1) InputFormat接口正则表达式
用户须要实现该接口以指定输入文件的内容格式。该接口有两个方法app
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
}负载均衡
其中getSplits函数将全部输入数据分红numSplits个split,每一个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每一个record解析成key/value对。ide
Hadoop自己提供了一些InputFormat:函数
TextInputFormat
做为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。oop
KeyValueTextInputFormat
一样用于读取文件,若是行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;若是没有分隔符,整行做为 key,value为空。性能
SequenceFileInputFormat
用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。优化
SequenceFileInputFilter
根据filter从sequence文件中取得部分知足条件的数据,经过 setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值知足指定的正则表达式的记录;PercentFilter经过指定参数f,取记录行数%f==0的记录;MD5Filter经过指定参数f,取MD5(key)%f==0的记录。spa
NLineInputFormat
能够将文件以行为单位进行split,好比文件的每一行对应一个map。获得的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。orm
MultipleInputs
用于多个数据源的join
(2)Mapper接口
用户需继承Mapper接口实现本身的Mapper,Mapper中必须实现的函数是
Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()通常是用来进行一些map()前的准备工做,map()则通常承担主要的处理工做,cleanup()则是收尾工做如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。
(3)Partitioner接口
用户需继承该接口实现本身的Partitioner以指定map task产生的key/value对交给哪一个reduce task处理,好的Partitioner能让每一个reduce task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition( K2 key, V2 value, int numPartitions)
该函数返回<K2 V2>对应的reduce task ID。
用户若是不提供Partitioner,Hadoop会使用默认的(其实是个hash函数)。
Partitioner如何使用
•实现Partitioner接口覆盖getPartition()方法
•Partitioner示例
public static class MyPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
}
}
Partitioner需求示例
•需求描述
•数据文件中含有省份
•须要相同的省份送到相同的Reduce里
•从而产生不一样的文件
•步骤
•实现Partitioner,覆盖getPartition
•根据省份字段进行切分
(4)Combiner
combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>做为输入到reduce函数中,其格式与reduce函数相同。Combiner使得map task与reduce task之间的数据传输量大大减少,可明显提升性能。大多数状况下,Combiner与Reducer相同。
什么状况下可使用Combiner
•能够对记录进行汇总统计的场景,如求和。
•求平均数的场景就不可使用了
Combiner执行时机
•运行combiner函数的时机有可能会是merge完成以前,或者以后,这个时机能够由一个参数控制,即 min.num.spill.for.combine(default 3)
•当job中设定了combiner,而且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件以前运行
•经过这样的方式,就能够在spill很是多须要merge,而且不少数据须要作conbine的时候,减小写入到磁盘文件的数据数量,一样是为了减小对磁盘的读写频率,有可能达到优化做业的目的。
•Combiner也有可能不执行, Combiner会考虑当时集群的负载状况。
Combiner如何使用
•继承Reducer类
public static class Combiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
}
}
(5)Reducer接口
实现本身的Reducer,必须实现reduce函数
(6)OutputFormat
用户经过OutputFormat指定输出文件的内容格式,不过它没有split。每一个reduce task将其数据写入本身的文件,文件名为part-nnnnn,其中nnnnn为reduce task的ID。
public abstract class OutputFormat<K, V> {
/**
* 建立一个记录写入器
*/
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
/**
* 检查结果输出的存储空间是否有效
*/
public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
/**
* 建立一个任务提交器
*/
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}
TextOutputFormat,输出到纯文本文件,格式为 key + " " + value。NullOutputFormat,hadoop中的/dev/null,将输出送进黑洞。SequenceFileOutputFormat, 输出到sequence file格式文件。MultipleSequenceFileOutputFormat, MultipleTextOutputFormat,根据key将记录输出到不一样的文件。DBInputFormat和DBOutputFormat,从DB读取,输出到DB。