洗牌和排序阶段都很耗费资源。洗牌须要在map和reduce任务之间传输数据,会致使过大的网络消耗。排序和合并操做的消耗也是很显著的。这一节将介绍一系列的技术来缓解洗牌和排序阶段的消耗。数组
技术46 规避使用reduce网络
Reduce在用于链接数据集的时候将会产生大量的网络消耗。ide
问题函数
须要考虑在MapReduce规避reduce的使用。工具
方案oop
经过将MapReduce参数setNumReduceTasks设置为0来建立一个只有map的做业。学习
讨论优化
洗牌和排序阶段通常都是用来链接数据集。但链接操做并不必定须要洗牌和排序,正如第4章中所介绍的。知足必定条件的链接能够只在map端运行。那么就只须要只有map的做业了。设置只有map的做业的命令以下。this
job.setNumReduceTasks(0);
小结spa
一个只有map的做业的OutputFormat是和普通做业中reduce的OutputFormat同样。如图6.39所示。
若是没法规避reduce,那么就要尽可能减少它对你的做业执行时间的影响。
技术47 过滤和投影
Map到Reduce之间传输数据要经过网络,这个成本很高。
问题
须要减小被洗牌的数据。
方案
减小map输出的每条记录的大小,并尽量地减小map输出的数据量。
讨论
过滤和投影是关系运算中的概念,用以减小须要处理的数据。这些概念也能够用到MapReduce中减小map任务须要输出的数据。如下是过滤和投影的简明定义:
如下是上述概念的演示代码:
1 Text outputKey = new Text(); 2 Text outputValue = new Text(); 3 4 @Override 5 public void map(LongWritable key, Text value, 6 OutputCollector<Text, Text> output, 7 Reporter reporter) throws IOException { 8 9 String v = value.toString(); 10 11 if (!v.startsWith("10.")) { 12 String[] parts = StringUtils.split(v, ".", 3); 13 outputKey.set(parts[0]); 14 outputValue.set(parts[1]); 15 output.collect(outputKey, outputValue); 16 } 17 }
小结
过滤和投影是在须要显著减小MapReduce做业运行时间时最容易的方法中的两种。
若是已经应用了这两种方法,但还须要进一步减小运行时间。那么就能够考虑combine。
技术48 使用combine
Combine能够在map阶段进行聚合操做来减小须要发送到reduce的数据。它是一个map端的优化工具,以map的输出做为输入。
问题
须要在过滤和投影后进一步减小运行时间。
方案
定义一个combine。在做业代码中使用setCombinerClass来调用它。
讨论
在map输出数据到磁盘的过程当中,有两个子过程:溢洒(spill)子过程,合并子过程。Combine在这两个子过程当中都会被调用,如图6.40所示。为了让combine在分组数据中效率最大,能够在两个子过程调用combine以前进行初步(precursory)的排序。
与设置map类相似,做业使用setCombinClass来设置combine。
job.setCombinerClass(Combine.class);
Combine的实现必须严格听从reduce的规格说明。这里将假定使用技术39种的map。将map的输出中的记录按照下述条件合并:第二个八进制数相同。代码以下。
1 public static class Combine implements Reducer<Text, Text, Text, Text> { 2 3 @Override 4 public void reduce(Text key, Iterator<Text> values, 5 OutputCollector<Text, 6 Text> output, 7 Reporter reporter) throws IOException { 8 9 Text prev = null; 10 while (values.hasNext()) { 11 Text t = values.next(); 12 if (!t.equals(prev)) { 13 output.collect(key, t); 14 } 15 prev = ReflectionUtils.copy(job, t, prev); 16 } 17 } 18 }
Combine函数必须是可分布的(distributive)。如图6.40(在前面)所示,combine要被调用屡次处理多个具备相同输入键的记录。这些记录的顺序是不可预测的。可分布函数是指,不论输入数据的顺序如何,最终的结果都同样。
小结
在MapReduce中combine很是有用,它可以减小map和reduce之间的网络传输数据和网络负载。下一个减小执行时间的有用工具就是二进制比较器。
技术49 用Comparator进行超快排序
MapReduce默认使用RawComparator对map的输出键进行比较排序。内置的Writable类(例如Text和IntWritable)是字节级实现。这样不用将字节形式的类解排列(unmarshal)成类对象。若是要经过WritableComparable实现自定义Writable,就有可能延长洗牌和排序阶段的时间,由于它须要进行解排列。
问题
存在自定义的Writable。须要减小做业的排序时间。
方案
实现字节级的Comparator来优化排序中的比较过程。
讨论
在MapReduce中不少阶段,排序是经过比较输出键来进行的。为了加快键排序,全部的map输出键必须实现WritableComparable接口。
1 public interface WritableComparable<T> extends Writable, Comparable<T> { 2 3 }
若是对4.2.1中的Person类进行改造,实现代码以下。
1 public class Person implements WritableComparable<Person> { 2 private String firstName; 3 private String lastName; 4 5 @Override 6 public int compareTo(Person other) { 7 int cmp = this.lastName.compareTo(other.lastName); 8 if (cmp != 0) { 9 return cmp; 10 } 11 return this.firstName.compareTo(other.firstName); 12 } 13 ...
这个Comparator的问题在于,若是要进行比较,就须要将字节形式的map的中间结果数据解排列成Writable形式。解排列要从新建立对象,所以成本很高。
Hadoop中的自带的各类Writable类不但扩展了WritableComparable接口,也提供了基于WritableComparator类的自定义Comparator。代码以下。
1 public class WritableComparator implements RawComparator { 2 3 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 4 5 try { 6 buffer.reset(b1, s1, l1); 7 key1.readFields(buffer); 8 9 buffer.reset(b2, s2, l2); 10 key2.readFields(buffer); 11 } catch (IOException e) { 12 throw new RuntimeException(e); 13 } 14 return compare(key1, key2); 15 } 16 17 /** Compare two WritableComparables. 18 * 19 * <p> The default implementation uses the natural ordering, 20 * calling {@link 21 * Comparable#compareTo(Object)}. */ 22 @SuppressWarnings("unchecked") 23 public int compare(WritableComparable a, WritableComparable b) { 24 return a.compareTo(b); 25 } 26 ... 27 }
要实现字节级的Comparator,须要重载compare方法。这里先学习一下IntWritable类如何实现这个方法。
1 public class IntWritable implements WritableComparable { 2 3 public static class Comparator extends WritableComparator { 4 5 public Comparator() { 6 super(IntWritable.class); 7 } 8 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 10 int thisValue = readInt(b1, s1); 11 int thatValue = readInt(b2, s2); 12 return (thisValue<thatValue ? -1 : 13 (thisValue==thatValue ? 0 : 1)); 14 } 15 } 16 17 static { 18 WritableComparator.define(IntWritable.class, new Comparator()); 19 }
若是只使用内置的Writable,那就没有必要实现WritableComparator。它们都自带。若是须要使用自定义的Writable做为输出键,那么就须要自定义WritableComparator。这里基于前述Person类来讲明如何实现。
在Person类中,有两个字符串类属性,firstName和lastName。使用writeUTF方法经过DataOutput输出它们。如下是实现代码。
1 private String firstName; 2 private String lastName; 3 4 @Override 5 public void write(DataOutput out) throws IOException { 6 out.writeUTF(lastName); 7 out.writeUTF(firstName); 8 }
首先须要理解Person对象是如何用字节形式表示的。writeUTF方法输出了字节长度(2个字节),字符内容(字符的长度,L1个字节)。如图6.41描述了字节是如何排列的。
假设须要对lastName和firstName进行字典式地比较(译注:就是看字典中的前后顺序)。显然不能直接用整个字节数组,由于其中还有字符长度。那么Comparator就须要足够聪明到可以跳过字符长度。如下是实现代码。
1 @Override 2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 3 4 int lastNameResult = compare(b1, s1, b2, s2); 5 if (lastNameResult != 0) { 6 return lastNameResult; 7 } 8 int b1l1 = readUnsignedShort(b1, s1); 9 int b2l1 = readUnsignedShort(b2, s2); 10 return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2); 11 } 12 13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) { 14 int b1l1 = readUnsignedShort(b1, s1); 15 int b2l1 = readUnsignedShort(b2, s2); 16 return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1); 17 } 18 19 public static int readUnsignedShort(byte[] b, int offset) { 20 int ch1 = b[offset]; 21 int ch2 = b[offset + 1]; 22 return (ch1 << 8) + (ch2); 23 }
小结
writeUTF只支持小于65536字符的字符串类。对于人名来讲,是足够了。大点的,可能就不行。这个时候就须要使用Hadoop的Text类来支持更大的字符串。Text类中的Comparator类的二进制字符串比较器的实现机制和刚才介绍的大体至关。(这个修饰真长。)那么针对Text类的lastName和firstName的Comparator的实现方式也会累死。
下一节将介绍如何减少数据倾斜的影响。