MapReduce设计模式之数据组织模式

分层结构模式

应用场景

将基于行的数据转化成分层格式(如JSON)。例如,可以用来把文章和它的评论组织成一个层次结构。

代码实现

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import filtering.GrepMain;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/21 8:50
 * @Description
 */
public class PostCommentMain {

    public static class PostMapper extends Mapper<Object, Text,Text,Text> {

        public void map(Object key,Text value,Context context) {
            JSONObject textJson = JSONObject.parseObject(value.toString());
            String id = textJson.getString("id");
            if(StringUtils.isNoneBlank(id)) {
                try {
                    textJson.put("type","P");
                    context.write(new Text(id),new Text(textJson.toString()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class CommentMapper extends Mapper<Object, Text,Text,Text> {

        public void map(Object key,Text value,Context context) {
            JSONObject textJson = JSONObject.parseObject(value.toString());
            String id = textJson.getString("postId");
            if(StringUtils.isNoneBlank(id)) {
                try {
                    textJson.put("type","C");
                    context.write(new Text(id),new Text(textJson.toString()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class PostCommentReducer extends Reducer<Text,Text,Text, NullWritable> {

        private JSONObject postJson = null;

        private JSONArray commentArray = new JSONArray();

        public void reduce(Text key,Iterable<Text> values,Context context) {
            postJson = null;
            commentArray.clear();
            for (Text value : values) {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String type = valueJson.getString("type");
                if("P".equals(type)) {
                    postJson = valueJson;
                }else if("C".equals(type)) {
                    commentArray.add(valueJson);
                }
            }
            postJson.put("comemnt",commentArray);
            try {
                context.write(new Text(postJson.toString()),NullWritable.get());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "PostComment");
            //与本身定义的类名保持一致
            job.setJarByClass(PostCommentMain.class);
            //设置的输出键和输出值和mapper定义的须要保持一致。
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //输入输出路径
            //与本身定义的Mapper类和Reducer类保持一致
            job.setReducerClass(PostCommentReducer.class);
            MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,PostMapper.class);
            MultipleInputs.addInputPath(job,new Path(args[1]), TextInputFormat.class,CommentMapper.class);
            FileUtil.fullyDelete(new File(args[2]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/21 10:01
 * @Description
 */
public class QuestionAnswerMain {

    public static class QuestionAnswerMapper extends Mapper<Object, Text,Text, Text> {

        public void map(Object key,Text value,Context context) {
            JSONObject valueJson = JSONObject.parseObject(value.toString());
            String parentId = valueJson.getString("parentId");
            try {
                if(StringUtils.isNotBlank(parentId)) {
                    context.write(new Text(parentId),value);
                }else {
                    String id = valueJson.getString("id");
                    context.write(new Text(id),value);
                }
            }catch(Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class QuestionAnswerReducer extends Reducer<Text,Text,Text,NullWritable> {

        private JSONObject questionJson = null;

        private JSONArray answerArray = new JSONArray();

        public void reduce(Text key, Iterable<Text> values, Context context) {
            questionJson = null;
            answerArray.clear();
            for (Text value:values) {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String parentId = valueJson.getString("parentId");
                if(StringUtils.isNotBlank(parentId)) {
                    answerArray.add(valueJson);
                }else{
                    questionJson = valueJson;
                }
            }
            questionJson.put("answer",answerArray);
            try {
                context.write(new Text(questionJson.toString()),NullWritable.get());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "question and answer");
            //与本身定义的类名保持一致
            job.setJarByClass(QuestionAnswerMain.class);
            //与本身定义的Mapper类和Reducer类保持一致
            job.setMapperClass(QuestionAnswerMapper.class);
            job.setReducerClass(QuestionAnswerReducer.class);
            //设置的输出键和输出值和mapper定义的须要保持一致。
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

分区模式

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * @Author bluesnail95
 * @Date 2019/7/22 6:27
 * @Description
 */
public class PartitionMain {

    public static class PatitionMapper extends Mapper<Object,Text, IntWritable, Text> {

        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key,Text value,Context context) {
            try {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String strDate = valueJson.getString("lastAccessDate");
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(frmt.parse(strDate));
                int year = calendar.get(Calendar.YEAR);
                context.write(new IntWritable(year),value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class LastAccessDatePartitioner extends Partitioner<IntWritable,Text> implements Configurable {

        private static final String MIN_LAST_ACCESS_DATE_YEAR = "min.last.access.date.year";
        private Configuration conf = null;
        private int minLastAccessDateYear = 0;

        public void setConf(Configuration conf) {
            this.conf = conf;
            minLastAccessDateYear = conf.getInt(MIN_LAST_ACCESS_DATE_YEAR,0);
        }

        public Configuration getConf() {
            return conf;
        }

        public int getPartition(IntWritable key, Text value, int numPartitions) {
            return key.get() - minLastAccessDateYear;
        }

        public static void setMinLastAccessDate(Job job,int minLastAccessDateYear) {
            job.getConfiguration().setInt(MIN_LAST_ACCESS_DATE_YEAR,minLastAccessDateYear);
        }
    }

    public static class PatitionReducer extends Reducer<IntWritable,Text,Text,NullWritable> {

        public void reduce(IntWritable key,Iterable<Text> values,Context context) {
            for(Text value:values) {
                try {
                    context.write(value, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Partition");
            //与本身定义的类名保持一致
            job.setJarByClass(PartitionMain.class);
            //与本身定义的Mapper类和Reducer类保持一致
            job.setMapperClass(PatitionMapper.class);
            job.setPartitionerClass(LastAccessDatePartitioner.class);
            LastAccessDatePartitioner.setMinLastAccessDate(job,2010);
            job.setNumReduceTasks(10);
            job.setReducerClass(PatitionReducer.class);
            //设置的输出键和输出值和mapper定义的须要保持一致。
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            //输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

分箱模式

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/22 20:46
 * @Description
 */
public class BinningMain {

    public static class BinningMapper extends Mapper<Object,Text, Text, NullWritable> {

        private MultipleOutputs<Text,NullWritable> output = null;

        public void setup(Context context) {
            output = new MultipleOutputs<Text, NullWritable>(context);
        }

        public void map(Object key,Text value,Context context) {
            try {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String tag = valueJson.getString("tag");
                if(StringUtils.isBlank(tag)) {
                    return;
                }
                if(tag.equalsIgnoreCase("hadoop")) {
                    output.write("bins",value,NullWritable.get(),"hadoop-tag");
                }
                if(tag.equalsIgnoreCase("hive")) {
                    output.write("bins",value,NullWritable.get(),"hive-tag");
                }
                if(tag.equalsIgnoreCase("hbase")) {
                    output.write("bins",value,NullWritable.get(),"hbase-tag");
                }
                if(tag.equalsIgnoreCase("pig")) {
                    output.write("bins",value,NullWritable.get(),"pig-tag");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        protected void cleanup(Context context) {
            try {
                output.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Binning");
            //与本身定义的类名保持一致
            job.setJarByClass(BinningMain.class);
            //与本身定义的Mapper类和Reducer类保持一致
            job.setMapperClass(BinningMapper.class);
            //设置的输出键和输出值和mapper定义的须要保持一致。
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            //输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,Text.class,NullWritable.class);
            MultipleOutputs.setCountersEnabled(job,true);
            job.setNumReduceTasks(0);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

全排序和混排模式

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/23 6:25
 * @Description
 */
public class TotalOrderSortMain {

    public static class LastAccessDateMapper extends Mapper<Object,Text,Text,Text> {

        public void map(Object key, Text value, Context context) {
            JSONObject valueJson = JSONObject.parseObject(value.toString());
            String lastAccessDate = valueJson.getString("lastAccessDate");
            try {
                context.write(new Text(lastAccessDate),value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class ValueReducer extends Reducer<Text,Text,Text, NullWritable> {

        public void reduce(Text key,Iterable<Text> values,Context context) {
            for(Text value:values) {
                try {
                    context.write(value,NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        Path inputPath = new Path(args[0]);
        Path partitionFile = new Path(args[1] + "_partitions.lst");
        Path outputStage = new Path(args[1] + "_staging");
        Path outputOrder = new Path(args[1]);

        try {
            Job sampleJob = Job.getInstance(configuration,"TotalOrderSortingStage");
            sampleJob.setJarByClass(TotalOrderSortMain.class);
            sampleJob.setMapperClass(LastAccessDateMapper.class);
            sampleJob.setNumReduceTasks(0);
            sampleJob.setOutputKeyClass(Text.class);
            sampleJob.setOutputValueClass(Text.class);
            TextInputFormat.setInputPaths(sampleJob,inputPath);
            sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileUtil.fullyDelete(new File(args[1] + "_staging"));
            SequenceFileOutputFormat.setOutputPath(sampleJob,outputStage);

            int code = sampleJob.waitForCompletion(true) ? 0 : 1;

            if(code == 0) {
                Job orderJob = Job.getInstance(configuration,"TotalOrderSortingStage");
                orderJob.setMapperClass(Mapper.class);
                orderJob.setReducerClass(ValueReducer.class);
                orderJob.setNumReduceTasks(10);
                orderJob.setPartitionerClass(TotalOrderPartitioner.class);
                TotalOrderPartitioner.setPartitionFile(configuration,partitionFile);
                orderJob.setOutputKeyClass(Text.class);
                orderJob.setOutputValueClass(Text.class);
                orderJob.setInputFormatClass(SequenceFileInputFormat.class);
                SequenceFileInputFormat.setInputPaths(orderJob,outputStage);
                FileUtil.fullyDelete(new File(args[1]));
                TextOutputFormat.setOutputPath(orderJob,outputOrder);
                orderJob.getConfiguration().set("mapred.textoutputformat.separator","");
                InputSampler.writePartitionFile(orderJob,new InputSampler.RandomSampler(1,20));
                code = orderJob.waitForCompletion(true) ? 0 : 2;
            }
            FileSystem.get(new Configuration()).delete(partitionFile,false);
            FileSystem.get(new Configuration()).delete(outputStage,true);
            System.exit(0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/22 20:46
 * @Description
 */
public class BinningMain {

    public static class BinningMapper extends Mapper<Object,Text, Text, NullWritable> {

        private MultipleOutputs<Text,NullWritable> output = null;

        public void setup(Context context) {
            output = new MultipleOutputs<Text, NullWritable>(context);
        }

        public void map(Object key,Text value,Context context) {
            try {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String tag = valueJson.getString("tag");
                if(StringUtils.isBlank(tag)) {
                    return;
                }
                if(tag.equalsIgnoreCase("hadoop")) {
                    output.write("bins",value,NullWritable.get(),"hadoop-tag");
                }
                if(tag.equalsIgnoreCase("hive")) {
                    output.write("bins",value,NullWritable.get(),"hive-tag");
                }
                if(tag.equalsIgnoreCase("hbase")) {
                    output.write("bins",value,NullWritable.get(),"hbase-tag");
                }
                if(tag.equalsIgnoreCase("pig")) {
                    output.write("bins",value,NullWritable.get(),"pig-tag");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        protected void cleanup(Context context) {
            try {
                output.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Binning");
            //与本身定义的类名保持一致
            job.setJarByClass(BinningMain.class);
            //与本身定义的Mapper类和Reducer类保持一致
            job.setMapperClass(BinningMapper.class);
            //设置的输出键和输出值和mapper定义的须要保持一致。
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            //输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,Text.class,NullWritable.class);
            MultipleOutputs.setCountersEnabled(job,true);
            job.setNumReduceTasks(0);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

参考资料

《MapReduce设计模式》