Mapreduce实例——去重

 

"数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。 java

MaprReduce去重流程以下图所示: linux

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。在MapReduce流程中,map的输出<key,value>通过shuffle过程汇集成<key,value-list>后交给reduce。咱们天然而然会想到将同一个数据的全部记录都交给一台reduce机器,不管这个数据出现多少次,只要在最终结果中输出一次就能够了。具体就是reduce的输入应该以数据做为key,而对value-list则没有要求(能够设置为空)。当reduce接收到一个<key,value-list>时就直接将输入的key复制到输出的key中,并将value设置成空值,而后输出<key,value>。 apache

实验环境 网络

Linux Ubuntu 14.04 app

jdk-7u75-linux-x64 eclipse

hadoop-2.6.0-cdh5.4.5 函数

hadoop-2.6.0-eclipse-cdh5.4.5.jar oop

eclipse-java-juno-SR2-linux-gtk-x86_64 大数据

实验内容 网站

现有一个某电商网站的数据文件,名为buyer_favorite1,记录了用户收藏的商品以及收藏的日期,文件buyer_favorite1中包含(用户id,商品id,收藏日期)三个字段,数据内容以"\t"分割,因为数据很大,因此为了方便统计咱们只截取它的一部分数据,内容以下:

  1. 用户id   商品id    收藏日期  
  2. 10181   1000481   2010-04-04 16:54:31  
  3. 20001   1001597   2010-04-07 15:07:52  
  4. 20001   1001560   2010-04-07 15:08:27  
  5. 20042   1001368   2010-04-08 08:20:30  
  6. 20067   1002061   2010-04-08 16:45:33  
  7. 20056   1003289   2010-04-12 10:50:55  
  8. 20056   1003290   2010-04-12 11:57:35  
  9. 20056   1003292   2010-04-12 12:05:29  
  10. 20054   1002420   2010-04-14 15:24:12  
  11. 20055   1001679   2010-04-14 19:46:04  
  12. 20054   1010675   2010-04-14 15:23:53  
  13. 20054   1002429   2010-04-14 17:52:45  
  14. 20076   1002427   2010-04-14 19:35:39  
  15. 20054   1003326   2010-04-20 12:54:44  
  16. 20056   1002420   2010-04-15 11:24:49  
  17. 20064   1002422   2010-04-15 11:35:54  
  18. 20056   1003066   2010-04-15 11:43:01  
  19. 20056   1003055   2010-04-15 11:43:06  
  20. 20056   1010183   2010-04-15 11:45:24  
  21. 20056   1002422   2010-04-15 11:45:49  
  22. 20056   1003100   2010-04-15 11:45:54  
  23. 20056   1003094   2010-04-15 11:45:57  
  24. 20056   1003064   2010-04-15 11:46:04  
  25. 20056   1010178   2010-04-15 16:15:20  
  26. 20076   1003101   2010-04-15 16:37:27  
  27. 20076   1003103   2010-04-15 16:37:05  
  28. 20076   1003100   2010-04-15 16:37:18  
  29. 20076   1003066   2010-04-15 16:37:31  
  30. 20054   1003103   2010-04-15 16:40:14  
  31. 20054   1003100   2010-04-15 16:40:16  

要求用Java编写MapReduce程序,根据商品id进行去重,统计用户收藏商品中都有哪些商品被收藏。结果数据以下:

  1. 商品id  
  2. 1000481  
  3. 1001368  
  4. 1001560  
  5. 1001597  
  6. 1001679  
  7. 1002061  
  8. 1002420  
  9. 1002422  
  10. 1002427  
  11. 1002429  
  12. 1003055  
  13. 1003064  
  14. 1003066  
  15. 1003094  
  16. 1003100  
  17. 1003101  
  18. 1003103  
  19. 1003289  
  20. 1003290  
  21. 1003292  
  22. 1003326  
  23. 1010178  
  24. 1010183  
  25. 1010675  

实验步骤

1.切换到/apps/hadoop/sbin目录下,开启Hadoop。

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.在Linux本地新建/data/mapreduce2目录。

  1. mkdir -p /data/mapreduce2  

3.切换到/data/mapreduce1目录下,自行创建文本文件buyer_favorite1。

依然在/data/mapreduce1目录下,使用wget命令,从

网络下载hadoop2lib.tar.gz,下载项目用到的依赖包。

