Hadoop 小文件的处理

hadoop的HDFS和MapReduce自己都是用户处理大量数据的大文件,对于小文件来讲,因为namenode会在记录每一个block对象,若是存在大量的小文件,会占用namenode的大量内存空间,并且HDFS存储文件是按block来存储,即便一个文件的大小不足一个block的大小,文件仍是会占用一个block的存储空间,因此大量的小文件会对HDFS的存储和访问都带来不利的影响。 hadoop对于小文件的处理主要有Hadoop Archive,Sequence file和CombineFileInputFormat三种方式。 ##Hadoop Archive Hadoop Archive是hadoop的归档命令,能够将hdfs上的小文件打包成一个har文件,这种方式虽然不会减小小文件占用大量存储空间的问题,可是会减小namenode的内存空间。同时har文件支持hdfs命令对其的访问。html

命令:hadoop archive -archiveName 归档名称 -p 父目录 [-r <复制因子>] 原路径(能够多个) 目的路径java

-archiveNames设置归档生成文件的名字node

-p 须要进行归档的文件的父目录shell

例子:apache

$ hadoop fs -ls /user/test/yhj/input/
Found 3 items
-rw-r--r--   3 root hdfs        760 2018-07-04 11:48 /user/test/yhj/input/word1.txt
-rw-r--r--   3 root hdfs         82 2018-07-04 11:48 /user/test/yhj/input/word2.txt
-rw-r--r--   3 root hdfs       1738 2018-07-04 11:48 /user/test/yhj/input/word3.txt
$ hadoop archive -archiveName word.har -p /user/test/yhj/input/ word1.txt word2.txt word3.txt /user/test/yhj/harInput/
$ hadoop fs -ls /user/test/yhj/harInput/
Found 1 items
drwxr-xr-x   - hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har

HAR文件的生成是经过运行一个mapreduce的程序生成,因此须要集群环境中装个mapreduceapi

HAR是在Hadoop file system之上的一个文件系统,所以全部fs shell命令对HAR文件都可用,但使用不一样的URI。另外,请注意档案是不可变的。因此,重命名,删除并建立返回一个错误,例如:app

$ hadoop fs -ls /user/test/yhj/harInput/word.har
Found 4 items
-rw-r--r--   3 hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_SUCCESS
-rw-r--r--   5 hdfs hdfs        255 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_index
-rw-r--r--   5 hdfs hdfs         22 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_masterindex
-rw-r--r--   3 hdfs hdfs       2580 2018-07-05 20:18 /user/test/yhj/harInput/word.har/part-0
$ hadoop fs -ls har:/user/test/yhj/harInput/word.har
Found 3 items
-rw-r--r--   3 hdfs hdfs        760 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word1.txt
-rw-r--r--   3 hdfs hdfs         82 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word2.txt
-rw-r--r--   3 hdfs hdfs       1738 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word3.txt

能够看到Hadoop存档目录包含元数据(采用_index和_masterindex形式)、数据部分data(part- *)文件、归档文件的名称和部分文件中的位置(_index文件)。ide

HAR文件也能够被mapreduce读取,路径的URI可使用不一样的URI,好比例子中的文件输入的路径URI能够下面两种方式使用函数

hdfs://10.1.13.111:8020/user/test/yhj/harInput/word.har
har://hdfs-10.1.13.111:8020/user/test/yhj/harInput/word.har

可是这个例子的文件来讲,两个输入路径产生map的个数是不一样的,har的路径产生的map有三个,对应三个word*.txt,而hdfs的路径只有一个,对应word.har/part-0oop

若是是文件支持行记录切分使用mapreduce来处理数据(文件的先后数据不相互影响),建议使用hdfs的URI路径,由于存档目录的part-*可能包括多个小文件的数据,这样能够减小map的个数,不会为每一个单独的小文件启动一个map。

CombineFileInputFormat

将大量小文件作为mapreduce的输入是不合适的,由于FileInputFormat只会分割大文件(文件大小超过设定的分片大小,默认为HDFS的块大小),对于小于分片大小的文件,每一个文件做为一个分片,若是文件大小小于一个块的大小,mapreduce会为每一个小文件产生一个map,这样会产生大量小文件,而每一个map只会处理少许数据,每次map操做都会产生开销。固然能够经过mapred.min.split.size和mapred.max.split.size来控制map数量。

CombineFileInputFormat是mapreduce针对小文件而设计的,CombineFileInputFormat能够将多个小文件打包进一个分片,另外,比直接设置map数量好的在于,CombineFileInputFormat在决定将那些块放入一个分片是会考虑到块所在的节点和机架的位置,避免操做分片是过多的数据传输。

