Hadoop Notes 7: MapReduce Program Examples

1. Counting Friend Pairs

1) Data

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali

2) Analysis

From the file format and content above, the same friendship can appear as two records with the user name and friend name swapped, so the data must be deduplicated.

For example:

joe,  jon

jon,  joe

Of these two records, we keep only one.
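
Before the full job, here is a minimal standalone sketch of the canonicalization idea (the class and method names are mine, not part of the job below): ordering the two names makes both spellings of a friendship map to the same key.

public class PairKeyDemo {

	// Strip spaces, then order the two names so that "joe,jon" and "jon,joe"
	// collapse to one canonical key.
	static String canonicalKey(String a, String b) {
		String n1 = a.replaceAll(" ", "");
		String n2 = b.replaceAll(" ", "");
		return n1.compareTo(n2) > 0 ? n1 + "," + n2 : n2 + "," + n1;
	}

	public static void main(String[] args) {
		System.out.println(canonicalKey("joe", "jon"));   // jon,joe
		System.out.println(canonicalKey("jon ", " joe")); // jon,joe -- the swapped duplicate yields the same key
	}
}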

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendMapReduceData extends Configured implements Tool {

	static class FriendMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		Text key = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String[] strs = value.toString().split(",");
			if (strs.length == 2) {
				// Order the two names lexicographically so the pair forms a canonical key
				String name1 = strs[0].replaceAll(" ", "");
				String name2 = strs[1].replaceAll(" ", "");
				this.key = new Text(name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1);
				context.write(this.key, NullWritable.get());
			}

		}
	}

	static class FriendReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values,
				Reducer<Text, NullWritable, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "FriendMapReduceData");
		job.setJarByClass(this.getClass());

		job.setMapperClass(FriendMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setReducerClass(FriendReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new FriendMapReduceData(), args));
	}
}
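
A small idiom note on FriendMapper above: allocating a new Text for every record works, but the common Hadoop pattern is to reuse a single Text instance via set(), since the framework serializes the key during write(). A behaviorally equivalent sketch:

	static class FriendMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		// Reuse one Text instance across calls; write() serializes the key
		// immediately, so mutating it on the next record is safe.
		private final Text outKey = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] strs = value.toString().split(",");
			if (strs.length == 2) {
				String name1 = strs[0].replaceAll(" ", "");
				String name2 = strs[1].replaceAll(" ", "");
				outKey.set(name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1);
				context.write(outKey, NullWritable.get());
			}
		}
	}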

4) Execution and Results

Upload the data file to HDFS:

bin/hadoop fs -put /home/softwares/test/friend input/friend

Run the job. ToolRunner's GenericOptionsParser consumes the -Dinput=... and -Doutput=... arguments and stores them in the job Configuration, which is where run() reads them back via conf.get("input") and conf.get("output"):

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.FriendMapReduceData -Dinput=input/friend -Doutput=output/friend

Result

2. Word Count

1) Data

Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather

2) Analysis

Count the number of occurrences of each word.

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordMapReduceData extends Configured implements Tool {

	static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		private Text key = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// Tokenize the line and emit each word with a count of 1
			StringTokenizer words = new StringTokenizer(value.toString());
			while (words.hasMoreElements()) {
				this.key = new Text(words.nextToken());
				context.write(this.key, new IntWritable(1));
			}
		}
	}

	static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable intWritable : values) {
				count += intWritable.get();
			}
			context.write(key, new IntWritable(count));
		}
	}

	public int run(String[] args) throws Exception {
		// Build the job's input and output paths
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "WordMapReduceData");
		job.setJarByClass(this.getClass());
		// Configure the Mapper
		job.setMapperClass(WordMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// Configure the Reducer
		job.setReducerClass(WordReducer.class);
		// Configure the output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Set the input and output directories
		FileInputFormat.addInputPath(job, input);
		FileOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new WordMapReduceData(), args));
	}
}
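
One optional refinement, not present in the code above: because summing integers is associative and commutative, WordReducer can also be registered as a combiner, pre-aggregating (word, count) pairs on the map side and shrinking the shuffle. The single extra line in run(), shown in context:

		job.setReducerClass(WordReducer.class);
		// Optional: also run the reducer as a map-side combiner.
		job.setCombinerClass(WordReducer.class);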

4) Execution and Results

Upload the file to HDFS:

bin/hadoop fs -put /home/softwares/test/words input/words

Run the job:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.WordMapReduceData -Dinput=input/words -Doutput=output/words

Result

3. Aggregating Scores

1) Data

chinese.txt

a|李一|88
a|王二|26
a|张三|99
a|刘四|58
a|陈五|45
a|杨六|66
a|赵七|78
a|黄八|100
a|周九|62
a|吴十|12

math.txt

c|李一|83
c|王二|36
c|张三|92
c|刘四|58
c|陈五|75
c|杨六|66
c|赵七|63
c|黄八|60
c|周九|62
c|吴十|72

english.txt

b|李一|36
b|王二|66
b|张三|86
b|刘四|56
b|陈五|43
b|杨六|86
b|赵七|99
b|黄八|80
b|周九|70
b|吴十|33

2) Analysis

There are three data files, one per subject. For each student, combine the three subject scores into a single output line: each subject with its score, followed by the total and the average (this is the format the reducer below produces).

3) Implementation

Parser class

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class ScoreRecordParser {

	private String id;
	private String name;
	private String score;

	/**
	 * Parse one record of the form id|name|score.
	 * @param line one input line
	 * @return true if the line yields three non-empty fields
	 */
	public boolean parse(String line) {
		String[] strs = line.split("\\|");
		if (strs.length != 3) {
			return false;
		}
		id = strs[0].trim();
		name = strs[1].trim();
		score = strs[2].trim();
		if (id.length() > 0 && name.length() > 0 && score.length() > 0) {
			return true;
		} else {
			return false;
		}
	}

	public boolean parse(Text value) {
		return parse(value.toString());
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getScore() {
		return score;
	}

	public void setScore(String score) {
		this.score = score;
	}
}
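
To see the parser's contract in isolation, a quick local check (the demo class is mine, not part of the job):

package com.jf.mapreduce;

public class ScoreRecordParserDemo {
	public static void main(String[] args) {
		ScoreRecordParser parser = new ScoreRecordParser();
		// A well-formed record: three non-empty fields separated by '|'.
		if (parser.parse("a|李一|88")) {
			System.out.println(parser.getId() + " " + parser.getName() + " " + parser.getScore()); // a 李一 88
		}
		// Fewer than three fields: parse(...) returns false.
		System.out.println(parser.parse("malformed line")); // false
	}
}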

MapReduce class

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreMapReduceData extends Configured implements Tool {

	static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
		
		private ScoreRecordParser parser = null;
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			parser = new ScoreRecordParser();
			System.out.println("map解析");
			//按照姓名进行分组
			if(parser.parse(value)){
				System.out.println("map:key="+parser.getName()+",value="+value.toString());
				context.write(new Text(parser.getName()),value);
			}
		}
	}

	static class ScoreReducer extends Reducer<Text, Text, Text, Text> {
		
		private ScoreRecordParser parser = null;
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringBuffer buffer = new StringBuffer();
			parser = new ScoreRecordParser();
			String id = null;
			int score = 0;
			double sum = 0;
			double avg = 0;
			for (Text value : values) {
				if(parser.parse(value)){
					id= parser.getId();
					score = Integer.parseInt(parser.getScore());
					if(id.equals("a")){
						buffer.append("语文:"+score+"\t");
						sum+=score;
					}else if(id.equals("b")){
						buffer.append("英文:"+score+"\t");
						sum+=score;
					}else if(id.equals("c")){
						buffer.append("数学:"+score+"\t");
						sum+=score;
					}
				}
			}
			avg = sum/3;
			buffer.append("总分:"+sum+"\t平均分:"+avg);
			System.out.println("reduce:key="+key.toString()+",value="+buffer.toString());
			context.write(key, new Text(buffer.toString()));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(ScoreMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(ScoreReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new ScoreMapReduceData(), args));
	}

}

4) Execution and Results

Because the job reads three files, upload all three into a single directory and point -Dinput at that directory when running:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.ScoreMapReduceData -Dinput=input/score -Doutput=output/score

Because the input consists of three separate files, each file forms its own input split, so three map tasks are launched.

Result

4. Inverted Index

1) Data

file1

Welcome to MapReduce World

file2

MapReduce is simple

file3

MapReduce is powerful is simple

file4

hello MapReduce Bye MapReduce

2) Analysis

Given the four data files above, the goal is output of the form:

word  file_1:count,file_2:count,file_3:count,file_4:count

that is, the number of times each word appears in each file. Since words are what we count, the map output key can be the word itself; and since the count is per file, the map output value should be the file the word came from, e.g. (is : file2), (is : file3), (is : file3).

After the shuffle, the reduce input for "is" is then file2, file3, file3, which makes it straightforward to tally each word's occurrences per file.
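
The reduce step is then just a per-file tally. A standalone sketch of that tally outside MapReduce (class name and sample data are illustrative):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TallyDemo {
	public static void main(String[] args) {
		// Shuffled values arriving at the reducer for the key "is".
		List<String> files = Arrays.asList("file2", "file3", "file3");
		Map<String, Integer> counts = new HashMap<String, Integer>();
		for (String f : files) {
			Integer c = counts.get(f);
			counts.put(f, c == null ? 1 : c + 1);
		}
		System.out.println(counts); // e.g. {file2=1, file3=2} (iteration order not guaranteed)
	}
}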

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndexMapReduce extends Configured implements Tool {

	/**
	 * Hadoop instantiates Mapper and Reducer classes via reflection, so these
	 * nested classes must be static so they can be created without an
	 * enclosing instance.
	 */
	static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {

		private Text fileName = null;

		@Override
		protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
			this.fileName = new Text(fileName);
		}

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
			String tmp = null;
			while (tokenizer.hasMoreTokens()) {
				tmp = tokenizer.nextToken().trim();
				if (tmp.length() > 0) {
					context.write(new Text(tmp), this.fileName);
				}
			}
		}
	}

	static class IndexReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String tmp = null;
			Map<String, Integer> map = new HashMap<String, Integer>();
			for (Text text : values) {
				tmp = text.toString();
				if (map.get(tmp) != null) {
					map.put(tmp, map.get(tmp) + 1);
				} else {
					map.put(tmp, 1);
				}
			}
			StringBuffer buffer = new StringBuffer();
			for (String mk : map.keySet()) {
				if (buffer.length() > 0) {
					buffer.append("," + mk + ":" + map.get(mk));
				} else {
					buffer.append(mk + ":" + map.get(mk));
				}
			}
			context.write(key, new Text(buffer.toString()));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "InvertIndexMapReduce");
		job.setJarByClass(this.getClass());

		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new InvertIndexMapReduce(), args));
	}

}
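
A design note on the job above: unlike WordCount, it cannot simply reuse its reducer as a combiner, because the map output value is a file name rather than a count. If shuffle volume ever mattered, one possible variation (hypothetical, not in the original) is in-mapper combining: tally words locally and emit one word -> file:count pair per word in cleanup(). Assuming each file fits in a single split, the reducer would then only need to concatenate the pre-counted entries. A sketch, using the same imports as the class above:

	static class CombiningIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
		private String fileName;
		// Local per-mapper tally of word -> count for this split's file.
		private final Map<String, Integer> counts = new HashMap<String, Integer>();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
			while (tokenizer.hasMoreTokens()) {
				String word = tokenizer.nextToken().trim();
				if (word.length() > 0) {
					Integer c = counts.get(word);
					counts.put(word, c == null ? 1 : c + 1);
				}
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Emit each word once per mapper, with its pre-aggregated count.
			for (Map.Entry<String, Integer> e : counts.entrySet()) {
				context.write(new Text(e.getKey()), new Text(fileName + ":" + e.getValue()));
			}
		}
	}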

4) Test Results

Upload the files

Run the command:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.InvertIndexMapReduce -Dinput=input/index -Doutput=output/index

Job counters:

File System Counters
		FILE: Number of bytes read=249
		FILE: Number of bytes written=698080
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=549
		HDFS: Number of bytes written=168
		HDFS: Number of read operations=18
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Job Counters 
		Launched map tasks=4
		Launched reduce tasks=2
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=131026
		Total time spent by all reduces in occupied slots (ms)=31874
		Total time spent by all map tasks (ms)=131026
		Total time spent by all reduce tasks (ms)=31874
		Total vcore-seconds taken by all map tasks=131026
		Total vcore-seconds taken by all reduce tasks=31874
		Total megabyte-seconds taken by all map tasks=134170624
		Total megabyte-seconds taken by all reduce tasks=32638976
	Map-Reduce Framework
		Map input records=4
		Map output records=16
		Map output bytes=205
		Map output materialized bytes=285
		Input split bytes=444
		Combine input records=0
		Combine output records=0
		Reduce input groups=9
		Reduce shuffle bytes=285
		Reduce input records=16
		Reduce output records=9
		Spilled Records=32
		Shuffled Maps =8
		Failed Shuffles=0
		Merged Map outputs=8
		GC time elapsed (ms)=2090
		CPU time spent (ms)=7370
		Physical memory (bytes) snapshot=1040441344
		Virtual memory (bytes) snapshot=5057056768
		Total committed heap usage (bytes)=597049344
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=105
	File Output Format Counters 
		Bytes Written=168

Result file

5. Co-occurrence Matrix

Step 1: Build each person's friend list

1) Data

friendList.txt

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali

2) Analysis

From each friend pair, build every person's friend list.

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            if (names.length == 2) {
                context.write(new Text(names[0]), new Text(names[1]));
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text text : values) {
                buffer.append("," + text.toString());
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData(), args));
    }
}

4) Result

Step 2: Pair up every two names in each friend list and count each pair's occurrences

1) Data

friendList2.txt

bob	,joe,jim,ali
jon	,ali,joe
kia	,dee,jim,joe
ali	,jon,joe,bob,jim,dee
dee	,ali,kia
jim	,ali,bob,kia
joe	,ali,bob,kia,jon

2) Analysis

Excluding the list's owner, every two names in a friend list form one friend pair.

For example, the list A,B,C,D (with A the owner) yields the pairs B-C, B-D, C-D. Note that each input line here is step 1's output: the owner, a tab, then a leading comma and the friends, so splitting the whole line on ',' leaves the owner at index 0, and the pair loop below starts at index 1.
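
A standalone sketch of the pair enumeration (the demo class is mine; the job below additionally orders the two names into a canonical key):

public class PairEnumDemo {
	public static void main(String[] args) {
		// Index 0 is the list's owner, so pairs are formed among indexes 1..n-1 only.
		String[] names = { "A", "B", "C", "D" };
		for (int i = 1; i < names.length - 1; i++) {
			for (int j = i + 1; j < names.length; j++) {
				System.out.println(names[i] + "-" + names[j]);
			}
		}
		// prints: B-C, B-D, C-D
	}
}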

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData2 extends Configured implements Tool {

	static class FriendListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] names = line.replaceAll(" ", "").split(",");
			// Note the nested loops: every two names in the friend list (indexes 1 and up, past the owner) can form a pair
			for (int i = 1; i < names.length - 1; i++) {
				for (int j = 1; j < names.length - i; j++) {
					// Order the two names so each pair has a unique canonical key, preventing A-B and B-A from being counted separately
					if (names[i].compareTo(names[i + j]) >= 0) {
						System.out.println(names[i] + ":" + names[i + j]);
						context.write(new Text(names[i] + ":" + names[i + j]), new IntWritable(1));
					} else {
						context.write(new Text(names[i + j] + ":" + names[i]), new IntWritable(1));
					}
				}

			}
		}
	}

	static class FriendListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			context.write(key, new IntWritable(count));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(FriendListMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setReducerClass(FriendListReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new FriendListMapReduceData2(), args));
	}
}

4) Result
