MapReduce是一个数据处理的编程模型。这个模型很简单,但也不是简单到不可以支持一些有用的语言。Hadoop可以运行以多种语言写成的MapReduce程序。在这一章中,咱们将看看怎样用Java,Ruby,Python语言来写同一个例子。更重要的是,MapReduce程序天生并发运行,这就至关于把可以进行大数据分析的工具交到了某个拥有足够多机器的人手里。java
在咱们的例子中,将会写一个程序来挖掘天气数据。天气传感器每个小时都会在全球的许多地方收集数据,而且也收集了大量的日志数据。这些数据很是适合于用MapReduce分析。由于咱们想要处理全部数据,而且这些数据是半结构化的和面向记录的。python
咱们所使用的数据来自于国家气候数据中心或称为NCDC。数据以行形式ASCII格式存储,每一行一条记录。这种格式支持丰富的气象属性集合,其中许多属性是可选的,长度可变的。简便起见,咱们仅仅关注基本的属性,如温度。温度老是有值而且长度固定。
示例2-1显示了一行记录,而且将主要的属性进行了注释。这一行记录被分红了多行,每一个属性一行。真实文件中,这些属性都会被放进一行,而且没有分隔符。数据库
示例:2-1 0057 332130 # USAF 天气基站标识 99999 # WBAN 天气基站标识 19500101 # 观察日期 0300 # 观察时间 4 +51317 # 纬度 (角度 x 1000) +028783 # 经度 (角度 x 1000) FM-12 +0171 # 海拔 (米) 99999 V020 320 # 风向 (角度) 1 # 质量码 N 0072 1 00450 # 天空最高高度 (米) 1 # 质量码 C N 010000 # 可见距离 (米) 1 # 质量码 N 9 -0128 # 空气温度 (摄氏度 x 10) 1 # 质量码 -0139 # 露点温度 (摄氏度 x 10) 1 # 质量码 10268 # 大气压 (百帕 x 10) 1 # 质量码
数据文件按照日期和天气基站整理。从1901到2001,每年都有一个目录文件。每个目录文件中包括每个天气基站收集到的当年气候数据的压缩文件。例如1990年部分文件:apache
% ls raw/1990 | head 010010-99999-1990.gz 010014-99999-1990.gz 010015-99999-1990.gz 010016-99999-1990.gz 010017-99999-1990.gz 010030-99999-1990.gz 010040-99999-1990.gz 010080-99999-1990.gz 010100-99999-1990.gz 010150-99999-1990.gz
因为有成千上万个天气基站,因此每年都由大量的相关小文件组成。一般处理少许的大文件更容易和有效。因此这些数据须要被预处理,使每年的全部记录都被放到一个文件中(附录C中有详细的方法说明)。编程
如何获取每年的全球最高温度呢?咱们首先不使用Hadoop工具来回答这个问题。
这将会为咱们提供一个性能基准线和检查咱们日后的结果是否准确的方法。
经典的处理行结构数据的工具是awk。示例2-2向咱们展现了如何获取每年全球最高温度。ruby
示例2-2 #!/usr/bin/env bash for year in all/* do echo -ne `basename $year .gz`"\t" gunzip -c $year | \ awk '{ temp = substr($0, 88, 5) + 0; q = substr($0, 93, 1); if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp } END { print max }' done
这个脚本循环处理已经压缩的年文件,首先输出年度值,而后使用awk处理每个文件。awk脚本从这些数据中提取出空气温度和质量码。空气温度经过加0转换成整数,下一步,判断温度(温度9999在NCDC中表示没检测到温度)和质量码是否有效。质量码表示此温度值是否准确或者错误。若是温度值没有问题,则与目前为止最高温度相比较,若是比目前最高温度高,则更新最高温度。当文件中全部行被处理以后,END块被执行,打印出最高温度。下面看看部分运行结果:
```
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
````
源文件中的温度值被扩大了10倍,因此1901年的最高温度是31.7摄氏度,因为在20世纪初读取到的气候值很是有限,因此这个结果只能是近似真实。在硬件是单个超大型高CPU EC2实例计算中跑完整个世纪的数据花了42分钟。bash
为了提升处理速度,咱们须要并行运行部分程序。理论上,咱们很容易想到可使用计算机中全部可用的线程并行处理不一样的年份数据。可是这样仍然存在一些问题。网络
首先,将整个处理工做进程等分为相同的部分并不简单或明显。在这个例子中,不一样的年份的文件大小不同,而且有的差异很大。全部一些处理进程将会完成地早一些,一些将会晚一些。即时完成早的进程再处理其它工做,整个运行时间仍然被最大的文件限制。一个更好的途径是将输入数据分红大小相等的块,而且处理每个数据块。虽然这样可能形成更多的工做量。并发
第二,将每个独立的处理结果合并在一块儿须要额外处理工做。在这个例子中,每年的处理结果都是相互独立的。这些结果会被链接在一块儿,而且按年排序。若是经过数据量大小数据块途径,合并将更加容易出错。就这个例子而言,某一年的数据可能被分红多个数据块,每个数据块都单独处理,并获得每一块的最高温度。最后,咱们还须要找到某年中这些块中最高温度中的最高温度做为这一年的最高温度。app
第三,你仍然会被单个计算机的处理能力限制。若是用单个计算机中全部的处理器,最快的处理时间是20分钟,那么,你不可能更快。并且有的数据集超过单个计算机的处理能力。当使用多台计算机一块儿处理时,一些其它的因素又会影响性性能,主要有协调性和可靠性两类。谁来执行全部的做业?咱们将怎么处理失败的进程?
因此,虽然并行处理是可行的,但倒是不那么容易控制的,是复杂的。使用像Hadoop这样的框架来处理这些问题极大地帮助了咱们。
为了充分利用Hadoop提供的并行处理优点,咱们须要将咱们的查询写在一个MapReduce做业中。在本地的,小数据量地测试后,咱们将可以在集群中运行它。
MapReduce将处理过程分红两阶段,map阶段和reduce阶段。每阶段将key-value键值对作为输入和输出。开发者能够选择输入输出参数类型,也能指定两个函数:map函数和reduce函数。
map阶段的输入数据是原始的NCDC数据。咱们选择文本格式。文本中的每一行表示一条文本记录。key值是行开头距离当前文件开头的位移,可是咱们不须要它,忽略便可。
map函数很简单。由于咱们仅关心年份和温度,因此获取每行的年度和温度便可,其它属性不须要。这个例子中,仅仅是一个数据准备阶段,以某种方法准备reduce函数可以处理的数据。map函数仍是一个丢弃坏记录的地方,例如那些没有测量到的,不许备的或错误的温度。
为了展示map怎么样工做的,选取少许的输入数据进行说明(为了适应页面宽度,一些没有使用到的列用省略号表示)
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以key-value的形式提供给map函数:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
关键值是行的位移,在map函数中咱们能够忽略它。map函数仅仅须要获取到年度和温度值(以粗体表示的数据),而后输出。输出的时候将温度值转换成整数。
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
map的输出结果在被送往reduce函数以前被MapReduce框架按照关键字排序合并处理。因此在进行下一步以前,reduce函数会接收到以下数据:
(1949, [111, 78])
(1950, [0, 22, −11])
如上所示,每年的全部温度值都合并到一个列表中。reduce函数所要作的就是遍历每年的温度,而后找到最高温度。
(1949, 111)
(1950, 22)
以上就是最终的输出:每年的最高温度。
整个数据流程如图2-1所示。在图表底部是对应的Unix命令。它模拟整个MapReduce流程,咱们将会在这章节的后面Hadoop Streaming中看到。
在知道了MapReduce程序怎么样工做了以后,下一步是用代码实现它。咱们须要作三件事情:map函数,reduce函数,运行做业的代码。map功能以Mapper抽象类表示
,它申明了一个map()抽象方法。示例2-3显示了map函数的实现。
示例2-3 import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } }
Mapper类是一个泛型,有四个形参,分别表示输入key,输入值,输出key,和map函数输出值类型。就当前的例子来讲,输入key是一个长整型的位移,输入值是一行文本,输出key是年份,输出会是是空气温度(整数)。Hadoop使用它本身的基本类型集而不使用JAVA内建的基本类型。由于Hadoop本身的基本类型对网络序列化进行了优化。这些基本类型能够在 org.apache.hadoop.io pack‐
age中找到。这里咱们使用 LongWritable类型,它表示长文本类型,对应了Java的String类型,又使用了 IntWritable类型,对应于Java的Integer类型。
map函数被传了一个key值和一个value值,咱们把包含输入的一行文本转换成Java String类型数据,并使用String的SubString方法取到咱们感兴趣的列值。
map函数也提供了一个Context实例,以便将输出结果写入其中。在咱们的这个例子中,咱们把年份做为文本类型Key值写到Context中,把温度封闭成IntWritable类型也写入Context.而且只有温度有效而且质量码显示当前温度的获取是正常的时候才写入。
reduce功能相似地用Reduce抽象类表示,实例类见示例2-4
示例2-4 import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }
Reduce抽象类也是一个泛型类,也具备四个形参。reduce函数的输入类型必须匹配map的输出类型,即Text和IntWritable.此例子中,reduce函数的输出是Text和IntWritable类型,分别表示年份与当前年份最高温度。经过遍历温度值,将当前温度值与最高温度比较来找到当前年份的最高温度。
第三部分是运行MapReduce做业的代码,见示例2-5.
示例2-5 import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Job对象指明运行一个做业所须要的全部设置以及让你控制做业如何执行。当咱们在一个Hadoop集群上运行这个做业的时候,咱们须要将代码打包成JAR文件,Hadoop会把JAR文件在集群中分发。咱们能够经过setJarByClass方法指定类文件,而不须要显示指明JAR文件的名字。Hadoop会搜索包含setJarByClass指定的类的相关JAR文件。
建立了一个实例Job后,指定输入和输出文件路径。经过调用 FileInputFormat 的静态方法addInputPath()指定输入路径,此路径能够是一个文件,也能够是一个目录。若是是一个目录,输入的数据包含此目录下全部文件。还能够是文件类型。就像方法名所表示的那样,addInputPath()能够被调用屡次以便添加多个输入路径。
输出路径经过FileOutputFormat 的静态方法setOutputPath()指定。输出路径仅能够指定一次。它指定了一个目录。reduce会把它的输出结果的文件放到这个目录下。这个目录在运行Hadoop以前不该该存在。由于若是存在Hadoop将会报错并不会执行做业。这是为了预防数据丢失。由于若是不当心覆盖了同一目录下其它做业的输出结果将是很是使人懊恼的。
下一步使用 setMapperClass() 和setReducerClass()方法指定map和reduce类。setOutputKeyClass()和 setOutputValueClass()方法控制reduce函数输出参数的类型。必须和Reduce抽象类中参数的一致。map输出参数的类型默认是相同的类型。因此若是map和reduce函数有相同的输出参数类型时就不须要特别指定了。就像咱们这个例子这样。然而,若是它们不相同,就须要经过 setMapOutputKeyClass() 和setMapOutputValueClass()函数来指定map的输出参数类型。
map函数的输入参数类型经过输入格式指定。咱们没有显示地设置,由于咱们使用了默认的TextInputFormat格式。
在指定了自定义的map和reduce函数以后,就能够准备执行做业了。Job类的waitForCompletion()方法用于提交做业,并用等待做业完成。这个方法须要一个参数,用以表示是否将做业日志详细信息输出到控制台。若是为true,就输出。这个方法的返回值是一个布尔类型,用于表示做业的执行成功与否。成功返回true,失败返回false。这里咱们将成功与否转换成了0或1。
这部分使用的Java MapReduce API以及这本书所使用的全部API被称为"New API"。 它代替了功能相同的老的API。这两种API区别请查看附录D,而且附录D有如何在这两种API转换的相关建议。固然你也能在这儿用旧的API完成相同功能的获取每一年最高温度的应用。
在完成MapReduce做业编写以后,正常状况下使用少许数据集测试运行,方便当即检测出代码问题。首先以脱机模式安装Hadoop(附录A中有说明),这个模式下Hadoop使用本地文件生成本地做业运行。能够在这本书的网站上找到安装和编译这个示例的说明。
让咱们使用上面五行数据运行这个做业,输出结果稍微调整了一下以便适应页面,而且有一些行被删除了。
% export HADOOP_CLASSPATH=hadoop-examples.jar % hadoop MaxTemperature input/ncdc/sample.txt output 14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1 14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1 14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local26392882_0001 14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001 14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null 14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_m_000000_0 14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null 14/09/16 09:48:40 INFO mapred.LocalJobRunner: 14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0 is done. And is in the process of committing 14/09/16 09:48:40 INFO mapred.LocalJobRunner: map 14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0' done. 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_m_000000_0 14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete. 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task: attempt_local26392882_0001_r_000000_0 14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null 14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments 14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes 14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments 14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes 14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0 is done. And is in the process of committing 14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0 28 | Chapter 2: MapReduce is allowed to commit now 14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task 'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/ hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000 14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce 14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0' done. 14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task: attempt_local26392882_0001_r_000000_0 14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete. 14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber mode : false 14/09/16 09:48:41 INFO mapreduce.Job: map 100% reduce 100% 14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed successfully 14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30 File System Counters FILE: Number of bytes read=377168 FILE: Number of bytes written=828464 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=61 Input split bytes=129 Combine input records=0 Combine output records=0 Reduce input groups=2 Reduce shuffle bytes=61 Reduce input records=5 Reduce output records=2 Spilled Records=10 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=39 Total committed heap usage (bytes)=226754560 File Input Format Counters Bytes Read=529 File Output Format Counters Bytes Written=29
当咱们在hadoop命令第一个参数填写一个类名的时候,会启动一个JVM(JAVA虚拟机),并执行这个类。hadoop命令添加hadoop库和库所依赖的其它库文件到Classpath变量,而且加载hadoop配置。为了将应用中的类文件添加到classpath中,咱们定义了一个 HADOOP_CLASSPATH环境变量,来加载咱们所写的hadoop脚本。
当以本地(脱机)模式运行时,这本书中全部程序都假设你已经以这种方法设置了 HADOOP_CLASSPATH环境变量。这条命令应该在示例代码所在目录运行。
做业运行日志提供了一些有用的信息。例如,咱们能看到这个做业被给了一个做业ID:job_local26392882_0001.运行了一个map任务和一个reduce任务(ID分别是:attempt_local26392882_0001_m_000000_0 和attempt_local26392882_0001_r_000000_0)。知道做业和任务ID在调用MapReduce做业时将颇有用。
最后还有一部分名为"Counters"的数据,这部分数据是Hadoop为每个做业生成的统计信息。这些信息将对于检查处理的数据与预期的数据是否同样很是有用。例如,咱们能知道经过系统各部分的记录数,5条map输入记录,5条map输出记录(能够看出map对于每一条有效的输入记录都有对应的一条输出记录)。还能看出以key值分红2组的5条reduce输入记录,以及2条输出记录。
输出结果写入输出目录。每个reduce函数生成一个输出文件。这个做业只有一个reduce函数,因此只产生一个文件。名称是part-r-00000:
% cat output/part-r-00000
1949 111
1950 22
这个结果跟以前手工计算的一致。这个结果表示1949年最高温度是11.1摄氏度,1950是2.2度。
你已经知道了MapReduce怎么样处理少许数据。如今是时候全局看系统,而且对于大数据处理的数据流。简单来讲,到目前为止,咱们所举的例子都用的本地计算机的文件。更进一步,咱们将要在分布计算机(特别是HDFS,咱们将在下一节中学到)中存储文件数据。使用Hadoop的资源管理系统YARN(第4节),Hadoop会将MapReduce计算过程分发到各个计算机中计算,而这些计算机每一台都保存着一部分数据。让咱们来看看这些是如何发生的。
首先,MapReduce做业是客户端须要去执行的工做单元。它包括输入数据,MapReduce程序以及一些配置信息。Hadoop会把这个做业分红多个任务步骤执行。有两种类型:map任务和reduce任务。这些任务经过YARN计划调度并在分布式系统节点上运行。若是一个任务失败了,YARN会把它放到另一个节点上从新运行。
Hadoop会把输入数据化分红大小相同的数据片段(被称为输入片或均片),Hadoop会为每个片建立一个map任务。map任务会一条条记录地循环执行用户自定义的map函数,直到这个片段中全部记录处理完毕。
不少片段意味着处理每个片段的时间比一次处理整个输入数据的时间少。因此当咱们并发地处理这些片段,而这些片段很小时,可以更好地负载均衡。因此一个性能好的机器比一个性能差些的机器可以相应在处理更多地片段。即便这些机器性能彻底同样,失败的处理进程或者同时运行的做业使负载均衡成为可能(Even if the machines are identical, failed processes or other jobs running
concurrently make load balancing desirable)。而且当片段细粒度越高,负载均衡的质量也会越高。
别外一方面,若是片段过于小,管理片段和建立Map任务所花费的时候则会成为整个做业执行时间的瓶颈。对于大多数做业来讲,一个好的片段大小趋向于一个HDFS块的大小,默认是128M。这个大小能够被集群(Cluster)改变(集群的改成会影响在机群中新建立的全部文件),或者文件新建时就指定。
Hadoop尽可能会在输入数据存放的HDFS那个节点运行Map任务,由于这样不会占用宝贵的集群带宽资源。这被称为本地优化。而后,有时候拥有HDFS数据的节点上正运行着其它Map任务,做业调试器会尝试着在当前集群其它空闲的节点上建立一个Map任务。极少状况下,会到其它集群中的某个节点中建立一个Map任务,这样就须要集群间网络传输。这三种可能性在图表2-2中展现:
如今清楚了为何最优的片段大小是设置成HDFS块大小。由于这样作是数据能被存储在一个节点上的最大数据量。若是一个片段跨两个块大小,任何一个HDFS节点都不太可能储存两个块大小的数据量,这个势必会形成片段的部分数据经过网络传输到正在运行Map任务的节点上。这明显的比直接在本地运行Map任务的性能差一些。
Map任务会将它的输出结果写入本地硬盘中,而不是HDFS,为何要这样作?由于Map的输出只是中间的输出,后续它将会被Reduce任务处理产生最终输出结果。一旦做业完成了,Map的输出结果能够被丢弃,因此将Map的输出结果复制到HDFS中没必要要的。若是在Reduce利用Map的输入结果前,节点运行失败了。Hadoop将在自动的在另一个节点中从新执行这个Map任务,从新产生输入结果。
Reduce任务没有像Map任务那样利用数据本地化的优点,一个Reduce任务的输入每每来自全部Map任务的输出。就拿目前的例子来讲,咱们有一个Reduce任务,其输入数据来自全部的Map任务。所以存储的Map结果必须经过网络传输到运行Reduce的节点上。以后这些传过来的数据会被合并,并传到用户自定义的reduce函数中执行。Reduce的输出结果正常都会存储在HDFS中。就像第三节说明的,对于存储Reduce输出结果的每个HDFS块,第一份复制的数据会存储在本地,其它复制的数据会存储在其它集群可靠的HDFS块中。所以存储Reduce的输出结果肯定须要消耗网络带宽,但也仅仅和一个正常的HDFS输出通道消耗的同样多。
拥有一个Reduce任务的数据流在图表2-3中展现。虚线框表示节点,虚线箭头表示节点内的数据传输。实线的箭头表示节点间的数据传输。
Reduce任务的个数不是由输入数据量的大小决定,而是单独指定的。在"默认的MapReduce做业"那一节,你将会看到对于给定的做业,如何选择Reduce任务的个数。
当有多个reduce时,map任务会将它们的结果分区,每个map任务会为每个reduce任务建立一个分区。每个分区里能够用不少个key和ke关联的值,但某一个key的全部记录必须在同一个分区里。分区这个过程可以被用户自定义的函数控制,但通常来说,默认的分区函数已经可以工做地很好了。它使用哈希函数来将key分类。
多个reduce的通常数据流程图在图表2-4显示。这张图表清楚地显示了map和reduce之间的数据流为何被通俗地叫作"洗牌"。"洗牌"的过程比这个图表显示的更复杂。你将会在"洗牌和排序"这一节中看到,调整它能够对做业的运行时间有很大影响。
最后,也能够有零个reduce任务。这种状况发生在仅并发执行map任务就可以输出结果的时候。此时数据的传输仅发生在map的输出结果写入HDFS的时候(如图2-5)。
许多MapReduce做业执行时间被集群的带宽资源限制。因此值得咱们去尽可能减小map与reduce之间传输的数据量。Hadoop容许用户指定一个组合函数,以便在map输出结果后执行。这个组合函数的输出造成了reduce任务的输入。因为组合函数是优化函数,因此Hadoop不能确保为每个map输出记录调用多少次组合函数。也就是说,零次,一次或屡次调用组合函数,reduce最终都应该输出相同的结果。
组合函数的这种特性限制了它能被使用的业务情形。用一个例子能更好说明。假设最大的温度,例如1950的,被两个map任务处理,由于1950年数据分布在不一样的片段中。假如第一个map任务输出以下结果:
(1950,0)
(1950,20)
(1950,10)
第二个map输出以下结果:
(1950,25)
(1950,15)
Hadoop将会用以上全部值组成列表传给reduce
(1950,[0,20,10,25,15])
输出:
(1950,25)
既然25是当前列表最大的值。咱们就像使用reduce函数同样用一个组合函数找出每个map结果中的最大温度值。这样的话,reduce获得如下值:
(1950,[20,25])
而且产生与以前相同的结果。咱们能够用一种更简洁的方式表示上面的过程:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
然而,并非全部这样的处理都是合适的,例如,咱们要计算平均温度,就不能在组合函数中计算平均温度,由于:mean(0, 20, 10, 25, 15) = 14,可是mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15。
组合函数不能代替Reduce函数(Reduce函数仍然须要用来处理来自不一样map的含有相同key值的记录),可是它能帮助减小在map与reduce之间传递的数据量。所以,在你的MapReduce做业中,老是值得咱们考虑是否使用组合函数。
回到以前JAVA MapReduce程序,组合函数使用Reduce类定义,在这个应用中,它与Reduce功能同样。咱们惟一要作的就是在做业中设定组合类(示例2-6)。
示例2-6 public class MaxTemperatureWithCombiner { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +"<output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperatureWithCombiner.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相同的程序将在全量数据库执行。MapReduce特性是无形中扩大了能处理的数据量大小和硬件体积,运行在10个节点的EC2集群上,这个程序跑了6分钟。在第6节中咱们将会看看在集群中运行程序具体的一些技术特性。
Hadoop给MapReduce提供了API容许你用除了JAVA语言以外的其它语言写map和reduce函数。Hadoop流使用Unix系统标准流做业Hadoop和你的程序之间的接口,因此你能使用任意其它的可以读取Unix系统标准流输入数据并可以将数据写到标准输出的语言来写MapReduce程序。
流天生地就适用于文本处理。Map的输入数据经过标准的输入流输入到你自定义的map函数中。在map函数中,你将会一行一行的处理数据,而后将这些数据写入到输出流中。map会用Tab分隔key和value,并将它们作为键值对单独一行输出。这些数据将会以相同的格式作为reduce函数的输入。在输入之间,框架将会把它们按照键值排序,而后reduce会处理这些行,而后将结果输出到标准的输出流。
让咱们以流的方式重写查找每年最高温度的MapReduce程序来讲明。
map函数以Ruby语言编写,见示例2-7
示例2-7 #!/usr/bin/env ruby STDIN.each_line do |line| val = line year, temp, q = val[15,4], val[87,5], val[92,1] puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end
在示例2-7代码块中,Ruby从标准全局IO常量类型STDIN中读取输入数据,而后遍历每一行数据,找到行中相关的字段,若是有效,则输出到标准的输出流。
有必要看一下Streaming与Java MapReduce API之间的区别。 Java API会一条条记录地调用map函数,而后若是使用Streaming形式,map函数能够本身决定怎么样处理输入数据,能够多行一块儿处理也能够单行处理。JAVA map实现的函数是被推数据,可是它仍然能够考虑经过将多条记录放到一个实例变量中来实现一次处理多行的操做。这种状况下,你须要实现cleanup()方法,以便知道最后一条记录处理完的时候,可以结束处理。
因为示例2-7基于标准的输入输出操做,能够不经过Hadoop测试,直接经过Unix命令。
% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb 1950 +0000 1950 +0022 1950 -0011 1949 +0111 1949 +0078
reduce函数稍微有点复杂,如示例2-8
#!/usr/bin/env ruby last_key, max_val = nil, -1000000 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}\t#{max_val}" if last_key
同map函数同样,reduce函数也会从标准输入中遍历行,但不同的是,当处理每个key组的时候,须要存储某个状态。在这个示例中,关键字是年,咱们存储最后一次遍历的key,并保存每个key组中最大的温度。MapReduce框架会确保输入数据会按照关键值排序,因此咱们知道若是当前key值不一样于上一次遍历的key值时,咱们就进入了新的key组。当使用JAVA API时,reduce函数输入的数据就已经按照key值分好了组,而不像Streaming同样须要人为地去判断key组边界。
对于每一行,咱们取得key和value值,而后看看是否到达了一组的最后( last_key && last_key != key), 若是到达了,咱们记录下这组的Key和最高温度,以Tab制表符分隔,而后初始化最高温度,若是没有到达组的最后,则更新当前Key值的最高温度。最后一行做用是确保最后一个Key组的最高温度可以被记录。
咱们如今可以用Unix命令来模拟整个的MapReduce传输通道(等效于图2-1中所示的Unix通道)。
% cat input/ncdc/sample.txt | \ ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \ sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb 1949 111 1950 22
输出结果与Java程序的同样。下一步使用Hadoop来运行。
Hadoop命令不支持流选项,不过,你能够在jar选项中指定Streaming JAR文件,而后指定输入和输出文件路径,以及map和redeuce脚本文件,看起来以下:
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -input input/ncdc/sample.txt \ -output output \ -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \ -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
当在一个集群中基于大数据执行时,咱们须要使用-combiner选项来指定组合函数。
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\ ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \ -input input/ncdc/all \ -output output \ -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \ -combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \ -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
注意咱们使用了-files选项,当咱们在集群上运行流程序时,须要将map和reduce脚本文件复制到集群中。
流程序支持任意可以从标准输入读取数据并将数据写入标准输出的语言。因此使用读者更熟悉的Python,再写一遍以上例子。map脚本如示例2-9,reduce脚本如示例2-10.
示例2-9:map script #!/usr/bin/env python import re import sys for line in sys.stdin: val = line.strip() (year, temp, q) = (val[15:19], val[87:92], val[92:93]) if (temp != "+9999" and re.match("[01459]", q)): print "%s\t%s" % (year, temp) 示例:2-10 reduce script import sys (last_key, max_val) = (None, -sys.maxint) for line in sys.stdin: (key, val) = line.strip().split("\t") if last_key and last_key != key: print "%s\t%s" % (last_key, max_val) (last_key, max_val) = (key, int(val)) else: (last_key, max_val) = (key, max(max_val, int(val))) if last_key: print "%s\t%s" % (last_key, max_val)
咱们能像Ruby中同样以相同的方法来运行这个做业。
% cat input/ncdc/sample.txt | \ ch02-mr-intro/src/main/python/max_temperature_map.py | \ sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py 1949 111 1950 22
本文是笔者翻译自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第2节,后续将继续翻译其它章节。虽尽力翻译,但奈何水平有限,错误再所不免,若是有问题,请不吝指出!但愿本文对你有所帮助。
本文转自个人简书博客