CombineFileInputFormat是一个抽象类,hadoop自带的实现的有CombineTextInputFormat,咱们能够经过继承CombineFileInputFormat实现createRecordReader方法,自定义RecordReader类来实现理海量小文件的MapReduce。

InputFormat主要有两个方法,getSplits(计算获得分片),createRecordReader(产生返回RecordReader,RecordReader生成输出map读入的键值对)

CombineFileInputFormat中已经实现了getSplits,即将多个小文件打包进一个分片中CombineFileSplit,咱们须要实现createRecordReader方法,返回一个能够读取该分片中内容的RecordReader。

MyCombineInputFormat的实现

public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text>{
    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        RecordReader<LongWritable, Text> reader = new CombineFileRecordReader<>((CombineFileSplit) inputSplit, taskAttemptContext, MyCombineFileRecordReader.class);
        try {
            reader.initialize(inputSplit, taskAttemptContext);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return reader;
    }
}

这里实际返回了一个CombineFileRecordReader的对象,CombineFileRecordReader经过CombineFileSplit,context和Class<? extends RecordReader>类型构造,MyCombineFileRecordReader是咱们对于CombineFileSplit中每个文件的产生map的输入的方法。 CombineFileRecordReader中的nextKeyValue方法,会为每个打包在CombineFileSplit中的文件构造一个RecordReader方法,读取文件中的记录。

public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
	...
    public CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Class<? extends RecordReader<K, V>> rrClass) throws IOException {
        this.split = split;
        this.context = context;
        this.idx = 0;
        this.curReader = null;
        this.progress = 0L;

        try {
            this.rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
            this.rrConstructor.setAccessible(true);
        } catch (Exception var5) {
            throw new RuntimeException(rrClass.getName() + " does not have valid constructor", var5);
        }

        this.initNextRecordReader();
    }


	protected boolean initNextRecordReader() throws IOException {
        if(this.curReader != null) {
            this.curReader.close();
            this.curReader = null;
            if(this.idx > 0) {
                this.progress += this.split.getLength(this.idx - 1);
            }
        }

        if(this.idx == this.split.getNumPaths()) {
            return false;
        } else {
            this.context.progress();

            try {
                Configuration conf = this.context.getConfiguration();
                conf.set("mapreduce.map.input.file", this.split.getPath(this.idx).toString());
                conf.setLong("mapreduce.map.input.start", this.split.getOffset(this.idx));
                conf.setLong("mapreduce.map.input.length", this.split.getLength(this.idx));
                this.curReader = (RecordReader)this.rrConstructor.newInstance(new Object[]{this.split, this.context, Integer.valueOf(this.idx)});
                if(this.idx > 0) {
                    this.curReader.initialize(this.split, this.context);
                }
            } catch (Exception var2) {
                throw new RuntimeException(var2);
            }

            ++this.idx;
            return true;
	}
	
	 public boolean nextKeyValue() throws IOException, InterruptedException {
        do {
            if(this.curReader != null && this.curReader.nextKeyValue()) {
                return true;
            }
        } while(this.initNextRecordReader());

        return false;
    }

    public K getCurrentKey() throws IOException, InterruptedException {
        return this.curReader.getCurrentKey();
    }

    public V getCurrentValue() throws IOException, InterruptedException {
        return this.curReader.getCurrentValue();
    }
	...
}

在nextKeyValue方法中经过自定义的RecordReader的nextKeyValue读取当前文件的对象,当读完当前文件中的信息,后会经过initNextRecordReader返回初始化的下一个文件的RecordReader,因此咱们只需实现相应的读取一个文件的RecordReader便可。

MyCombineFileRecordReader的实现

public class MyCombineFileRecordReader extends RecordReader<LongWritable, Text> {

    private CombineFileSplit combineFileSplit;
    private int currentIndex;
    private LineRecordReader reader = new LineRecordReader();
    private int totalNum;

