MapReduce中的Join算法

  在关系型数据库中Join是很是常见的操做,各类优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时须要从不一样的数据源中获取数据。不一样于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法。html

  咱们先简要地描述待解决的问题。假设有两个数据集:气象站数据库和天气记录数据库 java

  气象站的示例数据,以下数据库

Station IDapache

Station Name编程

011990-99999缓存

SIHCCAJAVRI网络

012650-99999app

TRNSET-HANSMOEN分布式

  天气记录的示例数据,以下ide

Station ID

Timestamp

Temperature

012650-99999

194903241200

111

012650-99999

194903241800

78

011990-99999

195005150700

0

011990-99999

195005151200

22

011990-99999

195005151800

-11

  假设咱们想要以下结果

Station ID

Station Name

Timestamp

Temperature

011990-99999

SIHCCAJAVRI

195005150700

0

011990-99999

SIHCCAJAVRI

195005151200

22

011990-99999

SIHCCAJAVRI

195005151800

-11

012650-99999

TYNSET-HANSMOEN

194903241200

111

012650-99999

TYNSET-HANSMOEN

194903241800

78

  想一想看,咱们该怎么经过MapReduce实现上面的需求?

 

   MapReduce链接操做的实现技术取决于数据集的规模及分区方式。若是一个数据集很大而另一个数据集很小,以致于小的数据集能够分发到集群中的每个节点之中,而后在mapper阶段读取大数据集中的数据;到reducer时,reduce获取本节点上的数据(也就是小数据集中的数据)并完成链接操做;咱们以上面的天气数据链接来作具体阐述,假设气象站数据集不多,那将气象站数据集分发到集群中的每一个节点中,在mapper阶段读取天气记录数据,在reduce阶段读取本节点上的气象站数据,而后经过气象站数据中的气象站ID和天气数据中的气象ID作链接,从而完成气象站数据和天气记录数据的链接。在这种状况下,咱们就用到了Hadoop的分布式缓存机制,它可以在任务运行过程当中及时地将文件和存档复制到任务节点以供使用。为了节约网络宽带,在每个做业中,各个文件一般只须要复制到一个节点一次

  若是两个数据集的规模均很大,以致于没有哪一个数据集能够被彻底复制到集群的每一个节点中,咱们仍然可使用 MapReduce来进行链接,至于到底采用map端链接(链接操做若是由mapper执行,则称为 “map 端链接”)仍是reduce端链接(链接操做若是由reducer执行,则称为“reduce端链接”),则取决于数据的组织方式。下面咱们分别介绍map端链接和reduce端链接。

    map 端链接

      在两个大规模输入数据集到达map函数以前就应该执行链接操做。为达到该目的,各map的输入数据必须先分区而且以特定方式排序。各个输入数据集被划分红相同数量的分区,而且均按相同的键(链接键)排序。同一键的全部记录均会放在同一分区之中。听起来彷佛要求很是严格,但这的确合乎MapReduce做业的输出。

     map端链接操做能够链接多个做业的输出,只要这些做业的reducer数量相同、键相同而且输出文件是不可切分的(例如,小于一个 HDFS 块)。在上面讲的天气例子中,若是气象站文件以气象站ID部分排序,天气记录也以气象站ID部分排序,并且reducer的数量相同,则就知足了执行map端链接的前提条件。

     利用 org.apache.hadoop.mapreduce.join 包中的CompositeInputFormat类来运行一个 map 端链接。CompositeInputFormat类的输入源和链接类型(内链接或外链接)能够经过一个链接表达式进行配置,链接表达式的语法简单。此种方法不经常使用,这里再也不赘述。

    reduce 端链接

      因为reduce端链接并不要求输入数据集符合特定结构,于是reduce端链接比 map 端链接更为经常使用。可是,因为两个数据集均需通过MapReduce的shuffle过程, 因此reduce 端链接的效率每每要低一些。基本思路是mapper为各个记录标记源,而且使用链接键做为 map 输出键,使键相同的记录放在同一reducer中。 咱们经过下面两种技术实现reduce端链接。

     一、多输入

       数据集的输入源每每有多种格式,所以可使用 MultipleInputs 类来方便地解析各个数据源。MultipleInputs的用法,在“MapReduce输入格式”已经介绍过,这里就再也不赘述。

     二、二次排序

       如前所述,reducer在两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行链接操做,先将某一个数据源传输到reducer会很是重要。还以上面的天气数据链接为例,当天气记录发送到reducer的时候,与这些记录有相同键的气象站信息最好也已经放在reducer,使得reducer可以将气象站名称填到天气记录之中就立刻输出。虽然也能够不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽可能避免这种状况,由于其中任何一组的记录数量可能很是庞大,远远超出reducer的可用内存容量。 所以咱们用到二次排序技术,对map阶段输出的每一个键的值进行排序,实现这一效果。

 

  下面咱们分别介绍两种实现方式分布式缓存机制、reduce端链接

  一、分布式缓存机制

    1、用法

      Hadoop 命令行选项中,有三个命令能够实现文件分发到任务的各个节点。

        1)可使用-files选项指定待分发的文件,文件内包含以逗号隔开的URL列表。文件能够存放在本地文件系统、HDFS、或其它Hadoop可读文件系统之中。若是还没有指定文件系统,则这些文件被默认是本地的。即便默认文件系统并不是本地文件系统,这也是成立的。

        2)可使用-archives选项向本身的任务中复制存档文件,好比JAR文件、ZIP 文件、tar文件和gzipped tar文件,这些文件会被解档到任务节点。

        3)可使用-libjars选项将JAR文件添加到mapper和reducer任务的类路径中。若是做业JAR文件中并不是包含不少库JAR文件,使用-libjars选项是很方便的。

    2、工做机制

      当启动一个做业,Hadoop会把由-files、-archives、和-libjars等选项所指定的文件复制到分布式文件系统之中。接着,在任务运行以前,tasktracker将文件从分布式文件系统复制到本地磁盘(缓存)使任务可以访问文件。此时,这些文件就被视为“本地化” 了。从任务的角度来看, 这些文件就已经在那儿了,它并不关心这些文件是否来自 HDFS 。此外,有-libjars指定的文件会在任务启动前添加到任务的类路径(classpath)中。

   三、分布式缓存API

     因为能够经过Hadoop命令行间接使用分布式缓存,因此大多数应用不须要使用分布式缓存API。然而,一些应用程序须要用到分布式缓存的更高级的特性,这就须要直接使用API了。 API包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。

      1)首先掌握数据放到缓存中的方法,如下列举 Job 中可将数据放入到缓存中的相关方法:

