MapReduce程序本质上是并行运行的,所以能够将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。网络
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每阶段都以键-值对做为输入和输出。map函数是一个数据准备阶段,经过这种方式来准备数据,使reduce函数可以继续对它进行处理。除此以外,map函数仍是一个比较适合去除已损记录的地方。app
public class MyMapper extends MapReduceBase框架
implements Mapper<LongWritable, Text, Text, IntWritable> {ide
@override函数
public void map(LongWritable key, Text value, Context context) {oop
...... // 输入键值对处理过程性能
context.write(new Text(...), new IntWritable(...));优化
}spa
}设计
Mapper类是一个泛型类,有四个形参,分别指定map函数的输入键、输入值、输出键、输出值的类型。Hadoop自己提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。map()方法还提供Context实例用于输出内容的写入,将数据按照输出键-值类型进行格式化便可。
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@override
public viod reduce(Text text, Iterable<IntWritable> values, Context context) {
...... // 输入键值对处理过程
context.write(key, new IntWritable(...));
}
}
reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce函数的输入类型必须匹配map函数的输出类型。
public class MyApplication {
public static void main(String[] args) {
// ①
Job job = new Job();
job.setJarByClass(MyApplication.class);
job.estJobName(“My Application”);
// ②
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// ③
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// ④
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// ⑤
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
① Job对象指定做业执行规范,能够用来控制整个做业的运行。在Hadoop集群上运行这个做业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。
② 构造Job对象后,须要指定输入和输出数据的路径。调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径能够是单个的文件、一个目录(此时,将目录下全部文件当作输入)或符合特定文件模式的一系列文件。能够屡次调用addInputPath()来实现多路径的输入。调用FileOutputFormat类中的静态方法setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法指定是reduce函数输出文件的写入目录。在运行做业前该目录是不该该存在的,不然Hadoop会报错并拒绝运行做业。这种预防措施的目的是防止数据丢失。
③ 经过setMapperClass()和serReducerClass()方法指定要用的map类型和reduce类型。
④ setOutputKeyClass()和setOutputValueClass()方法控制reduce函数的输出类型,而且必须和Reduce类产生的相匹配。map函数的输出类型默认状况下和reduce函数是相同的,所以若是mapper产生出和reducer相同的类型时,不须要单独设置。若是不一样,则必须经过setMapOutputKeyClass()和setMapOutputValueClass方法来设置map函数的输出类型。
⑤ Job中的waitForCompletion()方法提交做业并等待执行完成。该方法惟一的参数是一个标识,指示是否已生成详细输出。当标识为true时,做业会把其进度信息写到控制台。返回值是一个布尔值,表示执行的成败。
Hadoop将MapReduce的输入数据划分红等长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。若是分片切分得过小,那么管理分片的总时间和构建map任务的总时间将决定做业的整个执行时间。对于大多数做业来讲,一个合理的分片大小趋向于HDFS的一个块的大小,默认是128MB,能够针对集群调整这个默认值,或在每一个文件建立时指定。
Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,能够得到最佳性能,由于它无需使用集群带宽资源。这就是所谓的“数据本地化优化”。可是,有时对于一个map任务的输入分片来讲,存储该分片的HDFS数据块复本的全部节点可能正在运行其余map任务,此时做业调度须要从某一数据块所在的机架中的一个节点上寻找一个空闲的map槽(slot)来运行该map任务分片。很是偶然的状况下(基本不会发生),会使用其余机架中的节点运行该map任务。
最佳分片的大小与块大小相同是由于它是确保能够存储在单个节点上的最大输入快的大小。若是分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能存储这两个数据块,所以分片中的部分数据须要经过网络传输到map任务运行的节点。
map任务将其输出写入本地硬盘,而非HDFS。若是运行map任务的节点在将map中间结果传送给reduce任务以前失败,Hadoop将在另外一个节点上从新运行这个map任务以再次构建map中间结果。
reduce任务并不具有数据本地化的优点,单个reduce任务的输入一般来自于全部mapper的输出。对于reduce输出的每一个HDFS块,第一个复本存储在本地节点上,其余复本出于可靠性考虑存储在其余机架的节点中。
reduce任务的数量并不是由输入数据的大小决定,相反是独立指定的。若是有多个reduce任务,每一个map任务就会针对输出进行分区(partition),即为每一个reduce任务建一个分区。分区可由用户定义的分区函数控制,但一般默认的partitioner经过哈希函数来分区。
Hadoop容许用户针对map任务的输出指定一个combiner,combiner函数的输出做为reduce函数的输入。无论调用combiner多少次,reducer的输出结果都是同样的,combiner函数能帮助减小mapper和reducer之间的数据传输量。combiner函数是经过Reducer类来定义的。
Hadoop Streaming使用Unix标准流做为Hadoop和应用程序之间的接口,适合用于文本处理。
map的输入数据经过标准输入流传递给map函数,而且是一行一行地传输,最后将结果行写到标准输出。map输出的键-值对是以一个制表符分隔的行,reduce函数的输入格式与之相同并经过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。
Streaming和Java MapReduce API设计差别:Java API控制的map函数一次只处理一条记录,针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理;而在Streaming中,map程序能够本身决定如何处理输入数据。