简洁而不简单
压缩技术可以有效减小底层存储系统(HDFS)读写字节数。压缩提升了网络带宽和磁盘空间的效率。在 Hadoop下,尤为是数据规模很大和工做负载密集的状况下,使用数据压缩显得很是重要。在这种状况下,IO操做和网络数据传输要花大量的时间。还有, Shuffle与 Merge过程一样也面临着巨大的IO压力鳘于磁盘IO和网络带宽是 Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘IO和网络传输很是有帮助。java
不过,尽管压缩与解压操做的CPU开销不髙,其性能的提高和资源的节省并不是没有代价。若是磁盘IO和网络带宽影响了 MapReduce做业性能,在任意 MapReduce阶段启用压缩均可以改善端到端处理时间并減少IO和网络流量。git
压缩是提升 Hadoop运行效率的一种优化策略经过对 Mapper、 Reducer运行过程的数据进行压缩,以减小磁盘IO,提升MR程序运行速度。
注意:釆用压缩技术减小了磁盘IO,但同时增长了CPU运算负担。因此,压缩特性运用得当能提升性能,但运用不当也可能下降性能压缩基本原则:github
(1)运算密集型的job,少用压缩
(2)IO密集型的job,多用压缩!!算法
压缩格式 | hadoop自带? | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原来的程序是否须要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理同样,不须要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理同样,不须要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理同样,不须要修改 |
LZO | 否,须要安装 | LZO | .lzo | 是 | 须要建索引,还须要指定输入格式 |
Snappy | 否,须要安装 | Snappy | .snappy | 否 | 和文本处理同样,不须要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,以下表所示。apache
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO或Snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) | RECORD | reducer输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK |
public class TestCompress { public static void main(String[] args) throws Exception { compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec"); // decompress("e:/hello.txt.bz2"); } // 一、压缩 private static void compress(String filename, String method) throws Exception { // (1)获取输入流 FileInputStream fis = new FileInputStream(new File(filename)); Class codecClass = Class.forName(method); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration()); // (2)获取输出流 FileOutputStream fos = new FileOutputStream(new File(filename +codec.getDefaultExtension())); CompressionOutputStream cos = codec.createOutputStream(fos); // (3)流的对拷 IOUtils.copyBytes(fis, cos, 1024*1024*5, false); // (4)关闭资源 fis.close(); cos.close(); fos.close(); } // 二、解压缩 private static void decompress(String filename) throws FileNotFoundException, IOException { // (0)校验是否能解压缩 CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration()); CompressionCodec codec = factory.getCodec(new Path(filename)); if (codec == null) { System.out.println("cannot find codec for file " + filename); return; } // (1)获取输入流 CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename))); // (2)获取输出流 FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded")); // (3)流的对拷 IOUtils.copyBytes(cis, fos, 1024*1024*5, false); // (4)关闭资源 cis.close(); fos.close(); } }
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); // 开启map端输出压缩 configuration.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 1 : 0); } }
Mapper和Reducer代码不变网络
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); boolean result = job.waitForCompletion(true); System.exit(result?1:0); } }
本文配套 GitHub: https://github.com/zhutiansam...