许多mapreduce做业会受限与集群的带宽,所以尽可能下降map和reduce任务之间的数据传输是有必要的。Hadoop容许用户针对map任务的输出指定一个combiner函数处理map任务的输出,并做为reduce函数的输入。由于combine是优化方案,因此Hadoop没法肯定针对map输出记录须要调用多少次combine函数。in the other word,无论调用多少次combine函数,reducer的输出结果都是同样的。
The contract for the combiner function constrains the type of function that may be used。
combiner函数协议会制约可用的函数类型。举个例子:app
假设第一个map输出以下:函数
(1950, 0) (1950, 20) (1950, 10)
第二个map输出以下:oop
(1950, 25) (1950, 15)
reduce函数被调用时,其输入是优化
(1950, [0, 20, 10, 25, 15])
结果:code
(1950, 25)
若是调用combine函数,像reduce函数同样去寻找 每一个map的输出的最大温度。那么输出结果应该是:orm
(1950, [20, 25])
reduce 输出结果和之前同样。可用经过下面的表达式来讲明气温数值的函数调用:get
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
并非全部函数都有这个属性。例如,咱们计算平均气温,就不能使用平均函数做为combiner。it
mean(0, 20, 10, 25, 15) = 14
可是:io
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
combiner函数不能取代reducer。但它能有效减小mapper和reducer之间的数据传输量。table
指定一个 combiner
Job job = Job.getInstance(); job.setJarByClass(MaxTemperatureJob.class); job.setJobName("max temperature"); //方法为何不保持一致,不是一我的写的? FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); //设置combiner job.setCombinerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // job.setInputFormatClass(); System.out.println(job.waitForCompletion(true) ? 0 : 1);