用MapReduce进行数据密集型文本处理-本地聚合

本文译自Working Through Data-Intensive Text Processing with MapReduce html

由于最近忙于 Coursera提供 的一些课程,我已经有一段时间没有写博客了。这些课程很是有意思,值得一看。我买了一本书《Data-Intensive Processing with MapReduce》,做者是Jimmy和Chris Dyer。书里以伪码形式总结了一些了MapReduce的重要算法。。我打算用真正的hadoop代码来实现这本书中第3-6章中出现过的算法,以Tom White的《Hadoop经典指南》做为参考。我假设本文的读者已经了解Hadoop和MapReduce,因此本文再也不详述基础概念。让咱们直接跳到第3章-MapReduce算法设计,从本地聚合开始。 git

本地聚合(Local Aggregation) github

从比较高的抽象层面上来说,mapper输出数据的时候要先把中间结果写到磁盘上,而后穿过网络传给reducer处理。对于一个mapreduce job来讲,将数据写磁盘以及以后的网络传输的代价高昂,由于它们会大大增长延迟。因此,应该尽量减小mapper产生的数据量,这样才能加快job的处理速度。本地聚合就是这样一种减小中间数据量提升job效率的技术。本地聚合并不能代替reducer,由于reducer能够汇集来自不一样mapper的具备一样key的数据。咱们有三种本地聚合的方法: 算法

1.使用Hadoop Combiner的功能 apache

2.Data-Intensive Processing with MapReduce这本书里提到的两种在mapper里聚合的方法 api

固然任何优化都要考虑一些其余因素,咱们将在后面讨论这些。 网络

为了演示本地聚合,我在个人MacBookPro上用Cloudera的hadoop-0.20.2-cdh3u3搭建了了一个伪分布集群环境,咱们将用查尔斯狄更斯的小说《A Christmas Carol》来运行word count。我计划之后在EC2上用更大的数据来作这个实验。 app

Combiners

combiner功能由继承了Reducer class的对象实现。事实上,在咱们的例子里,咱们会重用word count中的reducer来做为combiner。combiner 在配置MapReduce job的时候指定,就像这样: 框架

1 job.setReducerClass(TokenCountReducer.class);

下面是reducer的代码: 分布式

01 public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
02     @Override
03     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throwsIOException, InterruptedException {
04         int count = 0;
05         for (IntWritable value : values) {
06               count+= value.get();
07         }
08         context.write(key,new IntWritable(count));
09     }
10 }

combiner的做用就如它的名字,聚合数据以尽可能减小shuffle阶段的网络传输量。如前所述,reducer仍然须要把来自不一样mapper的一样的key汇集起来。由于combiner功能只是对过程的一个优化,因此Hadoop框架不能保证combiner会被调用多少次。(配置了combinere就必定会执行,可是执行1次仍是n次是预先不肯定的)

在Mapper聚合的方法1

不用combiner的话,替代方法之一只须要对咱们原来的word count mapper作一个小小的修改:

01 public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
02     @Override
03     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
04         IntWritable writableCount = new IntWritable();
05         Text text = new Text();
06         Map<String,Integer> tokenMap = new HashMap<String, Integer>();
07         StringTokenizer tokenizer = new StringTokenizer(value.toString());
08  
09         while(tokenizer.hasMoreElements()){
10             String token = tokenizer.nextToken();
11             Integer count = tokenMap.get(token);
12             if(count == null) count = new Integer(0);
13             count+=1;
14             tokenMap.put(token,count);
15         }
16  
17         Set<String> keys = tokenMap.keySet();
18         for (String s : keys) {
19              text.set(s);
20              writableCount.set(tokenMap.get(s));
21              context.write(text,writableCount);
22         }
23     }
24 }

如咱们所看到的,输出的词的计数再也不是1,咱们用一个map记录处理过的每一个词。处理完毕一行中的全部词,而后遍历这个map,输出每一个词在一行中的出现次数。

在Mapper聚合的方法2

