简单解析mapreduce切片

  在mapreduce中的切片是什么意思?顾名思义就是将数据进行切分,切分为数据片,其实这个切片关乎于map阶段的map个数,以及每一个map处理的数据量的大小。node

  mapreduce中,一个job的map个数, 每一个map处理的数据量是如何决定的呢? 另外每一个map又是如何读取输入文件的内容呢? 用户是否能够本身决定输入方式, 决定map个数呢? 编程

  mapreduce做业会根据输入目录产生多个map任务, 经过多个map任务并行执行来提升做业运行速度, 但若是map数量过少, 并行量低, 做业执行慢, 若是map数过多, 资源有限,也会增长调度开销. 所以, 根据输入产生合理的map数, 为每一个map分配合适的数据量, 能有效的提高资源利用率, 并使做业运行速度加快。数组

  在mapreduce中, 每一个做业都会经过 InputFormat来决定map数量,oop

InputFormat是一个接口, 提供两个方法:code

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;

  其中getSplits方法会根据输入目录产生InputSplit数组, 每一个InputSplit会相应产生一个map任务, map的输入定义在InputSplit中. getRecordReader方法返回一个RecordReader对象, RecordReader决定了map任务如何读取输入数据, 例如一行一行的读取仍是一个字节一个字节的读取, 等等。而咱们大多数其实在使用TextInputFormat,但TextInputFormat没有实现本身的getSplits方法,它继承于FileInputFormat, 所以使用了FileInputFormat的getSplits方法.orm

  旧接口FileInputFormat的getSplits流程包含两个参数和一个公式:对象

mapred.min.split.size        (一个map最小输入长度)
mapred.map.tasks                (推荐map数量)
Math.max(minSize, Math.min(goalSize, blockSize))

  如何决定每一个map输入长度呢? 首先获取输入目录下全部文件的长度和, 除以mapred.map.tasks(这个属性是咱们能够再程序中设置的)获得一个推荐长度goalSize, 而后经过公式: Math.max(minSize, Math.min(goalSize, blockSize))决定map输入长度。 这里的minSize为mapred.min.split.size, 这个属性是咱们在配置文件中配置的,blockSize为相应文件的block长度。这公式能保证一个map的输入至少大于mapred.min.split.size, 对于推荐的map长度,只有它的长度小于blockSize且大于mapred.min.split.size才会有效果. 因为mapred.min.split.size默认长度为1, 所以一般状况下只要小于blockSize就有效果,不然使用blockSize作为map输入长度.继承

  所以, 若是想增长map数, 能够把mapred.min.split.size调小(其实默认值便可), 另外还须要把mapred.map.tasks设置大.若是须要减小map数,能够把mapred.min.split.size调大, 另外把mapred.map.tasks调小.接口

  这里要特别指出的是FileInputFormat会让每一个输入文件至少产生一个map任务, 所以若是你的输入目录下有许多文件, 而每一个文件都很小, 例如几十kb, 那么每一个文件都产生一个map会增长调度开销. 做业变慢.资源

  那么如何防止这种问题呢? CombineFileInputFormat能有效的减小map数量。

  Hadoop 0.20开始定义了一套新的mapreduce编程接口, 使用新的FileInputFormat, 它与旧接口下的FileInputFormat主要区别在于, 它再也不使用mapred.map.tasks, 而使用mapred.max.split.size参数代替goalSize, 经过Math.max(minSize, Math.min(maxSize, blockSize))决定map输入长度, 一个map的输入要大于minSize,小于Math.min(maxSize, blockSize)。

  若需增长map数,能够把mapred.min.split.size调小,把mapred.max.split.size调大。 若需减小map数, 能够把mapred.min.split.size调大, 并把mapred.max.split.size调小。

CombineFileInputFormat

  顾名思义, CombineFileInputFormat的做用是把许多文件合并做为一个map的输入.

  在它以前,可使用MultiFileInputFormat,不过其功能太简单, 它以文件为单位,一个文件至多分给一个map处理, 若是某个目录下有许多小文件, 另外还有一个超大文件, 处理大文件的map会严重偏慢.

  CombineFileInputFormat是一个被推荐使用的InputFormat. 它有三个配置:

mapred.min.split.size.per.node, 一个节点上split的至少的大小
mapred.min.split.size.per.rack   一个交换机下split至少的大小
mapred.max.split.size             一个split最大的大小

  它的主要思路是把输入目录下的大文件分红多个map的输入, 并合并小文件, 作为一个map的输入. 具体的原理是下述三步:

  1. 根据输入目录下的每一个文件,若是其长度超过mapred.max.split.size,以block为单位分红多个split(一个split是一个map的输入),每一个split的长度都大于mapred.max.split.size,
    由于以block为单位, 所以也会大于blockSize,
    此文件剩下的长度若是大于mapred.min.split.size.per.node, 则生成一个split, 不然先暂时保留.
  2. 如今剩下的都是一些长度效短的碎片,把每一个rack下碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 最后若是剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一个split, 不然暂时保留.
  3. 把不一样rack下的碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 剩下的碎片不管长度, 合并成一个split.

举例:

mapred.max.split.size=1000
mapred.min.split.size.per.node=300
 mapred.min.split.size.per.rack=100

  输入目录下五个文件,rack1下三个文件,长度为2050,1499,10, rack2下两个文件,长度为1010,80. 另外blockSize为500.

  通过第一步, 生成五个split: 1000,1000,1000,499,1000. 剩下的碎片为rack1下:50,10; rack2下10:80

  因为两个rack下的碎片和都不超过100, 因此通过第二步, split和碎片都没有变化.

  第三步,合并四个碎片成一个split, 长度为150.

  若是要减小map数量, 能够调大mapred.max.split.size, 不然调小便可.

  其特色是: 一个块至多做为一个map的输入,一个文件可能有多个块,一个文件可能由于块多分给作为不一样map的输入, 一个map可能处理多个块,可能处理多个文件。

相关文章
相关标签/搜索