public void addCacheFile(URI uri); 
public void addCacheArchive(URI uri);// 以上两组方法将文件或存档添加到分布式缓存 
public void setCacheFiles(URI[] files); 
public void setCacheArchives(URI[] archives);// 以上两组方法将一次性向分布式缓存中添加一组文件或存档 
public void addFileToClassPath(Path file); 
public void addArchiveToClassPath(Path archive);// 以上两组方法将文件或存档添加到 MapReduce 任务的类路径

           在缓存中能够存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档以后再将具体文件放置在任务节点上。
     2)其次掌握在map或者reduce任务中,使用API从缓存中读取数据。          

public Path[] getLocalCacheFiles() throws IOException; 
public Path[] getLocalCacheArchives() throws IOException; 
public Path[] getFileClassPaths(); 
public Path[] getArchiveClassPaths();

     咱们可使用 getLocalCacheFiles()和getLocalCacheArchives()方法获取缓存中的文件或者存档的引用。当处理存档时,将会返回一个包含解档文件的目录。相应的,用户能够经过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。

 

  下面咱们仍然之前面的气象站数据和天气记录数据为例,使用分布式缓存API,完成两个数据集的链接操做。完整的 MapReduce 程序以下所示。

package com.buaa.distributedgache;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa.distributedgache
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-25 19:34:57
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    
    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t", 2);
            if (arr.length == 2) {
                context.write(new Text(arr[0]), value);
            }
        }
    }
    
    public static class TemperatureReducer extends Reducer<Text, Text, Text, Text> {
        // 定义Hashtable存放缓存数据
        private Hashtable<String, String> table = new Hashtable<String, String>();
        
        /**
         * 获取分布式缓存文件
         */
        @SuppressWarnings("deprecation")
        protected void setup(Context context) throws IOException, InterruptedException {
            // 返回本地文件路径
            Path[] localPaths = (Path[]) context.getLocalCacheFiles();
            if (localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            
            // 获取本地 FileSystem实例
            FileSystem fs = FileSystem.getLocal(context.getConfiguration());
            // 打开输入流
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));
            // 建立BufferedReader读取器
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            // 按行读取并解析气象站数据
            String infoAddr = null;
            while ((infoAddr = br.readLine()) != null) {
                String[] records = infoAddr.split("\t");
                // key为stationID,value为stationName
                table.put(records[0], records[1]);
            }
        }

        public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException {
            // 天气记录根据stationId获取stationName
            String stationName = table.get(key.toString());
            for (Text value : values) {
                context.write(new Text(stationName), value);
            }
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        // 读取配置文件
        Configuration conf = new Configuration();
        
        // 判断路径是否存在,若是存在,则删除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        
        // 获取一个job实例
        Job job = Job.getInstance(conf,"join");
        // 主类
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 设置record.txt文件做为输入
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 添加station.txt到分布式缓存
        job.addCacheFile(new URI(args[1]));
        // 输出目录
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // mapper
        job.setMapperClass(TemperatureMapper.class);
        // reduce
        job.setReducerClass(TemperatureReducer.class);
        
        // 输出key类型
        job.setOutputKeyClass(Text.class);
        // 输出value类型
        job.setOutputValueClass(Text.class);
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
            };
        int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), args0);
        System.exit(ec);
    }
}

  添加分布式缓存文件相对简单,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加缓存文件便可。须要注意的是,在获取获取缓存文件时,文件将以“本地的”Path 对象的形式返回。为了读取文件,用户须要首先使用getLocal()方法得到一个Hadoop本地FileSystem实例。本程序中,咱们在Reduce的setup()方法中获取缓存文件。

  如下是输出结果,达到咱们预期的效果。

  clip_image002

  二、Reduce端链接

  咱们使用 TextPair 类构建组合键,包括气象站ID 和“标记”。在这里,“标记” 是一个虚拟的字段,其惟一目的是对记录排序,使气象站记录比天气记录先到达。一种简单的作法就是:对于气象站记录,设置“标记”的值设为 0;对于天气记录,设置“标记”的值设为1,代码以下所示

