半链接(Semi-join)

    假设一个场景,须要链接两个很大的数据集,例如,用户日志和 OLTP 的用户数据。任何一个数据集都不是足够小到能够缓存在 map 做业的内存中。能够思考如下问题:若是在数据集的链接操做中,一个数据集中有的记录因为由于没法链接到另外一个数据集的记录,将会被移除。这样还须要将整个数据集放到内存中吗?java

    在这个例子中,在用户日志中的用户仅仅是 OLTP 用户数据中的用户中的很小的一部分。那么就能够从 OLTP 用户数据中只取出存在于用户日志中的那部分用户的用户数据。而后就能够获得足够小到能够放在内存中的数据集。这种的解决方案就叫作半链接。web

    应用场景:
    须要链接两个都很大的数据集,同时避免通过 shuffle 和 sort 阶段。解决方案:apache

    在这个技术中,将会用到三个 MapReduce 做业来链接两个数据集,以此来减小 reduce 端链接的消耗。这个技术在这种场景下很是有用:链接两个很大的数据集,可是能够经过过滤与另外一个数据集不匹配的记录来减小数据的大小,使得能够放入 task 的内存中。缓存

    下图说明了在半链接中将要执行的三个 MapReduce 做业(Job)。服务器


[例]使用半链接。app

    准备数据集:分布式

     有两个数据集 logs.txt 和 users.txt。其中 users.txt 中为用户数据,包括用户名、年龄和所在地区;logs.txt为基于用户的一些活动(可从应用程序或 web 服务器日志中抽取出来),包括用户名、活动、源 IP 地址。ide

    文件 users.txt:函数

        
    文件 logs.txt:    
oop


JOB 1:

    第一个 MapReduce job 的功能是从日志文件中提取出用户名,用这些用户名生成一个用户名惟一的集合(Set)。这经过在 map 函数执行用户名的投影操做来实现,并反过来使用 reducer 来产生这些用户名。为了减小在 map 阶段和 reduce 阶段之间传输的数据量,采用以下方法:在 map 任务中采用哈希集 HashSet来缓存全部的用户名,并在 cleanup 方法中输出该 HashSet 的值。下图说明了这个 job 的流程:


做业1的代码:

package com.edu360.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * java 一出 谁与争锋
 * <p>
 * .............................................
 * 佛祖保佑             永无BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 17:51
 * @Description:
 */
//从logs.txt 表中抽取用户名(考虑外键引用关系,这里至关于先在从表中找出被引用的外键列惟一值)
public class SemiJoinJob1 extends Configured implements Tool {
    //使用keyValueTextInputFormat 类,输入的是logs.txt 表中的每条记录
    public static class Map extends Mapper<Text,Text, Text, NullWritable>{
        //缓存用户名过滤后的小数据集
        private Set<String> keys = new HashSet<>();
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //把用户名加入缓存,重复的用户名只会保留一个
            keys.add(key.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Text outputkey = new Text();
            for(String key:keys){
                outputkey.set(key);
                //从mapper输出缓存的用户名
                context.write(outputkey,NullWritable.get());
            }
        }
    }
    public static class Reduce extends Reducer<Text,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //从reduce输出每一个用户名一次
            context.write(key,NullWritable.get());
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        Job job1= Job.getInstance(getConf(),"SemiJoinJob1");
        job1.setJarByClass(getClass());
        job1.setMapperClass(Map.class);
        job1.setReducerClass(Reduce.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        job1.setInputFormatClass(KeyValueTextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        //若是输出目录在 先删除
        /*
        FileSystem fs = FileSystem.get(outPath.toUri(),getConf());
        if(fs.exists(outPath)){
            fs.delete(outPath,true);
        }
         */
        FileInputFormat.setInputPaths(job1,inputPath);
        FileOutputFormat.setOutputPath(job1,outPath);
        if(job1.waitForCompletion(true)){
            return 0;
        }
        return 1;
    }
    public static void main(String [] args)throws Exception{
        int returnCode = ToolRunner.run(new SemiJoinJob1(),args);
        System.exit(returnCode);
    }
}

做业 1 的结果就是来自于日志文件中的全部用户的集合。集合中的用户名是惟一的。

Job2:

