joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
There is a friends.txt file in which each line has the format:
username, friend name
1) Requirement
Count how many pairs of friends there are.
2) Analysis
Looking at the file format and contents above, many records appear twice with the username and friend name swapped, so these duplicates have to be removed.
For example:
joe, jon
jon, joe
Of data like this we can keep only one record. The trick (used in the mapper below) is to put the two names into a fixed order before using them as the key, so both lines map to the same key and collapse into one group in the reducer.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class DuplicateData_0010 extends Configured implements Tool {

    static class DuplicateDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Text key = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length == 2) {
                String name_1 = split[0].trim();
                String name_2 = split[1].trim();
                if (!name_1.equals(name_2)) {
                    // Put the two names into a fixed order so that "joe,jon" and "jon,joe"
                    // produce the same key and collapse into one group in the reducer.
                    this.key.set(name_1.compareTo(name_2) > 0 ?
                            name_1 + "," + name_2 :
                            name_2 + "," + name_1);
                    context.write(this.key, NullWritable.get());
                }
            }
        }
    }

    static class DuplicateDataReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each distinct pair is written exactly once.
            context.write(key, NullWritable.get());
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName() + "Lance");
        job.setJarByClass(this.getClass());

        job.setMapperClass(DuplicateDataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DuplicateDataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new DuplicateData_0010(), args));
    }
}
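A note on running it: run() reads the input and output paths from the configuration rather than from the argument array, so they are passed as generic -D options, which ToolRunner parses before run() is called. A possible invocation (the jar name and HDFS paths are illustrative):

yarn jar dedup.jar DuplicateData_0010 -Dinput=/data/friends.txt -Doutput=/data/dedup_out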
Suppose we have 4 pieces of raw text data:
Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather
1) Requirement
Count how many times each word appears in these texts.
2) Analysis
First: in the map phase, split each line into words and emit a (word, 1) pair for every word.
Second: in the reduce phase, the pairs are grouped by word, so summing the 1s for each word gives its total count.
Expected result: each word together with its total count, one word per output line.
1) Write a WordCountMapper class extending Mapper
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Our own Mapper, written by extending org.apache.hadoop.mapreduce.Mapper.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1); // the constant count 1
    private Text word = new Text();                            // the current word

    /**
     * key:     byte offset of the current line
     * value:   the current line of text
     * context: the context in which map runs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer words = new StringTokenizer(value.toString());
        while (words.hasMoreTokens()) {
            word.set(words.nextToken());
            context.write(word, one); // emit (word, 1) for every token in the line
        }
    }
}
2) Write a WordCountReducer class extending Reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Our own Reducer, written by extending org.apache.hadoop.mapreduce.Reducer.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * key:     the word being counted
     * values:  all the 1s emitted for that word
     * context: the context in which reduce runs
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable one : values) {
            count += one.get(); // sum up the 1s
        }
        context.write(key, new IntWritable(count));
    }
}
3) Write a driver program, WordCountDriver, that configures and submits the WordCount job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver program that configures and submits the WordCount job.
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // Build a new job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCountDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
1) Preparation
Package the program into a jar: wordcount.jar
Prepare the Text 1-4 files.
2) Run
yarn jar wordcount.jar com.briup.WordCount input output
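If the job completes successfully (assuming the default single reducer, which writes one tab-separated part file), the result can be inspected with hdfs dfs -cat; for the four sample texts above the counts work out to:

hdfs dfs -cat output/part-r-00000
good	5
has	1
is	3
the	1
today	2
weather	3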
chinese.txt
a|李一|88
a|王二|26
a|张三|99
a|刘四|58
a|陈五|45
a|杨六|66
a|赵七|78
a|黄八|100
a|周九|62
a|吴十|12
english.txt
b|李一|36
b|王二|66
b|张三|86
b|刘四|56
b|陈五|43
b|杨六|86
b|赵七|99
b|黄八|80
b|周九|70
b|吴十|33
math.txt
c|李一|83
c|王二|36
c|张三|92
c|刘四|58
c|陈五|75
c|杨六|66
c|赵七|63
c|黄八|60
c|周九|62
c|吴十|72
Looking at chinese.txt, the data format is:
a stands for the subject (Chinese), 李一 is the student's name, and 88 is the Chinese score.
Based on the data above, we want to summarize the scores per student, one line per student: the name, each subject's score, the total and the average, e.g. 李一 语文:88 英语:36 数学:83 总分:207.0 平均分:69.0 (the subject order may vary).
1) Write a parser class that parses each subject's records above
ScoreRecordParser
import org.apache.hadoop.io.Text;

public class ScoreRecordParser {
    private String id;
    private String name;
    private String score;

    public boolean parse(String line) {
        String[] strs = line.split("[|]");
        if (strs.length < 3) {
            return false;
        }
        id = strs[0].trim();
        name = strs[1].trim();
        score = strs[2].trim();
        return id.length() > 0 && name.length() > 0 && score.length() > 0;
    }

    public boolean parse(Text line) {
        return parse(line.toString());
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getScore() { return score; }
    public void setScore(String score) { this.score = score; }
}
2) Implement the requirement
CalculateScore_0010
import com.briup.bd1702.hadoop.mapred.utils.ScoreRecordParser;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateScore_0010 extends Configured implements Tool {

    private static ScoreRecordParser parser = new ScoreRecordParser();

    static class CalculateScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text key = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (parser.parse(value)) {
                // Key each record by the student name so all of a student's
                // subject records meet in the same reduce call.
                this.key.set(parser.getName());
                context.write(this.key, value);
            }
        }
    }

    static class CalculateScoreReducer extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            double sum = 0;
            for (Text text : values) {
                if (parser.parse(text)) {
                    String id = parser.getId();
                    String score = parser.getScore();
                    switch (id) {
                        case "a": {
                            buffer.append("语文:" + score + "\t");
                            break;
                        }
                        case "b": {
                            buffer.append("英语:" + score + "\t");
                            break;
                        }
                        case "c": {
                            buffer.append("数学:" + score + "\t");
                            break;
                        }
                    }
                    sum += Double.parseDouble(score);
                }
            }
            buffer.append("总分:" + sum + "\t平均分:" + (sum / 3));
            this.value.set(buffer.toString());
            context.write(key, this.value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CalculateScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(CalculateScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CalculateScore_0010(), args));
    }
}
Because there are three input files here, we store them in a single directory and simply point -Dinput at that directory.
The three files above are all very small, so they are stored in three separate blocks and processed by three map tasks, one per file.
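A possible invocation (the jar name and HDFS paths are illustrative, and the class name is given without its package):

hdfs dfs -mkdir /scores
hdfs dfs -put chinese.txt english.txt math.txt /scores
yarn jar scores.jar CalculateScore_0010 -Dinput=/scores -Doutput=/scores_out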
First, what is an inverted index?
Suppose we have four documents, file_1 through file_4, and for each word we need to work out how many times it appears in each document.
For example, the output format looks like this:
word  file_1:count, file_2:count, file_3:count, file_4:count
file_1
Welcome to MapReduce World
file_2
MapReduce is simple
file_3
MapReduce is powerful is simple
file_4
hello MapReduce Bye MapReduce
1) Requirement
Produce output in the following format:
word  file_1:count, file_2:count, file_3:count, file_4:count
2) Analysis
Take the word MapReduce as an example:
The output on the map side looks like: (MapReduce, f1), (MapReduce, f2), (MapReduce, f3), (MapReduce, f4), (MapReduce, f4).
Note: f1, f2, f3 and f4 stand for the file names.
After the shuffle, the data entering reduce looks like: (MapReduce, [f1, f2, f3, f4, f4]).
How do we handle this in reduce?
We build a Map collection that records how many times each file name occurs for the current word:
In the end this produces the output we want, e.g. MapReduce  file_1:1,file_2:1,file_3:1,file_4:2 (the exact separators and entry order follow from the reducer code below).
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndex_0010 extends Configured implements Tool {

    static class InvertIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        private Text file = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Record the name of the file this map task is processing.
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            file.set(fileName);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer words = new StringTokenizer(value.toString(), " ");
            String item = null;
            while (words.hasMoreTokens()) {
                item = words.nextToken();
                if (item.trim().length() >= 1) {
                    word.set(item.trim());
                    context.write(word, file); // emit (word, file name)
                }
            }
        }
    }

    static class InvertIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Map<String, Integer> index = new HashMap<>();
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            index.clear();
            // Count how many times each file name appears for this word.
            for (Text file : values) {
                String fileName = file.toString();
                if (index.get(fileName) != null) {
                    index.put(fileName, index.get(fileName) + 1);
                } else {
                    index.put(fileName, 1);
                }
            }
            StringBuffer buffer = new StringBuffer();
            Set<Entry<String, Integer>> entries = index.entrySet();
            for (Entry<String, Integer> entry : entries) {
                buffer.append("," + entry.getKey() + ":" + entry.getValue());
            }
            this.value.set(buffer.toString());
            context.write(key, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(InvertIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new InvertIndex_0010(), args));
    }
}
Note:
A StringTokenizer is used here to split the data:
StringTokenizer words = new StringTokenizer(value.toString(), " ");
String item = null;
while (words.hasMoreTokens()) {
    item = words.nextToken();
    if (item.trim().length() >= 1) {
        word.set(item.trim());
        context.write(word, file);
    }
}
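As a standalone illustration (not part of the job) of how StringTokenizer behaves on one of the sample lines:

import java.util.StringTokenizer;

public class TokenizerDemo {
    public static void main(String[] args) {
        // Splits on the given delimiter and skips empty tokens,
        // so consecutive spaces never produce empty strings.
        StringTokenizer words = new StringTokenizer("MapReduce is powerful is simple", " ");
        while (words.hasMoreTokens()) {
            // prints: MapReduce, is, powerful, is, simple (one per line)
            System.out.println(words.nextToken());
        }
    }
}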
First, we need to know what a co-occurrence count is.
Let's work through some user data to explain:
joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
In the data above, each line has a user on the left and one of that user's friends on the right.
From this data we can list every user's friends:
ali,bob,jim,dee,jon,joe
bob,jim,ali,joe
dee,kia,ali
jim,ali,bob,kia
joe,ali,bob,kia,jon
jon,joe,ali
kia,dee,jim,joe
Next, for each user's friend list we form every possible pair of friends, and then count, across all users' friend lists, how many times each pair appears.
For example:
bob,jim form one pair; we then count how many times this pair appears together in the remaining friend lists (with the user's own name excluded from each list), i.e. the data below.
dee,jon,joe
jim,ali,joe
kia,ali
ali,bob,kia
ali,bob,kia,jon
joe,ali
dee,jim,joe
The next pair is jim,dee, then dee,jon, and so on...
From the analysis above we can work out the expected result:
ali,bob 2
ali,jim 1
ali,joe 2
ali,jon 1
ali,kia 3
bob,dee 1
bob,jim 1
bob,joe 1
bob,jon 2
bob,kia 2
dee,jim 2
dee,joe 2
dee,jon 1
jim,joe 3
jim,jon 1
joe,jon 1
jon,kia 1
We can do this in two steps, i.e. two MapReduce jobs: the first computes the friend lists; the second forms every pair within each list and counts how many times each pair occurs (a worked example of the second step follows).
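As a concrete illustration of the second step: when the second job's mapper (shown further below) reads ali's friend-list line, it skips the user name itself and emits one (pair, 1) record for every two of ali's friends, with each pair normalized into lexicographic order:

bob,dee 1
bob,jim 1
bob,joe 1
bob,jon 1
dee,jim 1
dee,joe 1
dee,jon 1
jim,joe 1
jim,jon 1
joe,jon 1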
1) Compute the friend lists
import com.briup.bd1702.hadoop.mapred.utils.FriendRecordParser;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendList_0010 extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text userName = new Text();
        private Text friendName = new Text();
        private FriendRecordParser parser = new FriendRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValid()) {
                userName.set(parser.getUserName());
                friendName.set(parser.getFriendName());
                context.write(userName, friendName); // emit (user, friend)
                System.out.println("----" + userName + "----" + friendName + "----");
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {
        private Text friendsNames = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all friends of this user into one comma-separated list.
            StringBuffer buffer = new StringBuffer();
            for (Text name : values) {
                buffer.append("," + name);
            }
            System.out.println("++++" + buffer.toString() + "++++");
            friendsNames.set(buffer.toString());
            context.write(key, friendsNames);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendList_0010(), args));
    }
}
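The FriendRecordParser class referenced above is not shown in the original post. Below is a minimal sketch that matches how it is used (parse a Text line, then call isValid/getUserName/getFriendName), assuming it simply splits each raw line on the comma and trims both names, in the same spirit as ScoreRecordParser:

import org.apache.hadoop.io.Text;

// Assumed implementation: split "user, friend" on the comma and trim both fields.
public class FriendRecordParser {
    private String userName;
    private String friendName;
    private boolean valid;

    public void parse(String line) {
        valid = false;
        String[] parts = line.split(",");
        if (parts.length == 2) {
            userName = parts[0].trim();
            friendName = parts[1].trim();
            valid = userName.length() > 0 && friendName.length() > 0;
        }
    }

    public void parse(Text line) {
        parse(line.toString());
    }

    public boolean isValid() {
        return valid;
    }

    public String getUserName() {
        return userName;
    }

    public String getFriendName() {
        return friendName;
    }
}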
2) Compute the co-occurrence counts
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Cooccurence_0010 extends Configured implements Tool {

    static class CooccurenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text key = new Text();
        private IntWritable value = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            // strs[0] holds the user name (plus the tab separator), so pairs are
            // built from index 1 onwards, i.e. only among the friends.
            for (int i = 1; i < strs.length - 1; i++) {
                for (int j = i + 1; j < strs.length; j++) {
                    // Put the two names into a fixed order so the same pair
                    // always produces the same key.
                    this.key.set(strs[i].compareTo(strs[j]) < 0 ?
                            strs[i] + "," + strs[j] :
                            strs[j] + "," + strs[i]);
                    context.write(this.key, this.value);
                }
            }
        }
    }

    static class CooccurenceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get(); // sum up how often this pair co-occurred
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CooccurenceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(CooccurenceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Cooccurence_0010(), args));
    }
}
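Since the second job consumes the output of the first, the two jobs are simply run back to back, with the first job's output directory used as the second job's input. A possible invocation (jar name and HDFS paths are illustrative):

yarn jar friends.jar FriendList_0010 -Dinput=/data/friends.txt -Doutput=/data/friend_lists
yarn jar friends.jar Cooccurence_0010 -Dinput=/data/friend_lists -Doutput=/data/cooccurrence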