package com.buaa.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName TextPair
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-24 22:54:05
*/
public class TextPair implements WritableComparable<TextPair>{
    // Text类型的实例变量first
    private Text first;
    // Text类型的实例变量second
    private Text second;
    
    public TextPair(){
        set(new Text(),new Text());
    }
    
    public TextPair(String first,String second){
        set(new Text(first),new Text(second));
    }
    
    public void set(Text first,Text second){
        this.first = first;
        this.second = second;
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }
    
    public boolean equals(TextPair tp) {
        return first.equals(tp.first) && second.equals(tp.second);
    }
    
    public String toStirng() {
        return first + "\t" + second;
    }
    
    @Override
    public int compareTo(TextPair o) {
        if(!first.equals(o.first)){
            return first.compareTo(o.first);
        }else if(!second.equals(o.second)){
            return second.compareTo(o.second);
        }
        
        return 0;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }
}

  JoinStationMapper处理来自气象站数据,代码以下所示。

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinStationMapper
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-24 22:55:42
*/
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // 解析气象站数据
        String[] arr = line.split("\\s+");
        
        if (arr.length == 2) {// 知足这种数据格式
            // key=气象站id value=气象站名称
            context.write(new TextPair(arr[0], "0"), new Text(arr[1]));
        }
    }
}

  JoinRecordMapper处理来自天气记录数据,代码以下所示

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordMapper
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-24 22:56:55
*/
public class JoinRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{ 
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String line = value.toString();
        // 解析天气记录数据
        String[] arr = line.split("\\s+",2);
        
        if(arr.length == 2){
            //key=气象站id  value=天气记录数据
            context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
        }  
    }
}

  因为 TextPair 通过了二次排序,因此 reducer 会先接收到气象站数据。所以从中抽取气象站名称,并将其做为后续每条输出记录的一部分写到输出文件。JoinReducer 的代码以下所示。

package com.buaa.secondarysort;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinReducer
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-24 22:54:24
*/
public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{
    protected void reduce(TextPair key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
        Iterator<Text> iter = values.iterator();
        // 气象站名称
        Text stationName = new Text(iter.next());
        
        while(iter.hasNext()){
            // 天气记录的每条数据
            Text record = iter.next();
            
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            
            context.write(key.getFirst(),outValue);
        }
    }        
}

  下面咱们定义做业的驱动类 JoinRecordWithStationName,在该类中,关键在于根据组合键的第一个字段(即气象站 ID)进行分区和分组,JoinRecordWithStationName 类的代码以下所示。

package com.buaa.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-24 22:57:24
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(TextPair.class, true);
        }
        
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            TextPair tp1 = (TextPair) wc1;
            TextPair tp2 = (TextPair) wc2;
            
            return tp1.getFirst().compareTo(tp2.getFirst());
        }
    }

    public int run(String[] args) throws Exception {
        // 读取配置文件
        Configuration conf = new Configuration();
        
        // 判断路径是否存在,若是存在,则删除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 新建一个任务
        Job job = Job.getInstance(conf, "join");
        // 主类
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 天气记录数据源
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, JoinRecordMapper.class);
        // 气象站数据源
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, JoinStationMapper.class);
        // 输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // 自定义分区
        job.setPartitionerClass(KeyPartitioner.class);
        // 自定义分组
        job.setGroupingComparatorClass(GroupComparator.class);
        
        // 指定Reducer
        job.setReducerClass(JoinReducer.class);
        
        // map key输出类型
        job.setMapOutputKeyClass(TextPair.class);
        // reduce key输出类型
        job.setOutputKeyClass(Text.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
        };
        int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args0);
        System.exit(exitCode);
    }
}

  如下是输出结果,也达到咱们预期的效果。

  clip_image004

 

若是,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
若是,您但愿更容易地发现个人新博客,不妨点击一下左下角的【关注我】。
若是,您对个人博客所讲述的内容有兴趣,请继续关注个人后续博客,我是【刘超★ljc】。

本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然保留追究法律责任的权利。

相关文章
相关标签/搜索