joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
Given the file format and contents above, the same friendship can appear twice with the user name and friend name in swapped positions, so the data has to be de-duplicated.
For example:
joe, jon
jon, joe
Of two records like these, only one should be kept.
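The heart of the dedup step is turning both orderings of a pair into one canonical key before the shuffle, so identical pairs meet in the same reduce group. A minimal standalone sketch of that idea (plain Java, no Hadoop involved):

public class DedupKeySketch {
    public static void main(String[] args) {
        // Both "joe, jon" and "jon, joe" end up as the same key
        for (String line : new String[] { "joe, jon", "jon, joe" }) {
            String[] strs = line.split(",");
            String name1 = strs[0].replaceAll(" ", "");
            String name2 = strs[1].replaceAll(" ", "");
            String key = name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1;
            System.out.println(key); // prints "jon,joe" twice
        }
    }
}

The full job below uses exactly this ordering as the map output key and lets the reducer emit each key once.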
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendMapReduceData extends Configured implements Tool {

    static class FriendMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        Text key = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            if (strs.length == 2) {
                // Order the two names on each line and use the result as the output key,
                // so "joe,jon" and "jon,joe" produce the same key
                String name1 = strs[0].replaceAll(" ", "");
                String name2 = strs[1].replaceAll(" ", "");
                this.key = new Text(name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1);
                context.write(this.key, NullWritable.get());
            }
        }
    }

    static class FriendReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Duplicate pairs share one key, so writing each key once removes them
            context.write(key, NullWritable.get());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "FriendMapReduceData");
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FriendReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendMapReduceData(), args));
    }
}
Upload the data file to HDFS:
bin/hadoop fs -put /home/softwares/test/friend input/friend
Run the job:
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.FriendMapReduceData -Dinput=input/friend -Doutput=output/friend
Result:
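The output can be viewed with hadoop fs -cat; with a single reducer the result lands in one part file (default part-file naming assumed):

bin/hadoop fs -cat output/friend/part-r-00000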
Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather
Count the number of occurrences of each word:
package com.jf.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordMapReduceData extends Configured implements Tool {

    static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text key = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Split the line into words and emit (word, 1) for each word
            StringTokenizer words = new StringTokenizer(value.toString());
            while (words.hasMoreElements()) {
                this.key = new Text(words.nextToken());
                context.write(this.key, new IntWritable(1));
            }
        }
    }

    static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Sum all counts received for this word
            int count = 0;
            for (IntWritable intWritable : values) {
                count += intWritable.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        // Build the job input and output
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "WordMapReduceData");
        job.setJarByClass(this.getClass());

        // Mapper
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer
        job.setReducerClass(WordReducer.class);

        // Output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output directories
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordMapReduceData(), args));
    }
}
Upload the file to HDFS:
bin/hadoop fs -put /home/softwares/test/words input/words
Run the job:
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.WordMapReduceData -Dinput=input/words -Doutput=output/words
Result:
chinese.txt
a|李一|88
a|王二|26
a|张三|99
a|刘四|58
a|陈五|45
a|杨六|66
a|赵七|78
a|黄八|100
a|周九|62
a|吴十|12
math.txt
c|李一|83
c|王二|36
c|张三|92
c|刘四|58
c|陈五|75
c|杨六|66
c|赵七|63
c|黄八|60
c|周九|62
c|吴十|72
english.txt
b|李一|36
b|王二|66
b|张三|86
b|刘四|56
b|陈五|43
b|杨六|86
b|赵七|99
b|黄八|80
b|周九|70
b|吴十|33
Given the three data files above (one per subject), produce one line per student showing the score in each subject together with the total and the average.
Parser class
package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class ScoreRecordParser {

    private String id;
    private String name;
    private String score;

    /**
     * Parse one record of the form "id|name|score".
     * @param line the input line
     * @return true if the line is a valid record
     */
    public boolean parse(String line) {
        String[] strs = line.split("\\|");
        if (strs.length != 3) {
            return false;
        }
        id = strs[0].trim();
        name = strs[1].trim();
        score = strs[2].trim();
        if (id.length() > 0 && name.length() > 0 && score.length() > 0) {
            return true;
        } else {
            return false;
        }
    }

    public boolean parse(Text value) {
        return parse(value.toString());
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getScore() {
        return score;
    }

    public void setScore(String score) {
        this.score = score;
    }
}
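As a quick illustration (not part of the job itself), parsing one record from chinese.txt with this class behaves as follows:

package com.jf.mapreduce;

public class ScoreRecordParserDemo {
    public static void main(String[] args) {
        ScoreRecordParser parser = new ScoreRecordParser();
        if (parser.parse("a|李一|88")) {           // one line from chinese.txt
            System.out.println(parser.getId());    // "a"  (subject id)
            System.out.println(parser.getName());  // "李一"
            System.out.println(parser.getScore()); // "88"
        }
    }
}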
MapReduce class
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreMapReduceData extends Configured implements Tool {

    static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {

        private ScoreRecordParser parser = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            parser = new ScoreRecordParser();
            System.out.println("map解析");
            // Group records by student name
            if (parser.parse(value)) {
                System.out.println("map:key=" + parser.getName() + ",value=" + value.toString());
                context.write(new Text(parser.getName()), value);
            }
        }
    }

    static class ScoreReducer extends Reducer<Text, Text, Text, Text> {

        private ScoreRecordParser parser = null;

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            parser = new ScoreRecordParser();
            String id = null;
            int score = 0;
            double sum = 0;
            double avg = 0;
            for (Text value : values) {
                if (parser.parse(value)) {
                    id = parser.getId();
                    score = Integer.parseInt(parser.getScore());
                    // Subject id: a = Chinese (语文), b = English (英文), c = Math (数学)
                    if (id.equals("a")) {
                        buffer.append("语文:" + score + "\t");
                        sum += score;
                    } else if (id.equals("b")) {
                        buffer.append("英文:" + score + "\t");
                        sum += score;
                    } else if (id.equals("c")) {
                        buffer.append("数学:" + score + "\t");
                        sum += score;
                    }
                }
            }
            avg = sum / 3;
            buffer.append("总分:" + sum + "\t平均分:" + avg);
            System.out.println("reduce:key=" + key.toString() + ",value=" + buffer.toString());
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(ScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ScoreMapReduceData(), args));
    }
}
Because the job reads three files, upload all three into a single HDFS directory and point -Dinput at that directory when running the job.
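Assuming the three files were saved under the same local test directory used earlier (the local paths below are illustrative), the upload could look like this:

bin/hadoop fs -mkdir -p input/score
bin/hadoop fs -put /home/softwares/test/chinese.txt /home/softwares/test/math.txt /home/softwares/test/english.txt input/score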
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.ScoreMapReduceData -Dinput=input/score -Doutput=output/score
Because the three files are stored as three separate blocks (and therefore three input splits), three map tasks are launched to process them.
Execution result:
file1
Welcome to MapReduce World
file2
MapReduce is simple
file3
MapReduce is powerful is simple
file4
hello MapReduce Bye MapReduce
Given the four data files above, the goal is output of the form:
word    file_1:count, file_2:count, file_3:count, file_4:count
That is, count how many times each word appears in each file. Since the counting is per word, the map output key can be the word itself. Because the count is broken down per file, the map output value should be the file the word came from, e.g. (is : file2), (is : file3), (is : file3).
After the shuffle, the reducer for "is" then receives is: [file2, file3, file3], which makes it straightforward to count each word's occurrences per file.
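The per-file counting the reducer performs over such a value list can be sketched in isolation (plain Java, with the file names hard-coded purely for illustration):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IndexCountSketch {
    public static void main(String[] args) {
        // The reducer for the word "is" receives the file names [file2, file3, file3]
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String file : Arrays.asList("file2", "file3", "file3")) {
            Integer old = counts.get(file);
            counts.put(file, old == null ? 1 : old + 1);
        }
        // {file2=1, file3=2}; the job below formats this as "file2:1,file3:2"
        // (the order of entries depends on the HashMap)
        System.out.println(counts);
    }
}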
package com.jf.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndexMapReduce extends Configured implements Tool {

    /**
     * Hadoop instantiates the Mapper and Reducer classes via reflection,
     * so these inner classes must be static to be instantiable.
     */
    static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text fileName = null;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Remember which file this input split comes from
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            this.fileName = new Text(fileName);
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit (word, fileName) for every word on the line
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
            String tmp = null;
            while (tokenizer.hasMoreTokens()) {
                tmp = tokenizer.nextToken().trim();
                if (tmp.length() > 0) {
                    context.write(new Text(tmp), this.fileName);
                }
            }
        }
    }

    static class IndexReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Count how many times this word occurred in each file
            String tmp = null;
            Map<String, Integer> map = new HashMap<String, Integer>();
            for (Text text : values) {
                tmp = text.toString();
                if (map.get(tmp) != null) {
                    map.put(tmp, map.get(tmp) + 1);
                } else {
                    map.put(tmp, 1);
                }
            }
            // Format the counts as "file1:count,file2:count,..."
            StringBuffer buffer = new StringBuffer();
            for (String mk : map.keySet()) {
                if (buffer.length() > 0) {
                    buffer.append("," + mk + ":" + map.get(mk));
                } else {
                    buffer.append(mk + ":" + map.get(mk));
                }
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "InvertIndexMapReduce");
        job.setJarByClass(this.getClass());

        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(IndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new InvertIndexMapReduce(), args));
    }
}
Upload the files:
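A possible upload command, assuming the four files sit in the same local test directory as before (the local paths are illustrative):

bin/hadoop fs -put /home/softwares/test/file1 /home/softwares/test/file2 /home/softwares/test/file3 /home/softwares/test/file4 input/index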
Run the job:
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.InvertIndexMapReduce -Dinput=input/index -Doutput=output/index
Execution output:
File System Counters
    FILE: Number of bytes read=249
    FILE: Number of bytes written=698080
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=549
    HDFS: Number of bytes written=168
    HDFS: Number of read operations=18
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=4
Job Counters
    Launched map tasks=4
    Launched reduce tasks=2
    Data-local map tasks=4
    Total time spent by all maps in occupied slots (ms)=131026
    Total time spent by all reduces in occupied slots (ms)=31874
    Total time spent by all map tasks (ms)=131026
    Total time spent by all reduce tasks (ms)=31874
    Total vcore-seconds taken by all map tasks=131026
    Total vcore-seconds taken by all reduce tasks=31874
    Total megabyte-seconds taken by all map tasks=134170624
    Total megabyte-seconds taken by all reduce tasks=32638976
Map-Reduce Framework
    Map input records=4
    Map output records=16
    Map output bytes=205
    Map output materialized bytes=285
    Input split bytes=444
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=285
    Reduce input records=16
    Reduce output records=9
    Spilled Records=32
    Shuffled Maps =8
    Failed Shuffles=0
    Merged Map outputs=8
    GC time elapsed (ms)=2090
    CPU time spent (ms)=7370
    Physical memory (bytes) snapshot=1040441344
    Virtual memory (bytes) snapshot=5057056768
    Total committed heap usage (bytes)=597049344
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=105
File Output Format Counters
    Bytes Written=168
Result file:
friendList.txt
joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
From each friendship pair, build every person's friend list.
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit (person, friend) for each friendship pair
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            if (names.length == 2) {
                context.write(new Text(names[0]), new Text(names[1]));
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Concatenate all of this person's friends into one list
            StringBuffer buffer = new StringBuffer();
            for (Text text : values) {
                buffer.append("," + text.toString());
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData(), args));
    }
}
friendList2.txt
bob ,joe,jim,ali
jon ,ali,joe
kia ,dee,jim,joe
ali ,jon,joe,bob,jim,dee
dee ,ali,kia
jim ,ali,bob,kia
joe ,ali,bob,kia,jon
Excluding the owner of each list, every two names in a friend list form one candidate friend pair.
For example, the list A,B,C,D (with A as the owner) yields the pairs B-C, B-D, and C-D, as sketched below.
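A standalone sketch of that pairing logic on the A,B,C,D example (the job below applies the same idea to names[1..] of every line, and additionally normalizes the key order):

public class FriendPairSketch {
    public static void main(String[] args) {
        // names[0] is the owner of the list; pairs are formed among the remaining names
        String[] names = "A,B,C,D".split(",");
        for (int i = 1; i < names.length - 1; i++) {
            for (int j = i + 1; j < names.length; j++) {
                System.out.println(names[i] + "-" + names[j]); // B-C, B-D, C-D
            }
        }
    }
}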
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData2 extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            // Note the nested loop: every two names in the friend list (names[1..])
            // get the chance to form one candidate friend pair
            for (int i = 1; i < names.length - 1; i++) {
                for (int j = 1; j < names.length - i; j++) {
                    // Compare the two names so each pair always produces the same key,
                    // avoiding both A-B and B-A showing up
                    if (names[i].compareTo(names[i + j]) >= 0) {
                        System.out.println(names[i] + ":" + names[i + j]);
                        context.write(new Text(names[i] + ":" + names[i + j]), new IntWritable(1));
                    } else {
                        context.write(new Text(names[i + j] + ":" + names[i]), new IntWritable(1));
                    }
                }
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Sum how many friend lists produced this pair
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData2(), args));
    }
}