    public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index){
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
        this.totalNum = combineFileSplit.getNumPaths();
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        context.getConfiguration().set("mapreduce.map.input.file.name", fileSplit.getPath().getName());
        this.reader.initialize(fileSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(currentIndex >= 0 && currentIndex < totalNum){
            return reader.nextKeyValue();
        }else {
            return false;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if(currentIndex >= 0 && currentIndex < totalNum){
            return (float)currentIndex/totalNum;
        }
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

MyCombineFileRecordReader中经过LineRecordReader按行来读取文本记录,在initialize方法中经过CombineFileSplit和index(CombineFileSplit中文件信息的索引位置)来获得相应文件的信息,建立对应的FileSplit,接着建立LineRecordReader对象,在nextKeyValue中委托给LineRecordReader为mapper产生键-值对象。

最后入口函数和map类的实现,将InputFormatClass替换成自定义的MyCombineInputFormat类

public class CombineInputFromatMain extends Configured implements Tool{

    public static class CombineInputFormatMap extends Mapper<Object, Text, Text, Text>{
        private Text outKey = new Text();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set(context.getConfiguration().get("mapreduce.map.input.file.name"));
            context.write(outKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //设定默认job和设置输入输出路径的函数
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("CombineInputFormat Text");
        job.setJarByClass(CombineInputFromatMain.class);
        job.setMapperClass(CombineInputFormatMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MyCombineInputFormat.class);
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CombineInputFromatMain(), args));
    }
}

在这例子中将三个word*.txt文件打包进一个分片,实际只产生了一个map。

Sequence file

sequence file由一系列的二进制key/value组成,若是为key小文件名,value为文件内容,则能够将大批小文件合并成一个大文件。

顺序文件由文件头和随后的记录内容组成,顺序文件的前三个字节为SEQ(顺序文件代码),紧接着一个字节表示文件的版本号,文件头还包括键和值的类型,数据是否压缩的标志位,是否进行快压缩的标志位, 数据的压缩形式,用户自定义的数据以及同步标识。顺序文件读取内容只能从同步标识开始读取。同步标识位于记录和记录之间,也就是说没法从记录中间开始读取顺序文件的内容。

Sequence file的格式主要有三种,分为未压缩,记录压缩和块压缩。主要格式的存储方式能够查看官方给出的api:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

将小文件合并成一个sequence file的实现(代码参考hadoop 权威指南)

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    public static class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{

        /**
         * 不切分文件,一个split读入整个文件
         * @param context
         * @param filename
         * @return
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            RecordReader reader = new WholeFileRecordReader();
            reader.initialize(inputSplit, taskAttemptContext);
            return reader;
        }
    }

    /**
     * 自定义RecordReader,读取整个小文件内容
     */
    public static class WholeFileRecordReader extends RecordReader<LongWritable, Text>{

        private FileSplit fileSplit;
        private Configuration conf;
        private LongWritable key = new LongWritable();
        private Text value = new Text();
        private boolean process = false;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.fileSplit = (FileSplit)inputSplit;
            this.conf = taskAttemptContext.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(!process){
                FileSystem fs = fileSplit.getPath().getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = new FSDataInputStream(fs.open(fileSplit.getPath()));
                    byte[] contextByte = new byte[(int)fileSplit.getLength()];
                    IOUtils.readFully(in, contextByte, 0, contextByte.length);
                    //等同于 in.read(contextByte, 0, contextByte.length);
                    String context = new String(contextByte, "utf-8");
                    key.set(fileSplit.getStart());
                    value.set(context);
                }finally {
                    IOUtils.closeStream(in);
                }
                process = true;
                return true;
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return process? 1.0f:1.0f;
        }

        @Override
        public void close() throws IOException {

        }
    }


    public static class SmallFilesToSequenceFileMap extends Mapper<Object, Text, Text, Text>{

        private Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            outKey.set(((FileSplit)context.getInputSplit()).getPath().toString());
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(outKey, value);
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        //设定默认job和设置输入输出路径的函数
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("SmallFiles To SequenceFile");
        job.setMapperClass(SmallFilesToSequenceFileMap.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true)? 0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SmallFilesToSequenceFileConverter(), args));
    }
}

hdfs能够经过命令行hadoop fs -text来显示以文本的方式显示顺序文件

读取SequenceFile简单实现

public class SequenceFileReadMain extends Configured implements Tool{

    public static class SequenceFileReadMap extends Mapper<Text, Text, Text, Text>{
        private Text outKey = new Text();
        private Text outValue = new Text();
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set("key : " + key.toString());
            outValue.set("value : " + value.toString());
            context.write(outKey, outValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("Sequence File Read");
        job.setMapperClass(SequenceFileReadMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new SequenceFileReadMain(), args));
    }
}

这时候读取SequenceFile的时候,key对应的是小文件的名字,value是一个小文件的全部内容,因此须要在map编写处理整个小文件内容的代码

参考资料:

https://blog.csdn.net/u011007180/article/details/52333387

https://www.cnblogs.com/staryea/p/8603112.html

http://dongxicheng.org/mapreduce/hdfs-small-files-solution/

相关文章
相关标签/搜索