mapper中聚合的第二种方法与上面的例子很是类似,但也有两处不一样 - 在何时创建hashmap以及何时输出hashmap中的结果。在上面的例子里,在每次调用map方法的时候建立map并在调用完成的时候输出。在这个例子里,咱们会把map做为一个实例变量并在mapper的setUp方法里初始化。一样,map的内容要等到全部的map方法调用都完成以后,调用cleanUp方法的时候才输出。

01 public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
02  
03     private  Map<String,Integer> tokenMap;
04  
05     @Override
06     protected void setup(Context context) throws IOException, InterruptedException {
07            tokenMap = new HashMap<String, Integer>();
08     }
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         StringTokenizer tokenizer = new StringTokenizer(value.toString());
13         while(tokenizer.hasMoreElements()){
14             String token = tokenizer.nextToken();
15             Integer count = tokenMap.get(token);
16             if(count == null) count = new Integer(0);
17             count+=1;
18             tokenMap.put(token,count);
19         }
20     }
21  
22  
23     @Override
24     protected void cleanup(Context context) throws IOException, InterruptedException {
25         IntWritable writableCount = new IntWritable();
26         Text text = new Text();
27         Set<String> keys = tokenMap.keySet();
28         for (String s : keys) {
29             text.set(s);
30             writableCount.set(tokenMap.get(s));
31             context.write(text,writableCount);
32         }
33     }
34 }
正如上面的代码所示,在   mapper里,跨越全部map方法调用,记录每一个词的出现次数。经过这样作,大大减小了发送到reducer的记录数量,可以减小MapReduce任务的运行时间。达到的效果与使用MapReduce框架的combiner功能相同,可是这种状况下你要本身保证你的聚合代码是正确的。可是使用这种方法的时候要注意,在map方法调用过程当中始终保持状态是有问题的,这有悖于“map”功能的原义。并且,在map调用过程当中保持状态也须要关注你的内存使用。总之,根据不一样状况来作权衡,选择最合适的办法。

结果

如今让咱们来看一下不一样mapper的结果。由于job运行在伪分布式模式下,这个运行时间不足以参考,不过咱们仍然能够推断出使用了本地聚合以后是如何影响真实集群上运行的MapReduce job的效率的。

每一个词输出一次的Mapper:

1 12/09/13 21:25:32 INFO mapred.JobClient:     Reduce shuffle bytes=366010
2 12/09/13 21:25:32 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:25:32 INFO mapred.JobClient:     Spilled Records=63118
4 12/09/13 21:25:32 INFO mapred.JobClient:     Map output bytes=302886

在mapper中聚合方法1:

1 12/09/13 21:28:15 INFO mapred.JobClient:     Reduce shuffle bytes=354112
2 12/09/13 21:28:15 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:28:15 INFO mapred.JobClient:     Spilled Records=60704
4 12/09/13 21:28:15 INFO mapred.JobClient:     Map output bytes=293402

在mapper中聚合方法2:

1 12/09/13 21:30:49 INFO mapred.JobClient:     Reduce shuffle bytes=105885
2 12/09/13 21:30:49 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:30:49 INFO mapred.JobClient:     Spilled Records=15314
4 12/09/13 21:30:49 INFO mapred.JobClient:     Map output bytes=90565

使用了Combiner:

1 12/09/13 21:22:18 INFO mapred.JobClient:     Reduce shuffle bytes=105885
2 12/09/13 21:22:18 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:22:18 INFO mapred.JobClient:     Spilled Records=15314
4 12/09/13 21:22:18 INFO mapred.JobClient:     Map output bytes=302886
5 12/09/13 21:22:18 INFO mapred.JobClient:     Combine input records=31559
6 12/09/13 21:22:18 INFO mapred.JobClient:     Combine output records=7657

正如所料,没有作任何聚合的Mapper效果最差,而后是“在mapper中聚合方法1”,差之了了。“在mapper中聚合方法2”与使用了combiner的结果很近似。比起前两种方法,他们节省了2/3的shuffle字节数。这等于减小了一样数量的网络数据传输量,十分有利于提升MapReduce job的运行效率。不过要记住,方法2或者combiner并不必定可以应用于全部的MapReduce jobs word count很适合于这种场景,可是别的状况可不必定。