将hadoop2lib.tar.gz解压到当前目录下。

  1. tar -xzvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce2/in目录,而后将Linux本地/data/mapreduce2目录下的buyer_favorite1文件导入到HDFS的/mymapreduce2/in目录中。

view plain copy

  1. hadoop fs -mkdir -p /mymapreduce2/in  
  2. hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in  

5.新建Java Project项目,项目名为mapreduce2。

在mapreduce2项目下新建包,包名为mapreduce。

在mapreduce包下新建类,类名为Filter。

6.添加项目所需依赖的jar包

右键项目,新建一个文件夹,命名为:hadoop2lib,用于存放项目所需的jar包。

将/data/mapreduce2目录下,hadoop2lib目录中的jar包,拷贝到eclipse中mapreduce2项目的hadoop2lib目录下。

选中全部项目hadoop2lib目录下全部jar包,并添加到Build Path中。

7.编写程序代码,并描述其思路

数据去重的目的是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。咱们天然想到将相同key值的全部value记录交到一台reduce机器,让其不管这个数据出现多少次,最终结果只输出一次。具体就是reduce的输出应该以数据做为key,而对value-list没有要求,当reduce接收到一个时,就直接将key复制到输出的key中,将value设置为空。

Map代码

  1. public static class Map extends Mapper<Object , Text , Text , NullWritable>  
  2.     //map将输入中的value复制到输出数据的key上,并直接输出  
  3.     {  
  4.     private static Text newKey=new Text();      //从输入中获得的每行的数据的类型  
  5.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException  
  6.     //实现map函数  
  7.     {             //获取并输出每一次的处理过程  
  8.     String line=value.toString();  
  9.     System.out.println(line);  
  10.     String arr[]=line.split("\t");  
  11.     newKey.set(arr[1]);  
  12.     context.write(newKey, NullWritable.get());  
  13.     System.out.println(newKey);  
  14.     }  
  15.     }  

map阶段采用Hadoop的默认的做业输入方式,把输入的value用split()方法截取,截取出的商品id字段设置为key,设置value为空,而后直接输出<key,value>。

reduce端代码

  1. public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  2.         public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException  
  3.     //实现reduce函数  
  4.     {  
  5.     context.write(key,NullWritable.get());   //获取并输出每一次的处理过程  
  6.     }  
  7.     }  

map输出的<key,value>键值对通过shuffle过程,聚成<key,value-list>后,会交给reduce函数。reduce函数,无论每一个key 有多少个value,它直接将输入的赋值给输出的key,将输出的value设置为空,而后输出<key,value>就能够了。

完整代码

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Job;  
  9. import org.apache.hadoop.mapreduce.Mapper;  
  10. import org.apache.hadoop.mapreduce.Reducer;  
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  15. public class Filter{  
  16.     public static class Map extends Mapper<Object , Text , Text , NullWritable>{  
  17.     private static Text newKey=new Text();  
  18.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  19.     String line=value.toString();  
  20.     System.out.println(line);  
  21.     String arr[]=line.split("\t");  
  22.     newKey.set(arr[1]);  
  23.     context.write(newKey, NullWritable.get());  
  24.     System.out.println(newKey);  
  25.     }  
  26.     }  
  27.     public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  28.     public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{  
  29.         context.write(key,NullWritable.get());  
  30.         }  
  31.         }  
  32.         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  33.         Configuration conf=new Configuration();  
  34.         System.out.println("start");  
  35.         Job job =new Job(conf,"filter");  
  36.         job.setJarByClass(Filter.class);  
  37.         job.setMapperClass(Map.class);  
  38.         job.setReducerClass(Reduce.class);  
  39.         job.setOutputKeyClass(Text.class);  
  40.         job.setOutputValueClass(NullWritable.class);  
  41.         job.setInputFormatClass(TextInputFormat.class);  
  42.         job.setOutputFormatClass(TextOutputFormat.class);  
  43.         Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");  
  44.         Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");  
  45.         FileInputFormat.addInputPath(job,in);  
  46.         FileOutputFormat.setOutputPath(job,out);  
  47.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  48.         }  
  49.         }  

8.在Filter类文件中,右键并点击=>Run As=>Run on Hadoop选项,将MapReduce任务提交到Hadoop中。

9.待执行完毕后,进入命令模式下,在HDFS中/mymapreduce2/out查看实验结果。

  1. hadoop fs -ls /mymapreduce2/out  
  2. hadoop fs -cat /mymapreduce2/out/part-r-00000  

相关文章
相关标签/搜索