目录java
Map阶段:若干个maptask并发实例,彻底并行运行,互不相干。apache
Reduce阶段:若干个reducetask并发实例,彻底并行运行,可是他们的数据依赖于Map阶段的输出。编程
注意:MapReduce模型只能包含一个map阶段和一个reduce阶段;若是业务逻辑很是复杂,就只能使用多个MapReduce程序,串行运行。windows
1. 编写Mapper缓存
// 注意:hadoop1.0版本中是mapred下包,hadoop2.0是mapreduce下的包 import org.apache.hadoop.mapreduce.Mapper; // 继承Mapper父类,泛型为输入和输出的<K, V>;并重写父类的map方法 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 每行文本都会执行一次map方法. * * @param key 文本偏移量. * @param value 一行文本. * @param context 上下文对象. * @throws IOException . * @throws InterruptedException 当阻塞方法收到中断请求时抛出. */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\\s+"); // 拆分一行中的单词 for (String word : words) { context.write(new Text(word), new IntWritable(1)); // 输出一个<K, V> } } }
2. 编写Reducer并发
// 继承Reducer类,输入的<K, V>类型为map端输出<K, V>类型 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * 相同的key只会执行一次reduce方法 * * @param key map端输出的key * @param values 相同key的value集合 * @param context 上下文对象 * @throws IOException . * @throws InterruptedException . */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 当前的 key出现了多少次 int count = 0; // values中的数据是反序列化过来的,最好不要直接使用values中的bean for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); // 输出 } }
3. 编写Driverapp
// Driver的做用是将这个Mapper和Reducer程序打包成一个Job,并提交该Job public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 不须要为 conf设置HDFS等参数,由于conf会调用系统默认的配置文件, // 因此这个mr程序在哪里运行就会调用哪里的配置文件,在集群上运行就会使用集群的设置文件。 Configuration conf = new Configuration(); // 删除输出文件,或者手动删除 // FileHelper.deleteDir(args[1], conf); // 根据配置文件实例化一个 Job,并取个名字 Job job = Job.getInstance(conf, "MyWordCount"); // 设置 Jar的位置 job.setJarByClass(WordCountDriver.class); // 设置 Mapper运行类,以及输出的key和value的类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置 Reducer的运行类,以及输出的key和value的类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置分区(能够不用设置) // 当设置的分区数大于实际分区数时,能够正常执行,多出的分区为空文件; // 当设置的分区数小于实际分区数时,会报错。 job.setNumReduceTasks(4); // 若是设置的 numReduceTasks大于 1,而又没有设置自定义的 PartitionerClass // 则会调用系统默认的 HashPartitioner实现类来计算分区。 job.setPartitionerClass(WordCountPartitioner.class); // 设置combine job.setCombinerClass(WordCountCombiner.class); // 设置输入和输出文件的位置 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任务,等待执行结果,参数为 true表示打印信息 boolean result = job.waitForCompletion(true); // 根据 job的返回值自定义退出 System.exit(result?0:1); } }
4. 运行框架