Map/Reduce中Join查询实现

http://www.cnblogs.com/MengYan-LongYou/p/3360613.htmlhtml

 

在作这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txtinfo.txt,字段之间以/t划分。app

data.txt内容以下:ide

201001    1003    abc函数

201002    1005    defoop

201003    1006    ghithis

201004    1003    jklspa

201005    1004    mno设计

201006    1005    pqrorm

 

info.txt内容以下:htm

 

1003    kaka

1004    da

1005    jue

1006    zhao

 

指望输出结果:

1003    201001    abc    kaka

1003    201004    jkl    kaka

1004    201005    mno    da

1005    201002    def    jue

1005    201006    pqr    jue

1006    201003    ghi    zhao

 

4、Map代码

首先是map的代码,我贴上,而后简要说说

 

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

            // 获取输入文件的全路径和名称

            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

 

            if (pathName.contains("data.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 3) {

                    // data数据格式不规范,字段小于3,抛弃数据

                    return;

                } else {

                    // 数据格式规范,区分标识为1

                    TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

                    context.write(tp, new Text(values[0] + "/t" + values[2]));

                }

            }

            if (pathName.contains("info.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 2) {

                    // data数据格式不规范,字段小于2,抛弃数据

                    return;

                } else {

                    // 数据格式规范,区分标识为0

                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

                    context.write(tp, new Text(values[1]));

                }

            }

        }

    }

 

这里须要注意如下部分:

ApathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/MengYan/join/data/info.txt),能够以endsWith()的方法来判断。

B、资料表,也就是这里的info.txt须要放在前面,也就是标识号是0.不然没法输出理想结果。

CMap执行完成以后,输出的中间结果以下:

1003,0    kaka

1004,0    da

1005,0    jue

1006,0    zhao

1003,1    201001    abc

1003,1    201004    jkl

1004,1    201005    mon

1005,1    201002    def

1005,1    201006    pqr

1006,1    201003    ghi

 

5、分区和分组

1map以后的输出会进行一些分区的操做,代码贴出来:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

        @Override

        public int getPartition(TextPair key, Text value, int numParititon) {

            return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

        }

    }

分区我在之前的文档中写过,这里不作描述了,就说是按照map输出的符合key的第一个字段作分区关键字。分区以后,相同key会划分到一个reduce中去处理(若是reduce设置是1,那么就是分区有多个,可是仍是在一个reduce中处理。可是结果会按照分区的原则排序)。分区后结果大体以下:

 

同一区:

1003,0    kaka

1003,1    201001    abc

1003,1    201004    jkl

 

 

同一区:

1004,0    da

1004,1    201005    mon

 

 

同一区:

1005,0    jue

1005,1    201002    def

1005,1    201006    pqr

 

 

同一区:

1006,0    zhao

1006,1    201003    ghi

 

2、分组操做,代码以下

 

public static class Example_Join_01_Comparator extends WritableComparator {

 

        public Example_Join_01_Comparator() {

            super(TextPair.class, true);

        }

 

        @SuppressWarnings("unchecked")

        public int compare(WritableComparable a, WritableComparable b) {

            TextPair t1 = (TextPair) a;

            TextPair t2 = (TextPair) b;

            return t1.getFirst().compareTo(t2.getFirst());

        }

    }

 

分组操做就是把在相同分区的数据按照指定的规则进行分组的操做,就以上来看,是按照复合key的第一个字段作分组原则,达到忽略复合key的第二个字段值的目的,从而让数据可以迭代在一个reduce中。输出后结果以下:

 

同一组:

1003,0    kaka

1003,0    201001    abc

1003,0    201004    jkl

 

同一组:

1004,0    da

1004,0    201005    mon

 

同一组:

1005,0    jue

1005,0    201002    def

1005,0    201006    pqr

 

同一组:

1006,0    zhao

1006,0    201003    ghi

 

6、reduce操做

贴上代码以下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

                InterruptedException {

            Text pid = key.getFirst();

            String desc = values.iterator().next().toString();

            while (values.iterator().hasNext()) {

                context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

            }

        }

    }

1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。

2、获取公用的字段,经过排组织后能够看到,一些共有字段是在第一位,取出来便可。

3、遍历余下的结果,输出。

7、其余的支撑代码

1、首先是TextPair代码,没有什么能够细说的,贴出来:

public class TextPair implements WritableComparable<TextPair> {

    private Text first;

    private Text second;

 

    public TextPair() {

        set(new Text(), new Text());

    }

 

    public TextPair(String first, String second) {

        set(new Text(first), new Text(second));

    }

 

    public TextPair(Text first, Text second) {

        set(first, second);

    }

 

    public void set(Text first, Text second) {

        this.first = first;

        this.second = second;

    }

 

    public Text getFirst() {

        return first;

    }

 

    public Text getSecond() {

        return second;

    }

 

    public void write(DataOutput out) throws IOException {

        first.write(out);

        second.write(out);

    }

 

    public void readFields(DataInput in) throws IOException {

        first.readFields(in);

        second.readFields(in);

    }

 

    public int compareTo(TextPair tp) {

        int cmp = first.compareTo(tp.first);

        if (cmp != 0) {

            return cmp;

        }

        return second.compareTo(tp.second);

    }

}

2Job的入口函数

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();

        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

        String[] otherArgs = parser.getRemainingArgs();

        if (agrs.length < 3) {

            System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

            System.exit(2);

        }

 

        //conf.set("hadoop.job.ugi", "root,hadoop");

 

        Job job = new Job(conf, "Example_Join_01");

        // 设置运行的job

        job.setJarByClass(Example_Join_01.class);

        // 设置Map相关内容

        job.setMapperClass(Example_Join_01_Mapper.class);

        // 设置Map的输出

        job.setMapOutputKeyClass(TextPair.class);

        job.setMapOutputValueClass(Text.class);

        // 设置partition

        job.setPartitionerClass(Example_Join_01_Partitioner.class);

        // 在分区以后按照指定的条件分组

        job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

        // 设置reduce

        job.setReducerClass(Example_Join_01_Reduce.class);

        // 设置reduce的输出

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        // 设置输入和输出的目录

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        // 执行,直到结束就退出

        System.exit(job.waitForCompletion(true) ? 0 : 1);

 

    }

 

8、总结

1、这是个简单的join查询,能够看到,我在处理输入源的时候是在map端作来源判断。其实在0.19能够用MultipleInputs.addInputPath()的方法,可是它用了JobConf作参数。这个方法原理是多个数据源就采用多个map来处理。方法各有优劣。

2、对于资源表,若是咱们采用01这样的模式来区分,资源表是须要放在前的。例如本例中info.txt就是资源表,因此标识位就是0.若是写为1的话,能够试下,在分组以后,资源表对应的值放在了迭代器最后一位,没法追加在最后全部的结果集合中。

3、关于分区,并非全部的map都结束才开始的,一部分数据完成就会开始执行。一样,分组操做在一个分区内执行,若是分区完成,分组将会开始执行,也不是等全部分区完成才开始作分组的操做。

相关文章
相关标签/搜索