输入数据概要html
输入数据一般驻留在较大的文件中,一般几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块能够在多台计算机上并行处理,在Hadoop的术语中这些块被称为输入分片(Input Split)。每一个分片应该足够小以实现更细粒度的并行。(若是全部的输入数据都在一个分片中,那就没有并行了。) 另外一方面,每一个分片也不能过小,不然启动与中止各个分片处理所需的开销将占去很大一部分执行时间。java
因此说:数据库
1、单个文件要足够的大,这样才能被分片,才会有并行。apache
2、分片大小不能过小,不然分片太多影响效率。api
并行处理切分输入数据的原则app
输入数据一般为单一的大文件,揭示了其背后Hadoop通用文件系统 (尤为是HDFS) 的一些设计策略。ide
例如,Hadoop的文件系统提供了FSDataInputStream类用于读取文件,而未采用Java中的java.io.DataInputStream FSDataInputStream扩展了DataInputStream以支持随机读,MapReduce须要这样特性,由于一台机器可能被指派从输入文件的中间开始处理一个分片。oop
若是没有随机访问,而须要从头开始一直读取到分片的位置,效率就会很是低。你还能够看到HDFS为了存储MapReduce并行切分和处理的数据所作的设计。优化
HDFS按块存储文件并分布在多台机器上。笼统而言,每一个文件块为一个分片。因为不一样的机器会存储不一样的块,若是每一个分片/块都由它所驻留的机器进行处理,就自动实现了并行。此外,因为HDFS在多个节点上复制数据块以实现可靠性,MapReducer能够选择任意一个包含分片/数据块副本的节点。this
1、必须支持随机访问。
2、每一个分片由它所驻留的机器进行处理,这样就实现了并行。
3、因为存在副本的缘由,MapReducer能够选择任意包含该分片的节点。
Hadoop默认的读入模式
MapReduce操做是基于键/值对的,Hadoop默认地将输入文件中的每一行视为一个记录,而键/值对分别为该行的字节偏移(key)和内容(value)。
你也许不会把全部的数据都这样记录,Hadoop也支持一些其余的数据格式,并容许自定义格式。
输入分片与记录边界
请注意,输入分片是一种记录的逻辑划分,而HDFS数据块是对输入数据的物理分割。当它们一致时,效率会很是高,但在实际应用中从未达到彻底一致。记录可能会跨过数据块的边界。Hadoop确保所有记录都被处理。处理特定分片的计算节点会从一个数据块中获取记录的一个片断,该数据块可能不是该记录的"主"数据块,而会存放在远端。为获取一个记录片断所需的通讯成本是微不足道的,由于它相对而言不多发生。
InputFormat接口
Hadoop分割与读取输入文件的方式被定义在InputFormat接口中
TextInputFormat是InputFormat的默认实现
从TextInputFormat返回的键为每行的字节偏移量,好像是不多用
InputFormat的其余经常使用实现
InputFormat |
描述 |
TextInputFormat |
在文本文件中的每一行均为一个记录。键 (key) 为一行的字节偏移,而值 (value)为一行的内容 Key: LongWritable Value: Text |
KeyValueTextInputFormat |
在文本文件中的每一行均为一个记录,以每行的第一个分隔符为界,分隔符以前的键(key),以后的是值(value)。分离器在属性key.value.separator.in.input.line中设定,默认为制表符(\t) Key: Text Value: Text |
SequenceFileInputFormat<K, V> |
用于读取序列文件的InputFormat。键和值由用户定义。序列文件为Hadoop专用的压缩二进制文件格式。它专用于一个MapReduce做业和其余MapReduce做业之间传送数据 Key: K (用户定义) Value: V (用户定义) |
NLineInputFormat |
与TextInputFormat相同,但每一个分片必定有N行。N在属性mapred.line.input.format.linespermap中设定,默认为1 Key:LongWritable Value: Text |
KeyValueTextInputFormat在更结构化的输入文件中使用,由一个预约义的字符,一般为制表符(\t),将每行 (记录) 的键与值分开。例如,一个以制表符分割的,由时间戳和URL组成的数据文件也须要是这样的:
17:16:18 http://hadoop.apache.org/core/docs/r0.19.0/api/index.html
17:16:19 http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html
...
你能够设置JobConf对象使用KeyValueTextInputFormat类读取这个文件:
conf.setInputFormat(KeyValueTextInputFormat.class);
由前面的示例文件可知,mapper读取的第一个记录将包含一个键 "17:16:18" 和一个值
"http://hadoop.apache.org/core/docs/r0.19.0/api/index.html"。而mapper读到的第二个记录将包含键"17:16:19"和值http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html... 如此递归。
以前,咱们在mapper中曾使用LongWritable和Text分别做为键(key)和值(value)的类型。在TextInputFormat中,由于值为用数字表示的偏移量,因此LongWritable是一个合理的键类型。而当使用KeyValueTextInputFormat时,不管是键和值都为Text类型,你必须改变Mapper的实现以及map()方法来适应这个新的键 (key) 类型
输入到MapReduce做业的数据未必都是些外部数据,实际上,一个MapReduce做业的输入经常是其余一些MapReduce的输出。
你能够自定义输出格式
默认的输出格式与KeyValueTextInputFormat可以读取的数据格式保存一致 (即记录中的每行均为一个由制表符分割的键和值)。不过, Hadoop提供了更加有效的二进制压缩文件格式,称为序列文件。这个序列文件为Hadoop处理作了优化,当连接多个MapReduce做业时,它是首选格式。读取序列文件的InputFormat类为SequenceFileInputFormat
生成一个定制的InputFormat——InputSplit和RecordReader
InputFormat是一个仅包含两个方法的接口。
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法总结了InputStream需执行的两个功能:
1、肯定全部用于输入数据的文件,并将之分割为输入分片。每一个map任务分配一个分片
2、提供一个对象 (RecordReader),循环提取给定分片中的记录,并解析每一个记录为预约义类型的键与值
那么,谁去考虑如何将文件划分为分片呢?
1、FileInputFormat类主要负责分片
2、InputFormat类都是FileInputFormat类的子类
3、FileInputFormat实现了getSplits()方法,不过保留了getRecordReader()抽象让子类实现
4、FileInputFormat中实现的getSplits()把输入数据粗略地划分为一组分片,分片数目在numSplits中限定,且每一个分片得大小必须大于mapred.min.split.size个字节,但小于文件系统的块
5、在实际状况中,一个分片最终老是以一个块未大小,在HDFS中默认为64MB
FileInputFormat说明
1、FileInputFormat有必定数量的protected方法,子类能够经过覆盖改变其行为
2、其中一个就是isSplitable(FileSystem fs, Path fileName)方法,它检查你是否能够将给定文件分片,默认实现老是返回true,所以全部大于一个分块的文件都要分片。
3、有时你可能不想该文件分片,这时你就能够覆盖isSplittable()来返回false,例如一些文件压缩方案并不支持分割,一些数据处理操做,如文件转换,须要把每一个文件视为一个原子记录,也不能将之分片
RecordReader接口说明
这个接口负责把一个输入分片解析为记录,再把每一个记录解析为一个键/值对
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
咱们不用本身写RecordReader,仍是利用Hadoop所提供的类。
1、LineRecordReader实现RecordReader<LongWritable, Text>。它在TextInputFormat中被用于每次读取一行,以字节偏移做为键,以行的内容做为值。
2、而KeyValueLineRecordReader则被用在KeyValueTextInputFormat中
3、在大多数状况下,自定义RecordReader是基于现有实现的封装,并把大多数操做放在next()方法中。
自定义的TimeUrlTextInputFormat相似于KeyValueTextInputFormat,之前的键/值是Text/Text,如今是Text/URLWritable,URLWritable是自定义的
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {
@Override
public RecordReader<Text, URLWritable> getRecordReader(
InputSplit input, JobConf job, Reporter reporter) throws IOException {
return new TimeUrlLineRecordReader(job, (FileSplit) input);
}
}
// 咱们的URLWritable类很是简单:
public class URLWritable implements Writable {
protected URL url;
public URLWritable() {}
public URLWritable(URL url) {
this.url = url;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString());
}
public void readFields(DataInput in) throw IOException {
url = new URL(in.readUTF());
}
public void set(String s) throws MalformedURLException {
url = new URL(s);
}
}
TimeUrlLineRecordReader实现
class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {
private KeyValueLineRecordReader lineReader;
private Text lineKey, lineValue;
public TimeUrlLineRecordReader(JonConf job, FileSplit split) throws IOException {
lineReader = new KeyValueLineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next(Text key, URLWritable value) throws IOException {
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key.set(lineKey);
value.set(lineValue.toString());
return true;
}
public Text createKey() {
return new Text("");
}
public URLWritable createValue() {
return new URLWritable();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public float getProgress() throws IOException {
return lineReader.getProgress();
}
public void close() throws IOException {
lineReader.close();
}
}
TImeUrlLineRecordReader类生成一个KeyValueRecordReader对象,并直接把getPos()、getProgress()以及close()方法调用传递给它。而next()方法将lineValueText对象转换为URLWritable类型
OurputFormat类是什么?
当MapReduce输出数据到文件时,使用的时OutputFormat类,它与InputFormat类类似,由于每一个reducer仅需将它的输出写入本身的文件中,输出无需分片。输出文件放在一个公用目录中,一般命名为part-nnnnn,这里nnnnn是reducer的分区ID。RecordWriter对象将输出结果进行格式化,而RecordReader对输入格式进行解析。
几乎咱们处理的全部OutputFormat类都是从FileOutputFormat抽象类继承来的
你能够经过调用JobConf对象中的setOutputFormat()定制OutputFormat
注意:你可能会奇怪,为何要将OutputFormat (InputFormat) 和 FileOutputFormat (FileInputFormat) 区分开,彷佛全部OutputFormat (InputFormat) 类都扩展了FileOutputFormat (FielInputFormat) 是否有不处理文件的OutFormat (InputFormat)类?没错,NullOutputFormat简单地实现了OutputFomat,并不须要继承FileOutputFormat。更重要的是,OutputFormat (InputFormat) 类处理的是数据库,并不是文件,并且在类的层次关系中OutputFormat (InputFormat) 类是区别于FileoutputFormat (FileInputFormat) 的一个独立分支。这些类有专门的应用,有兴趣的读者能够在网上搜寻在线Java文档进一步了解DBInputFormat和DBOutputFormat
主要的OutputFormat类,默认为TextOutputFormat
OutputFormat |
描述 |
TextOutputFormat<K, V> |
将每一个记录写为一行文本。键和值以字符串的形式写入,并以制表符(\t)分隔。这个分隔符能够在属性mapred.textoutputformat.separator中修改 |
SequenceFileOutputFormat<K, V> |
以Hadoop专有序列文件格式写入键/值对。与SequenceFileInputFormat配合使用 |
NullOutputFormat<K, V> |
无输出 |
默认的OutputFormat是TextOutputFormat,将每一个记录写为一行文本。每一个记录的键和值经过toString()被转换为字符串(string),并以制表符 (\t) 分隔。分隔符能够在mapred.textoutputformat.separator属性中修改。
1、TextOutputFormat采用可被KeyValueInputFormat识别的格式输出数据。
2、若是把键的类型设为NullWritable,它也能够采用可被TextInputFormat识别的输出格式,在这种状况下,在键/值对中没有键,也没有分隔符。
3、若是想彻底禁止输出,应该使用NullOutputFormat
4、若是让reducer采用本身的方式输出,而且不须要Hadoop写任何附加的文件,能够限制Hadoop的输出。
5、SequenceFileOutputFormat以序列文件格式输出数据,使其能够经过SequenceFileInputFormat来读取。它有助于经过中间数据结果将MapReduce做业串接起来
MapReduce程序的核心是Map和Reduce操做,还有其余的操做?
1、data spliting(数据分割)
2、shuffling(洗牌)
3、Partitining(分组)
4、Combining(合并)
5、Hadoop还支持采用不一样的格式读入和输出数据