The most important part of designing a MapReduce job is choosing the right key; the rest of the data-processing pipeline follows from that choice. Between the map and reduce phases, all records with the same key are routed to the same reduce call, so the key should capture whatever common factor you want to group on. The examples below illustrate this.
Given a friend list for a number of people, find which pairs of people have common friends, and who those common friends are. The test data is as follows:
A B,C,D,E,F,O
B A,C,E,K
C F,A,D,I
D A,E,F,L
E B,C,D,M,L
F A,B,C,D,E,O,M
G A,C,D,E,F
H A,C,D,E,O
I A,O
J B,O
K A,C,D
L D,E,F
M E,F,G
O A,H,I,H
Problem analysis: what we need to find is common friends. If A has friend D and C also has friend D, the common factor here is the shared friend D, so the natural choice is to use that common friend as the key and all owners of that friend as the values. In the reduce phase, looping over the values and pairing them up two by two then solves the problem; a sketch of such a map/reduce pair follows.
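Here is a minimal sketch of that first stage. The commonfriend package and class names are hypothetical, not from this post's project, and the driver is omitted (it would mirror the one shown later). The mapper inverts each line of the friend list into (friend, owner) records; the reducer then sees every owner of one friend together and emits each owner pair with that shared friend. A second, trivial grouping job over this output would collect all common friends per pair.

package commonfriend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CommonFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text friendKey = new Text();
    private final Text ownerValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line: "A B,C,D,E,F,O" -- the owner, then comma-separated friends.
        String[] tmp = value.toString().split(" ");
        ownerValue.set(tmp[0]);
        for (String friend : tmp[1].split(",")) {
            friendKey.set(friend);               // the shared factor is the friend
            context.write(friendKey, ownerValue);
        }
    }
}

package commonfriend;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CommonFriendReducer extends Reducer<Text, Text, Text, Text> {
    private final Text pair = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // All owners of this friend arrive together; copy them out because
        // Hadoop reuses the Text instance across iterations.
        List<String> owners = new ArrayList<>();
        for (Text value : values) {
            owners.add(value.toString());
        }
        Collections.sort(owners);                // normalize pair order: "A-B", never "B-A"
        for (int i = 0; i < owners.size(); i++) {
            for (int j = i + 1; j < owners.size(); j++) {
                pair.set(owners.get(i) + "-" + owners.get(j));
                context.write(pair, key);        // one line per (pair, shared friend)
            }
        }
    }
}

On the test data above, this stage would emit lines such as "A-B	C" and "A-B	E", since C and E appear in both A's and B's friend lists.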
Problem analysis: if A's friend list contains B and B's friend list also contains A, the two follow each other (a "mutual follow"). The invariant common factor in this case is the relationship between the two: we can combine A and B into one normalized pair and use it as the key, e.g. "A-B", with the value being the number of occurrences of that pair. If a pair's count reaches 2, then A is a friend of B and B is a friend of A, i.e. they follow each other. For example, the line "A B,C,D,E,F,O" contributes the key A-B once and the line "B A,C,E,K" contributes it again, so A-B sums to 2 and is emitted. The code is given below.
package mutualfriend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MutualFriendMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    Text out = new Text();
    LongWritable times = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] tmp = line.split(" ");
        String owner = tmp[0];
        String[] friends = tmp[1].split(",");
        for (String friend : friends) {
            // Normalize the pair so that "A follows B" and "B follows A"
            // both produce the same key, e.g. "A-B".
            if (owner.compareTo(friend) <= 0) {
                out.set(owner + "-" + friend);
            } else {
                out.set(friend + "-" + owner);
            }
            context.write(out, times);
        }
    }
}

package mutualfriend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MutualFriendReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // A count of 2 means the pair was produced once from each side: a mutual follow.
        if (sum >= 2) {
            context.write(key, NullWritable.get());
        }
    }
}

package mutualfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MutualFriendDriver extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        conf.set("mapreduce.framework.name", "local");   // run the job locally

        Job job = Job.getInstance(conf, "MutualFriend");
        job.setJarByClass(getClass());
        job.setMapperClass(MutualFriendMapper.class);
        job.setReducerClass(MutualFriendReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("mutualfriend/input"));
        FileOutputFormat.setOutputPath(job, new Path("mutualfriend/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MutualFriendDriver(), args);
        System.out.println(exitCode);
    }
}
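Since the driver hard-codes the input and output paths and sets mapreduce.framework.name to local, it can be started directly from its main method with no arguments; note that the mutualfriend/output directory must not exist beforehand, or FileOutputFormat will fail the job. To run on a real cluster instead, the local setting would be dropped and the classes packaged into a jar for hadoop jar (those deployment details are not part of this post).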
Running the code produces the following output:
A-B
A-C
A-D
A-F
A-O
B-E
C-F
D-E
D-F
D-L
E-L
E-M
F-M
H-O
I-O