MapReduce应用普遍的缘由之一就是其易用性,提供了一个高度抽象化而变得很是简单的编程模型,它是在总结大量应用的共同特色的基础上抽象出来的分布式计算框架,在其编程模型中,任务能够被分解成相互独立的子问题。MapReduce编程模型给出了分布式编程方法的5个步骤:java
下面就简要总结一下编程模型中用到的主要组件以及在其中的做用: node
仍然以示例开始:算法
package hadoop; import java.io.IOException; import java.util.StringTokenizer; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.example.GroupWriteSupport; /** * * <p>Title: ParquetNewMR</p> * <p>Description: </p> * @author zjhua * @date 2019年4月7日 */ public class ParquetNewMR { /** * map模型 * <p>Title: WordCountMap</p> * <p>Description: </p> * @author zjhua * @date 2019年4月23日 */ public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } } } /** * reduce模型 * <p>Title: WordCountReduce</p> * <p>Description: </p> * @author zjhua * @date 2019年4月23日 */ public static class WordCountReduce extends Reducer<Text, IntWritable, Void, Group> { private SimpleGroupFactory factory; @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } Group group = factory.newGroup() .append("name", key.toString()) .append("age", sum); context.write(null,group); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration())); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String writeSchema = "message example {\n" + "required binary name;\n" + "required int32 age;\n" + "}"; conf.set("parquet.example.schema",writeSchema); // conf.set("dfs.client.use.datanode.hostname", "true"); Job job = Job.getInstance(conf); // new Job()接口过时了 job.setJarByClass(ParquetNewMR.class); job.setJobName("parquet"); String in = "hdfs://192.168.223.150:8020/user/hadoop1/wordcount/input"; String out = "hdfs://192.168.223.150:8020/user/hadoop1/pq_out_" + UUID.randomUUID().toString(); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputValueClass(Group.class); job.setMapperClass(WordCountMap.class); // Map实现类 job.setReducerClass(WordCountReduce.class); //Reduce实现类 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(ParquetOutputFormat.class); FileInputFormat.addInputPath(job, new Path(in)); ParquetOutputFormat.setOutputPath(job, new Path(out)); ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class); job.waitForCompletion(true); } }
1. InputFormat
主要用于描述输入数据的格式,提供数据切分功能,按照某种方式将输入数据且分红若干个split,肯定map task的个数,以及为Mapper提供输入数据,给定某个split,让其解析成一个个key/value对。
InputFormat中的getSplits方法主要完成数据切分的功能,会尝试着将输入数据且分红numSplits个进行存储。InputSplit中只记录了分片的元数据信息,好比起始位置、长度以及所在的节点列表。
在Hadoop中对象的序列化主要用在进程间通讯以及数据的永久存储。Client端会调用Job中的InputFormat中的getSplits函数,看成业提交到JobTracker端对做业初始化时,能够直接读取该文件,解析出全部InputSplit,并建立对应的MapTask。
而重要的方法就是getRecordReader,其返回一个RecordReader,将输入的InputSplit解析成若干个key/value对。MapReduce框架在Map Task执行过程当中,不断地调用RecordReader对象中的方法,获取key/value对交给map函数处理,伪代码以下:
apache
K1 key = input.createKey(); V1 value = input.createValue(); while(input.next(key, value)){ //invoke map() } input.close();
对于FileInputFormat,这是一个采用统一的方法对各类输入文件进行切分的InputFormat,也是好比TextInputFormat, KeyValueInputFormat等类的基类。其中最重要的是getSplits函数,最核心的两个算法就是文件切分算法以及host选择算法。
文件切分算法主要用于肯定InputSplit的个数以及每一个InputSplit对应的数据段。
在InputSplit切分方案完成后,就须要肯定每一个InputSplit的元数据信息: <file, start, length, host>,表示InputSplit所在文件,起始位置,长度以及所在的host节点列表,其中host节点列表是最难肯定的。
host列表选择策略直接影响到运行过程当中的任务本地性。Hadoop中HDFS文件是以block为单位存储的,一个大文件对应的block可能会遍及整个集群,InputSplit的划分算法可能致使一个InputSplit对应的多个block位于不一样的节点上。
hadoop将数据本地性分红三个等级:node locality, rack locality和data center locality。在进行任务调度时,会依次考虑3个节点的locality,优先让空闲资源处理本节点的数据,其次同一个机架上的数据,最差是处理其余机架上的数据。
虽然InputSplit对应的block可能位于多个节点上,但考虑到任务调度的效率,一般不会将全部节点到InputSplit的host列表中,而是选择数据总量最大的前几个节点,做为任务调度时判断任务是否具备本地性的主要凭据。对于FileInputFormat设计了一个简单有效的启发式算法:按照rack包含的数据量对rack进行排序,在rack内部按照每一个node包含的数据量对node排序,取前N个node的host做为InputSplit的host列表(N为block的副本数,默认为3)。
当InputSplit的尺寸大于block的尺寸时,MapTask不能实现彻底的数据本地性,总有一部分数据须要从远程节点中获取,所以当使用基于FileInputFormat实现InputFormat时,为了提升Map Task的数据本地性,应该尽可能使得InputSplit大小与block大小相同。(虽然理论上是这么说,可是这会致使过多的MapTask,使得任务初始时占用的资源很大)。
2. OutputFormat
OutputFormat主要用于描述输出数据的格式,可以将用户提供的key/value对写入特定格式的文件中。其中与InputFormat相似,OutputFormat接口中有一个重要的方法就是getRecordWriter,返回的RecordWriter接收一个key/value对,并将之写入文件。Task执行过程当中,MapReduce框架会将map或reduce函数产生的结果传入write方法:编程
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException{ output.collect(newKey, newValue); }
hadoop中全部基于文件的OutputFormat都是从FileOutputFormat中派生的,事实上这也是最经常使用的OutputFormat。总结发现,FileOutputFormat实现的主要功能有两点:多线程
默认状况下,看成业成功完成后,会在最终结果目录下生成空文件_SUCCESS,该文件主要为高层应用提供做业运行完成的标识(好比oozie工做流就能够根据这个判断任务是否执行成功)。
3. Mapper和Reducer
Mapper的过程主要包括初始化、Map操做执行和清理三个部分。Reducer过程与Mapper过程基本相似。app
对于一个MapReduce应用,不必定非要存在Mapper,MapReduce框架提供了比Mapper更加通用的接口:org.apache.hadoop.mapred.MapRunnable,能够直接实现该接口定制本身的key/value处理逻辑(相对于MapReduce阶段中固定的map阶段,能够跳过Map阶段,好比Hadoop Pipes中的将数据发送给其余进程处理)。
MapRunner是其固定实现,直接调用用户job中设置的Mapper Class,此外,hadoop中还提供了一个多线程的MapRunnable实现,用于非CPU类型的做业提供吞吐率。
4. Partitioner
Partitoner的做用是对Mapper产生的中间结果进行分片,将同一分组的数据交给一个Reducer来处理,直接影响这Reducer阶段的负载均衡。其中最重要的方法就是getPartition,包含三个参数,key,value,以及Reducer的个数numPartions。
MapReduce提供两个Partitioner实现,HashPartitoner和TotalOrderPartitioner。HashPartitioner是默认实现,基于哈希值进行分片;TotalOrderPartitoner提供了一种基于区间分片的方法,一般用在数据的全排序中。例如归并排序,若是Map Task进行局部排序后Reducer端进行全局排序,那么Reducer端只能设置成1个,这会成为性能瓶颈,为了提升全局排序的性能和扩展性,并保证一个区间中的全部数据都大于前一个区间的数据,就会用到TotalOrderPartitioner。负载均衡