结论

正如你看到的,使用mapper里聚合方法和combiner是有好处的,不过当你在寻求提高MapReduce jobs的性能的时候你应该多考虑一些因素。至于选哪一种方法,这取决于你如何权衡。

本文是《Data Intensive Processing with MapReduce中的算法实现系列文章的最新一篇。该系列文章的第一篇在此 。在第一篇里,咱们讨论了使用本地聚合技术来减小shuffle阶段的网络传输数据量。减小须要传输的数据量是提升mapreduce job的性能的最有效的办法。咱们在上一篇文章里用了word count来演示本地聚合。由于咱们须要的只是一个最终统计结果,而在计算最终结果的过程当中改变累加的分组与顺序都不会影响最终结果,因此咱们能够重用reducer来做为combiner。可是若是想计算平均值怎么办?这种状况下原来的办法就行不通了,由于整体的平均值不等于各部分平均值的平均。不过若是可以清楚的意识到这一点,咱们仍是可使用本地聚合方法的。在本文的例子中咱们将使用 在Hadoop经典指南》中出现过的 美国国家气候中心的天气数据  样原本计算1901年每月的平均气温。使用combiner和mapper中聚合的计算平均值的算法可在《Data Intensive Processing with MapReduce》的3.1.3找到。

没有放之四海而皆准的方法

咱们在上一篇文章里介绍了两种减小数据的方法,Hadoop Combiner和在mapper中聚合。Combiner被视为是一个优化措施,所以框架不会保证它会被调用多少次。因此,mapper输出的数据格式必须是符合reducer输入格式的,以便在combiner根本没有运行的状况下最终结果仍是正确的。为了计算平均气温,咱们须要改变一下mapper的输出。

Mapper 的变化

在 word-count的例子里,没有优化的mapper输出每一个词和值为1的计数。combiner和在mapper中聚合的方法经过一个hashmap,将每一个词做为key,出现次数做为值,保存在hashmap中来减小输出。若是combiner没有调用,reducer将收到一系列key是单词,值为1的数据,这与以前的结果是同样的。(若是使用在mapper中聚合的话就不会发生这种状况,由于聚合是发生在mapper的代码里的,因此必定会被执行)。为了计算平均值,咱们的mapper须要输出一个字符串key(年月)和一个定制的实现了writable接口的对象, TemperatureAveragingPair。这个对象有两个数字属性,气温以及该气温的频数。咱们能够参考《Hadoop经典指南》中的 MaximumTemperatureMapper  来创建 AverageTemperatureMapper:
01 public class AverageTemperatureMapper extendsMapper<LongWritable, Text, Text, TemperatureAveragingPair> {
02  //sample line of weather data
03  //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
04  
05  
06     private Text outText = new Text();
07     private TemperatureAveragingPair pair = new TemperatureAveragingPair();
08     private static final int MISSING = 9999;
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         String line = value.toString();
13         String yearMonth = line.substring(15, 21);
14  
15         int tempStartPosition = 87;
16  
17         if (line.charAt(tempStartPosition) == '+') {
18             tempStartPosition += 1;
19         }
20  
21         int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
22  
23         if (temp != MISSING) {
24             outText.set(yearMonth);
25             pair.set(temp, 1);
26             context.write(outText, pair);
27         }
28     }
29 }

经过让mapper输出key和TemperatureAveragingPair对象,无论combiner有没有执行咱们的mapreduce程序都能输出正确的结果。

Combiner

咱们要减小传输的数据量,就要尽量把相同气温的计数合并,可是又不能影响最终的平均数计算。当combiner执行的时候,它会把具备相同key的TemperatureAveragingPair 合并成一个,包含汇总的气温和频度计数。下面是combiner的代码:

01   public class AverageTemperatureCombiner extendsReducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {
02     private TemperatureAveragingPair pair = new TemperatureAveragingPair();
03  
04     @Override
05     protected voidreduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throwsIOException, InterruptedException {
06         int temp = 0;
07         int count = 0;
08         for (TemperatureAveragingPair value : values) {
09              temp += value.getTemp().get();
10              count += value.getCount().get();
11         }
12         pair.set(temp,count);
13         context.write(key,pair);
14     }
15 }

可是咱们确实很关心如何减小须要传输给reducer的数据量,下面咱们将会看看如何实现这个目的。

在mapper中合并平均值

与word-count相同,为了计算均值,在mapper中合并的方法会用到一个hashmap,它以年月为key,以TemperatureAveragingPair为值。合并相同年月的数据的时候咱们须要取出以该年月为key的TemperatureAveragingPair对象,将temperature属性和count属性累加。最后在cleanUp方法被调用的时候会输出hashmap中全部的key和TemperatureAveragingPair。

01 public class AverageTemperatureCombiningMapper extendsMapper<LongWritable, Text, Text, TemperatureAveragingPair> {
02  //sample line of weather data
03  //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
04  
05  
06     private static final int MISSING = 9999;
07     private Map<String,TemperatureAveragingPair> pairMap = newHashMap<String,TemperatureAveragingPair>();
08  
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         String line = value.toString();
13         String yearMonth = line.substring(15, 21);
14  
15         int tempStartPosition = 87;
16  
17         if (line.charAt(tempStartPosition) == '+') {
18             tempStartPosition += 1;
19         }
20  
21         int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
22  
23         if (temp != MISSING) {
24             TemperatureAveragingPair pair = pairMap.get(yearMonth);
25             if(pair == null){
26                 pair = new TemperatureAveragingPair();
27                 pairMap.put(yearMonth,pair);
28             }
29             int temps = pair.getTemp().get() + temp;
30             int count = pair.getCount().get() + 1;
31             pair.set(temps,count);
32         }
33     }
34  
35  
36     @Override
37     protected void cleanup(Context context) throws IOException, InterruptedException {
38         Set<String> keys = pairMap.keySet();
39         Text keyText = new Text();
40         for (String key : keys) {
41              keyText.set(key);
42              context.write(keyText,pairMap.get(key));
43         }
44     }
45 }

用这种在mapper中合并的方法,咱们在屡次map调用之间保存了信息,确保了可以对产出数据进行削减。尽管保持跨mapper的状态是一件须要当心的事情,但这在某些状况下确实颇有效。

Reducer

在这种状况reducer的逻辑就很简单了,遍历每一个key的全部值,把temperatures 和counts加和,而后相除。

01 public class AverageTemperatureReducer extendsReducer<Text, TemperatureAveragingPair, Text, IntWritable> {
02     private IntWritable average = new IntWritable();
03  
04     @Override
05     protected voidreduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throwsIOException, InterruptedException {
06         int temp = 0;
07         int count = 0;
08         for (TemperatureAveragingPair pair : values) {
09             temp += pair.getTemp().get();
10             count += pair.getCount().get();
11         }
12         average.set(temp / count);
13         context.write(key, average);
14     }
15 }

结果

正如预料,使用了combiner和mapper中合并方法的结果大幅减小了输出数据。
没有优化的状况:

01 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
03 12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
05 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
07 12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
08 12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
10 12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
11 12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

使用了Combiner的状况:

01 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
03 12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
05 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
07 12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
08 12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
10 12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
11 12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

在mapper中合并的状况:

01 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
03 12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
05 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
07 12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
08 12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
10 12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
11 12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

计算结果:
(注意: 例子里使用的文件中的的温度是摄氏度*10的结果)

Non-Optimized Combiner       In-Mapper-Combiner Mapper
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77

结论

咱们用两种场景来演示了本地聚合,一个场景比较简单只要简单重用reducer做为combiner就能够,另外一个稍微复杂一些,必须对数据作必定的组织,这两种例子都充分证实了本地聚合可以极大提升处理过程的效率。

相关资源

相关文章
相关标签/搜索