    第二步是一个复杂的过滤 MapReduce job,目标是从全体用户的用户数据集中移除不存在于日志文件中的用户。这是一个 map-only job,它使用一个复制链接来缓存出如今日志文件中的用户名,并把他们和用户数据集进行链接。因为 job 1 输出的惟一用户的数据集实际上要远远小于整个用户数据集,因此很天然地就把来自 job 1 的惟一用户集放到缓存中了。下图说明了这个做业的流程:


    这是一个复制链接,与上一节学习的复制链接同样。

Job 2 的 mapper 代码以下:(注意,要先上传 job1 的输出文件 part-r-00000 到分布式缓存)

package com.edu360.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;


/**
 * java 一出 谁与争锋
 * <p>
 * .............................................
 * 佛祖保佑             永无BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 23:20
 * @Description:
 */
public class SemiJoinJob2 extends Configured implements Tool {
    public static class Map extends Mapper<Object, Text,Text, NullWritable>{
        public static final String CATCH_USERNAME_FILENAME="part-000";
        private Set<String> userSet=new HashSet<>();
        private Text outputKey = new Text();
        //在map()函数执行以前,从分布式缓存中读取被缓存到本地

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] patternsURIs = context.getCacheFiles(); // 获取缓存文件的 uri
            Path patternsPath = new Path(patternsURIs[0].getPath()); // 这里咱们只缓存了一个文件
            String patternsFileName = patternsPath.getName(); // 得到缓存文件的文件名
            System.out.println("patternsFileName: " + patternsFileName);
            // 从分布式缓存中读取 job 1 的输出,并存入到 HashMap 中
            if (CATCH_USERNAME_FILENAME.equals(patternsFileName)) {
                BufferedReader br = new BufferedReader(new FileReader("/hadoop/semijoin/output1/"));
                 String line = br.readLine();
                while (line != null) {
                    String username = line; userSet.add(username); // 放入 HashSet 中 line = br.readLine();
                }
                br.close(); }
            if (userSet.isEmpty()) {
                throw new IOException("unable to load unique user table");
            }
        }
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String row = value.toString();
            String[] tokens = row.split("\t");
            String username = tokens[0]; // 取每条用户记录中的用户名字段
            if (userSet.contains(username)) { // 过滤
                outputKey.set(row);
                context.write(outputKey, NullWritable.get()); // 输出整行用户记录
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("用法: SemiJoinJob2 <userpath> <outpath> <catchpath>");
            System.exit(-1);
        }
        Path inputPath = new Path(args[0]); // 应该为 job 1 的输出:part-r-00000
        Path outputPath = new Path(args[1]);
        Path cachePath = new Path(args[2]);
        Job job2 = Job.getInstance(getConf(), "SemiJoinJob2");
        // 将 part-r-00000 文件放入分布式缓存中 // "/hadoop/semijoin/output1/part-r-00000" job2.addCacheFile(cachePath.toUri());
        job2.setJarByClass(getClass());
        job2.setMapperClass(Map.class);
        job2.setNumReduceTasks(0);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        // 若是输出目录存在,则先删除
        FileSystem fs = FileSystem.get(outputPath.toUri(), getConf());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath);
        FileOutputFormat.setOutputPath(job2, outputPath);
        if (job2.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new SemiJoinJob2(), args);
        System.exit(returnCode);
    }
}

做业 2 的输出就是已被用户日志数据集的用户名过滤过的用户集了。

Job 3:

在这最后一步中,咱们将合并从 job 2 输出的过滤后的用户和原始的用户日志。如今被过滤后的用户已经小到能够驻留在内存中了,这样就能够将它们放入分布式缓存中。下图演示了这个 job 的流程:

package com.edu360.mapreduce;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;


/**
 * java 一出 谁与争锋
 * <p>
 * .............................................
 * 佛祖保佑             永无BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 23:46
 * @Description:
 */
public class SemiJoinJob3 extends Configured implements Tool {
    public static class JoinMap extends Mapper<Object, Text, Text, Text> {
        public static final String CATCH_USERS_FILENAME = "part-m-00000";
        private Map<String, String> usersMap = new HashMap<>();
        private Text outputKey = new Text();
        private Text outputValue = new Text();
        // 在 map()函数执行以前,从分布式缓存中读取被缓存到本地的文件

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] patternsURIs = context.getCacheFiles(); // 获取缓存文件的 uri
            Path patternsPath = new Path(patternsURIs[0].getPath()); // 这里咱们只缓存了一个文件
            String patternsFileName = patternsPath.getName().toString(); // 得到缓存文件的文件名
            // 从分布式缓存中读取 job 2 的输出,并存入到 HashMap 中
            if (CATCH_USERS_FILENAME.equals(patternsFileName)) {
                patternsFileName = "/hadoop/semijoin/output2/" + patternsFileName;
                BufferedReader br = new BufferedReader(new FileReader(patternsFileName));
                String line = br.readLine();
                while (line != null) {
                    String[] tokens = line.split("\t");
                    String username = tokens[0];
                    String content = line;
                    usersMap.put(username, content); // 放入 HashMap 中
                    line = br.readLine();
                }
                br.close();
            }
            if (usersMap.isEmpty()) {
                throw new IOException("unable to load users catch table");
            }
        }
        // 输入的是 logs.txt 中的日志信息,须要和缓存中的用户信息链接 @Override
        protected void map(Object key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            String row = value.toString();
            String[] tokens = row.split("\t");
            String username = tokens[0]; // 取每条日志记录的用户名字段
            String user = usersMap.get(username); // 根据 username,找到对应的(缓存的)用户记录
            outputKey.set(row);
            outputValue.set(user);
            context.write(outputKey, outputValue);
        }
    }
        public int run(String[] args) throws Exception { if (args.length < 3) {
            System.err.println("用法: SemiJoinJob3 <logspath> <outpath> <catchpath>");
            System.exit(-1); }
            Path inputPath = new Path(args[0]); // 应该为 job 2 的输出:part-r-00000
            Path outputPath = new Path(args[1]);
            Path cachePath = new Path(args[2]);
            Job job3 = Job.getInstance(getConf(), "SemiJoinJob3");
            // 将 part-r-00000 文件放入分布式缓存中
            // "/hadoop/semijoin/output2/part-m-00000" job3.addCacheFile(cachePath.toUri());
            job3.setJarByClass(getClass());
            job3.setMapperClass(JoinMap.class);
            job3.setNumReduceTasks(0);
            job3.setMapOutputKeyClass(Text.class);
            job3.setMapOutputValueClass(Text.class);
            job3.setOutputKeyClass(Text.class);
            job3.setOutputValueClass(Text.class);
            job3.setInputFormatClass(TextInputFormat.class);
            job3.setOutputFormatClass(TextOutputFormat.class);
            // 若是输出目录存在,则先删除
            FileSystem fs = FileSystem.get(outputPath.toUri(), getConf());
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileInputFormat.setInputPaths(job3, inputPath);
            FileOutputFormat.setOutputPath(job3, outputPath);
            if (job3.waitForCompletion(true)) {
                return 0;
            }
            return 1;
    }
    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new SemiJoinJob3(), args);
        System.exit(returnCode);
    }
}

小结:

这一节学习了怎样使用一个半链接(semi-join)来合并两个数据集。半链接结构包含比其余链接更多的步骤,可是当处理大数据集时(其中有一个数据集必须可被消减到适合放入内存的大小),使用半链接是很给力的方式。

相关文章
相关标签/搜索