MapReduce编程二

(1) InputFormat接口正则表达式

用户须要实现该接口以指定输入文件的内容格式。该接口有两个方法app

public interface InputFormat<K, V> {
 
     InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
     RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
 
}负载均衡

其中getSplits函数将全部输入数据分红numSplits个split,每一个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每一个record解析成key/value对。ide

Hadoop自己提供了一些InputFormat:函数

TextInputFormat
做为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。oop

KeyValueTextInputFormat
一样用于读取文件,若是行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;若是没有分隔符,整行做为 key,value为空。性能

SequenceFileInputFormat
用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。优化

SequenceFileInputFilter
根据filter从sequence文件中取得部分知足条件的数据,经过 setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值知足指定的正则表达式的记录;PercentFilter经过指定参数f,取记录行数%f==0的记录;MD5Filter经过指定参数f,取MD5(key)%f==0的记录。spa

NLineInputFormat
能够将文件以行为单位进行split,好比文件的每一行对应一个map。获得的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。orm

MultipleInputs

用于多个数据源的join

(2)Mapper接口
用户需继承Mapper接口实现本身的Mapper,Mapper中必须实现的函数是

Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()通常是用来进行一些map()前的准备工做,map()则通常承担主要的处理工做,cleanup()则是收尾工做如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。

(3)Partitioner接口
用户需继承该接口实现本身的Partitioner以指定map task产生的key/value对交给哪一个reduce task处理,好的Partitioner能让每一个reduce task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition(  K2   key, V2 value, int numPartitions)
该函数返回<K2 V2>对应的reduce task ID。
用户若是不提供Partitioner,Hadoop会使用默认的(其实是个hash函数)。

Partitioner如何使用
•实现Partitioner接口覆盖getPartition()方法
•Partitioner示例
        public static class MyPartitioner extends Partitioner<Text, Text> {
          
         @Override
            public int getPartition(Text key, Text value, int numPartitions) {
             }
 
}
Partitioner需求示例
•需求描述
•数据文件中含有省份
•须要相同的省份送到相同的Reduce里
•从而产生不一样的文件
•步骤
•实现Partitioner,覆盖getPartition
•根据省份字段进行切分

(4)Combiner

combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>做为输入到reduce函数中,其格式与reduce函数相同。Combiner使得map task与reduce task之间的数据传输量大大减少,可明显提升性能。大多数状况下,Combiner与Reducer相同。

什么状况下可使用Combiner
•能够对记录进行汇总统计的场景,如求和。
•求平均数的场景就不可使用了
Combiner执行时机
•运行combiner函数的时机有可能会是merge完成以前,或者以后,这个时机能够由一个参数控制,即 min.num.spill.for.combine(default 3)
•当job中设定了combiner,而且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件以前运行
•经过这样的方式,就能够在spill很是多须要merge,而且不少数据须要作conbine的时候,减小写入到磁盘文件的数据数量,一样是为了减小对磁盘的读写频率,有可能达到优化做业的目的。
•Combiner也有可能不执行, Combiner会考虑当时集群的负载状况。

Combiner如何使用
•继承Reducer类

public static class Combiner extends Reducer<Text, Text, Text, Text> {
       public void reduce(Text key, Iterator<Text> values,
               OutputCollector<Text, Text> output, Reporter reporter)
               throws IOException {
                 }
    }

(5)Reducer接口

实现本身的Reducer,必须实现reduce函数

(6)OutputFormat
用户经过OutputFormat指定输出文件的内容格式,不过它没有split。每一个reduce task将其数据写入本身的文件,文件名为part-nnnnn,其中nnnnn为reduce task的ID。

public abstract class OutputFormat<K, V> {  
  /** 
   * 建立一个记录写入器
   */ 
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;   
  /** 
   * 检查结果输出的存储空间是否有效
   */ 
  public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;  
  /**
   * 建立一个任务提交器
   */ 
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; 
}

TextOutputFormat,输出到纯文本文件,格式为 key + " " + value。NullOutputFormat,hadoop中的/dev/null,将输出送进黑洞。SequenceFileOutputFormat, 输出到sequence file格式文件。MultipleSequenceFileOutputFormat, MultipleTextOutputFormat,根据key将记录输出到不一样的文件。DBInputFormat和DBOutputFormat,从DB读取,输出到DB。

相关文章
相关标签/搜索