如何将一个反向引用索引的程序的Reducer输出的类型改成IntWritableapache
public static class Reduce extends MapReduceBaseapp
implements Reducer<Text, Text, Text, IntWritable> {框架
public void reduce(Text key, Iterator<Text> values,oop
OutputCollector<Text, IntWritable> output,性能
Reporter reporter) throws IOException {spa
int count = 0;orm
while (values.hasNext()) {对象
values.next();索引
count++;接口
}
output.collect(key, new IntWritable(count));
}
}
计算不一样引用次数专利的数目
之因此选择K2、V2、K3和V3的数据为IntWritable类型,是由于它们的数据必然为整数,而且使用IntWritable比Text更高效
public class CitationHistogram extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<Text, Text, IntWritable, IntWritable> {
private final static IntWritable uno = new IntWritable(1);
private IntWritable citationCount = new IntWritable();
public void map(Text key, Text, value,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException {
citationCount.set(Integer.parseInt(value.toString()));
output.collect(citationCount, uno);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<IntWritable, IntWritable > output,
Reporter reporter) throws IOException {
int count = 0;
while (values.hasNext()) {
count += values.next().get();
}
output.collect(key, new IntWritable(count));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, CitationHistogram.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("CitationHistogram");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFomrat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
system.exit(res);
}
}
须要说明的几点:
1、类名为CitationHistogram
2、输入格式为KeyValueTextInputFormat,输出格式为TextOutputFormat
3、KeyValueTextInputFormat默认以"<tab>"制表符进行分割,可使用job.set("key.value.separator.in.input.line", ","),修改分隔符为",",其余须要自行修改
4、MapClass类使用了两个私有成员变量uno 和citationCount,为何要这样定义呢?
出于对效率的考虑citationCount和uno的定义被放在类中而不是方法中,有多少记录,map()方法就会被调用多少次 (对每一个JVM而言,就是一个分片中的记录数)。减小在map()方法中生成的对象个数能够提升性能,并减小垃圾回收。因为citationCount和uno被传递给output.collect(),咱们依赖output.collect()方法的约定不会修改这两个对象。
5、Reducer计算每一个key对应的值的总数,这彷佛并不高效,由于咱们知道全部的值都是1,为何咱们还要去加它们呢?缘由在于,它会让咱们在之后能更容易地增长一个combiner来提升性能。
Hadoop API是改了又改,变了又变
1、0.20版本被视为旧API和将来稳定API之间的过渡
2、为了保持向后兼容,版本0.20以及以后的版本支持下一代API并将旧API标记为弃用
并不推荐立刻转向新的API
1、0.20版本,许多Hadoop自身的类库尚未基于新的API重写,若是MapReduce代码使用0.20中的新API,这些类将没法被使用。
2、在0.20版本以前,0.18.3仍被不少人认为是最完备与最稳定的版本
3、虽然在新版本的API中有所改变,可是改变的仅仅影响基础的MapReduce模板,咱们能够基于新API所作的改变,重写这个模板以备未来使用。
你可能会奇怪为何不提0.19?
通常的意见认为它的初始版本问题比较多,有许多bug,一些副版本试图解决这个问题,但社区但愿直接跳到版本0.20
新版本的API作了哪些改动?
1、在新的API中org.apahce.hadoop.mapred的许多类都被移走了,多数被放入org.apache.hadoop.mapreduce中,并且类库都放在org.apache.hadoop.mapreduce.lib的一个包里。当转为使用新API时,org.apache.hadoop.mapred下全部的类的import声明(或者彻底引用)就不存在了,它们都被弃用
2、新API中最有益的变化是引入了上下文对象context,最直接的影响在于替换了map()和reduce()方法中使用的OutputCollector和Reporter对象。如今将经过调用Context.writer()而不是OutputCollector.collect()输出键/值对。深远的影响是统一了应用代码和MapReduce框架之间的通讯,并固定了Mapper和Reducer的API,使得添加新功能时不会改变基本方法签名,新的功能仅仅时在context对象上增长的方法,在引入这些方法以前写的程序不会感知到这些新方法,它们能够在更新的版本中继续编译与运行
3、新的map()和reduce()方法分别被包含在新的抽象Mapper和Reducer中,它们取代了原始API中的Mapper和Reducer接口(org.apache.hadoop.mapred.Mapper和org.apache.hadoop.mapred.Reducer)。新的抽象类也替换了MapReudceBase类,使之被弃用
4、新的map()和reduce()方法多了一两处细微的改变,它们能够抛出InterruptedException而非单一的IOException,并且,reduce()方法再也不以Iterator而以Iterable来接受一个值的列表,这样更容易使用Java的foreach语义来实现迭代。
回顾一下原始的API中的签名
public static class MapClass extends MapReduceBase
implements Mapper<K1, V1, K2, V2> {
public void map(K1 key, V1 value,
OutputCollector<K2, V2> output,
Reporter reporter) throws IOException { }
}
public static class Reduce extends MapReduceBase
implements Reducer<K2, V2, K3, V3> {
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter) throws IOException { }
}
新的API中的签名,体会与上面签名的不一样
public static class MapClass extends Mapper<K1, V2, K2, V2> {
public void map(K1 key, V1 value, Context context)
throws IOException, InterruptedException { }
}
public static class Reduce extends Reducer<K2, V2, K3, V3> {
public void reduce(K2 key, Iterable<V2> values, Context context)
throws IOException, InterruptedException { }
}
你还须要改变driver中的一些内容来支持新的API
1、在新的API中JobConf和JobClient被替换了,它们的功能已经被放入Configuration类(它本来是JobConf的父类)和一个新的类Job中
2、Configuration类纯粹为了配置做业而设,而Job类负责定义和控制一个做业的执行
3、好比,setOutputKeyClass()和setOutputValueClass()等方法被从JobConf转移到了Job
4、做业的构造和提交执行如今放在Job中,本来须要使用JobConf来构造一个做业:
JobConf job = new JobConf(conf, MyJob.class);
job.setJobName("MyJob");
而如今可经过Job类完成:
Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);
之前是经过JobClient提交做业去执行:
JobClient.runJob(job);
如今一样经过Job类来完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
基于版本0.20及其以上的新API重写的Hadoop基础程序模板(Hadoop 1.X也适用)
public class MyJob extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWirtable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[1]), new Text(citation[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// Iterable类型容许foreach循环
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
if (csv.length() > 0) csv += ",";
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce,.class);
// 兼容InputFormat类
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}
}
1、这段代码实现了反向索引功能
2、KeyValueTextInputFormat类未被移植到版本0.20的新API中,重写这个模板咱们不得不使用TextInputFormat