实验环境 win7 hadoop2.7.3本地模式java
实验数据:订单数据orders.txt,商品数据pdts.txtapache
order.txt缓存
1001 pd001 300 1002 pd002 20 1003 pd003 40 1004 pd002 50
pdts.txtapp
pd001 apple
pd002 xiaomi
pd003 cuizi
实验解决的问题:解决mapreduce链接过程当中的数据倾斜的问题,典型应用场景以下:在电商平台中,买小米手机和买苹果手机的订单数量不少,买锤子手机的订单数量不多,如ide
果根据传统的mapreduce方法,3个reduce的数据将不均衡。好比接受小米的reduce接收到的数据会不少,接受锤子数据的reduce接收到的数据就会不多函数
实验解决的思路:采用map端链接,直接将排序过程在map中执行,将商品信息加载在map信息中,引入mapreduce的输入缓存机制oop
代码如图所示:ui
package com.tianjie.mapsidejoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapSideJoin { static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{ //map 商品的订单信息k v key为商品编号,v为商品名称 Map<String,String> pdInfoMap = new HashMap<String, String>(); Text ktext = new Text(); /*setup 函数用来加载文件到hadoop缓存中 * */ protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //打开输入文本文件的路径,得到一个输入流 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt"))); String line; while(StringUtils.isNotEmpty(line = br.readLine())){ //得到商品信息表 k为商品编号,value为商品名称 String[] split = line.split("\t"); pdInfoMap.put(split[0], split[1]); } } /* * hadoop 的缓冲机制*/ /* * map 函数的输入key value ,其中默认输入为TextInputFormat, * key 为输入文本的偏移量,value为输入文本的值 * Text,NullWriable为map文件输入的值 * */ protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //得到文本文件的一行 String orderline = value.toString(); //将文本文件按照制表符切分 String[] fields = orderline.split("\t"); //更具商品编号,得到商品名称 String pdName = pdInfoMap.get(fields[1]); //得到商品的名字,将商品名称追加在文本文件中 ktext.set(orderline+"\t"+pdName); //将新的文本文件写出 context.write(ktext, NullWritable.get()); } } public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { //获得hadoop的一个配置参数 Configuration conf = new Configuration(); //获取一个job实例 Job job = Job.getInstance(conf); //加载job的运行类 job.setJarByClass(MapSideJoin.class); //加载mapper的类 job.setMapperClass(MapSideJoinMappe.class); //设置mapper类的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置文件输入的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); //设置文件的输出路径 FileSystem fs = FileSystem.get(conf); Path path = new Path(args[1]); if(fs.isDirectory(path)){ fs.delete(path, true); } FileOutputFormat.setOutputPath(job, new Path(args[1])); //指定须要缓冲一个文件到全部maptask运行节点工做目录 //job.addArchiveToClassPath(""); 缓存jar包到task运行节点的classpath中 //job.addFileToClassPath(file); 缓存普通文件到task运行节点的classpath中 //job.addCacheArchive(uri); 缓存压缩包文件到task运行节点的工做目录中 //1:缓存普通文件到task运行节点的工做目录 job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); //2:指定map端的加入逻辑不须要reduce阶段,设置reducetask数量为0 job.setNumReduceTasks(0); //提交job任务,等待job任务的结束 boolean res =job.waitForCompletion(true); System.exit(res?1:0); } }
须要注意的点有:spa
1:采用map端链接时,能够不适用reduce,这个时候能够设置reducetask 的数量为0:.net
2:程序运行的结果: