一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证全部映射的键值对中的每个共享相同的键组数据库
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)编程
Map输出格式和Reduce输入格式必定是相同的设计模式
MapReduce主要是先读取文件数据,而后进行Map处理,接着Reduce处理,最后把处理结果写到文件中缓存
记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录自己。它将数据以键值对的形式传输给mapper。一般键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围以内.markdown
在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,由于定义对于分布式任务的完成具备重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展现大量细节来解释特定键值如何选择.网络
ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一块儿。并发
reducer采用分组数据做为输入。该功能传递键和此键相关值的迭代器。能够采用多种方式来汇总、过滤或者合并数据。当ruduce功能完成,就会发送0个或多个键值对。app
输出格式会转换最终的键值对并写入文件。默认状况下键和值以tab分割,各记录以换行符分割。所以能够自定义更多输出格式,最终数据会写入HDFS。相似记录读取,自定义输出格式不在本书范围。分布式
经过InputFormat决定读取的数据的类型,而后拆分红一个个InputSplit,每一个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Mapide
决定读取数据的格式,能够是文件或数据库等
List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
RecordReader <k,v>createRecordReader():</k,v> 建立#x5EFA;RecordReader,从InputSplit中读取数据,解决读取分片中数据问题
TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容
KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符以前的内容为Key,在以后的为Value。分隔符变量经过key.value.separator.in.input.line变量设置,默认为(\t)字符。
NLineInputFormat: 与TextInputFormat同样,但每一个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1
SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,<key,value>为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另外一个MapReduce任务的输入之间的数据传输过程。</key,value>
表明一个个逻辑分片,并无真正存储数据,只是提供了一个如何将数据分片的方法
Split内有Location信息,利于数据局部化
一个InputSplit给一个单独的Map处理
public abstract class InputSplit { /** * 获取Split的大小,支持根据size对InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取存储该分片的数据所在的节点位置. */ public abstract String[] getLocations() throws IOException, InterruptedException; }
将InputSplit拆分红一个个<key,value>对给Map处理,也是实际的文件读取分隔对象</key,value>
CombineFileInputFormat能够将若干个Split打包成一个,目的是避免过多的Map任务(由于Split的数目决定了Map的数目,大量的Mapper Task建立销毁开销将是巨大的)
一般一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样作的好处是使得Map能够在存储有当前数据的节点上运行本地的任务,而不须要经过网络进行跨节点的任务调度
经过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小
若是mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据须要经过网络读取
若是mapred.max.split.size小于block size,则会将一个block拆成多个split,增长了Map任务数(Map对split进行计算ק#x5E76;且上报结果,关闭当前计算打开新的split均须要耗费资源)
先获取文件在HDFS上的路径和Block信息,而后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)
public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 获取该文件全部的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判断文件是否可分割,一般是可分割的,但若是文件是压缩的,将不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 计算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循环分片。 // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,中止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 处理余下的数据 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整块返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 对于长度为0的文件,建立空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 设置输入文件数量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }
split是根据文件大小分割的,而通常处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split
解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录
public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函数即对LineRecordReader的一个初始化 // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片通过压缩的状况等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打开文件,并定位到分片读取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是压缩文件的话,直接打开文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一个split,则忽略本split的第一行数据 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下&#x#x6B21;读取就会从偏移位置开始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行数据,从新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即为偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 读取的数据长度为0,则说明已读完 if (newSize == 0) { break; } pos += newSize; // 读取的数据长度小于最大行长度,也说明已读取完毕 if (newSize < maxLineLength) { break; } // 执行到此处,说明该行数据没读完,继续读入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } }
主要是读取InputSplit的每个Key,Value对并进行处理
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 预处理,仅在map task启动时运行一次 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 对于InputSplit中的每一对<key, value>都会运行一次 */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * 扫尾工做,好比关闭流等 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * map task的驱动器 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } } public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private RecordReader<KEYIN, VALUEIN> reader; private InputSplit split; /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } }
对Map的结果进行排序并传输到Reduce进行处理 Map的结果并不#x662F;直接存放到硬盘,而是利用缓存作一些预排序处理 Map会调用Combiner,压缩,按key进行分区、排序等,尽可能减小结果的大小 每一个Map完成后都会通知Task,而后Reduce就能够进行处理
当Map程序开始产生结果的时候,并非直接写到文件的,而是利用缓存作一些排序方面的预处理操做
每一个Map任务都有一个循环内存缓冲区(默认100MB),当缓存的内容达到80%时,后台线程开始将内容写到文件,此时Map任务能够&#x#x7EE7;续输出结果,但若是缓冲区满了,Map任务则须要等待
写文件使用round-robin方式。在写入文件以前,先将数据按照Reduce进行分区。对于每个分区,都会在内存中根据key进行排序,若是配置了Combiner,则排序后执行Combiner(Combine以后能够减小写入文件和传输的数据)
每次结果达到缓冲区的阀值时,都会建立一个文件,在Map结束时,可能会产生大量的文件。在Map完成前,会将这些文件进行合并和排序。若是文件的数量超过3个,则&##x5408;并后会再次运行Combiner(一、2个文件就没有必要了)
若是配置了压缩,则最终写入的文件会先进行压缩,这样能够减小写入和传输的数据
一旦Map完成,则通知任务管理器,此时Reduce就能够开始复制结果数据
Map的结果文件都存放到运行Map任务的机器的本地硬盘中
若是Map的结果不多,则直接放到内存,不然写入文件中
同时后台线程将这些文件进行合并和排序到一个更大的文件中(若是文件是压缩的ÿ#xFF0C;则须要先解压)
当全部的Map结果都被复制和合并后,就会调用Reduce方法
Reduce结果会写入到HDFS中
通常的原则是给shuffle分配尽量多的内存,但前提是要保证Map、Reduce任务有足够的内存
对于Map,主要就是避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值
对于Reduce,主要是把Map的结果尽量地保存到内存中,一样也是要避免把中间结果写入磁盘。默认状况下,全部的内存都是分配给Reduce方法的,若是Reduce方法不怎&##x4E48;消耗内存,能够mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0
在任务监控中可经过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的
对于IO方面,能够Map的结果可使用压缩,同时增大buffer size(io.file.buffer.size,默认4kb)
属性 | 默认值 | 描述 |
---|---|---|
io.sort.mb | 100 | 映射输出分类时所使用缓冲区的大小. |
io.sort.record.percent | 0.05 | 剩余空间用于映射输出自身记录.在1.X发布后去除此属性.随机代码用于使用映射全部内存并记录信息. |
io.sort.spill.percent | 0.80 | 针对映射输出内存缓冲和记录索引的阈值使用比例. |
io.sort.factor | 10 | 文件分类时合并流的最大数量。此属性也用于reduce。一般把数字设为100. |
min.num.spills.for.combine | 3 | 组合运行所需最小溢出文件数目. |
mapred.compress.map.output | false | 压缩映射输出. |
mapred.map.output.compression.codec | DefaultCodec | 映射输出所需的压缩解编码器. |
mapred.reduce.parallel.copies | 5 | 用于向reducer传送映射输出的线程数目. |
mapred.reduce.copy.backoff | 300 | 时间的最大数量,以秒为单位,这段时间内若reducer失败则会反复尝试传输 |
io.sort.factor | 10 | 组合运行所需最大溢出文件数目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 随机复制阶段映射输出缓冲器的堆栈大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用于启动合并输出进程和磁盘传输的映射输出缓冲器的阀值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用于启动合并输出和磁盘传输进程的映射输出的阀值数目。小于等于0意味着没有门槛,而溢出行为由 mapred.job.shuffle.merge.percent单独管理. |
mapred.job.reduce.input.buffer.percent | 0.0 | 用于减小内存映射输出的堆栈大小比例,内存中映射大小不得超出此值。若reducer须要较少内存则能够提升该值. |
export LIBJARS=$MYLIB/commons-lang-2.3.jar, <other_jars_used_by_remote_components>hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS<input_path><output_path>
hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner <input_path>The dependent libraries are now included inside the application JAR file
通常仍是上面的好,指定依赖能够利用Public Cache,若是是包含依赖,则每次都须要拷贝