1 概述

压缩策略和原则

2 MR 支持的压缩编码
压缩格式 |
hadoop自带 |
算法 |
文件扩展名 |
是否可切分 |
换成压缩格式后,原程序是否需要修改 |
DEFLATE |
是,直接使用 |
DEFLATE |
.deflate |
否 |
和文本处理一样,不需要修改 |
Gzip |
是,直接使用 |
DEFLATE |
.gz |
否 |
和文本处理一样,不需要修改 |
bzip2 |
是,直接使用 |
bzip2 |
.bz2 |
是 |
和文本处理一样,不需要修改 |
LZO |
否,需要安装 |
LZO |
.lzo |
是 |
需要建索引,还需要指定输入格式 |
Snappy |
否,需要安装 |
Snappy |
.snappy |
否 |
和文本处理一样,不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器,如下表所示。
压缩格式 |
对应的编码/解码器 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较
压缩算法 |
原始文件大小 |
压缩文件大小 |
压缩速度 |
解压速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
3 压缩方式选择
3.1 Gzip 压缩

3.2 Bzip2 压缩

3.3 Lzo 压缩

3.4 Snappy 压缩

4 压缩位置选择

5 压缩参数配置
参数 |
默认值 |
阶段 |
io.compression.codecs [在core-site.xml] |
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec |
输入压缩 |
mapreduce.map.output.compress [mapred-site.xml] |
false |
mapper输出 |
mapreduce.map.output.compress.codec [mapred-site.xml] |
org.apache.hadoop.io.compress.DefaultCodec |
mapper输出 |
mapreduce.output.fileoutputformat.compress [mapred-site.xml] |
false |
reducer输出 |
mapreduce.output.fileoutputformat.compress.codec [mapred-site.xml] |
org.apache.hadoop.io.compress.DefaultCodec |
reducer输出 |
mapreduce.output.fileoutputformat.compress.type [mapred-site.xml] |
RECORD |
reducer输出 |
6 压缩实操案例
6.1 数据流的压缩和解压缩
package com.djm.mapreduce.zip;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.*;
/**
 * Command-line demo that compresses a local file with a Hadoop compression codec and then
 * decompresses the file it just produced.
 *
 * <p>Usage: {@code CompressUtils <file> <codec-class-name>}, for example
 * {@code CompressUtils /tmp/hello.txt org.apache.hadoop.io.compress.BZip2Codec}.
 */
public class CompressUtils {

    /** Buffer size, in bytes, used when copying between streams. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Compresses {@code args[0]} with the codec class named by {@code args[1]}, then
     * decompresses the compressed file.
     *
     * @param args {@code args[0]} is the path of the file to compress; {@code args[1]} is the
     *     fully qualified class name of the codec to use
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws IOException if reading or writing any of the files fails
     * @throws ClassNotFoundException if the codec class cannot be loaded
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: CompressUtils <file> <codec-class-name>");
        }
        String compressedPath = compress(args[0], args[1]);
        // Decompress the file that was just written rather than the original input: the codec
        // is looked up from the file extension, which the uncompressed input normally lacks.
        decompress(compressedPath);
    }

    /**
     * Decompresses {@code path} into {@code path + ".decoded"}, choosing the codec from the
     * file name. Prints a message and does nothing when no codec matches.
     *
     * @param path path of the compressed file
     * @throws IOException if reading the input or writing the output fails
     */
    private static void decompress(String path) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(path));
        if (codec == null) {
            System.out.println("cannot find codec for file " + path);
            return;
        }
        // try-with-resources closes every stream even when the copy fails part-way.
        try (FileInputStream fis = new FileInputStream(new File(path));
                CompressionInputStream cis = codec.createInputStream(fis);
                FileOutputStream fos = new FileOutputStream(new File(path + ".decoded"))) {
            IOUtils.copyBytes(cis, fos, BUFFER_SIZE);
        }
    }

    /**
     * Compresses {@code path} into {@code path + <codec default extension>}.
     *
     * @param path path of the file to compress
     * @param method fully qualified class name of the {@link CompressionCodec} to use
     * @return the path of the compressed file that was written
     * @throws IOException if reading the input or writing the output fails
     * @throws ClassNotFoundException if {@code method} cannot be loaded
     * @throws ClassCastException if {@code method} does not name a {@link CompressionCodec}
     */
    private static String compress(String path, String method) throws IOException, ClassNotFoundException {
        // Resolve the codec before opening any file so a bad class name leaks nothing.
        Class<? extends CompressionCodec> codecClass =
                Class.forName(method).asSubclass(CompressionCodec.class);
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, new Configuration());
        String compressedPath = path + codec.getDefaultExtension();
        // Resources close in reverse order: the compression stream first, then the raw
        // file streams.
        try (FileInputStream fis = new FileInputStream(new File(path));
                FileOutputStream fos = new FileOutputStream(new File(compressedPath));
                CompressionOutputStream cos = codec.createOutputStream(fos)) {
            IOUtils.copyBytes(fis, cos, BUFFER_SIZE);
        }
        return compressedPath;
    }
}
6.2 Map 输出端采用压缩
package com.djm.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word-count job driver that turns on compression of the map output.
 *
 * <p>{@code args[0]} is used as the job input path and {@code args[1]} as the output path.
 * The mapper and reducer classes ({@code WcMapper}, {@code WcReduce}) are defined elsewhere.
 */
public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job wordCount = Job.getInstance(mapCompressedConf());
        wordCount.setJarByClass(WcDriver.class);

        // Map side: mapper class and its intermediate key/value types.
        wordCount.setMapperClass(WcMapper.class);
        wordCount.setMapOutputKeyClass(Text.class);
        wordCount.setMapOutputValueClass(IntWritable.class);

        // Reduce side: reducer class and the final key/value types.
        wordCount.setReducerClass(WcReduce.class);
        wordCount.setOutputKeyClass(Text.class);
        wordCount.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.setInputPaths(wordCount, input);
        FileOutputFormat.setOutputPath(wordCount, output);

        // Exit code 0 when the job reports success, 1 otherwise.
        int exitCode = wordCount.waitForCompletion(true) ? 0 : 1;
        System.exit(exitCode);
    }

    /** Builds a configuration with map output compression enabled and BZip2 set as its codec. */
    private static Configuration mapCompressedConf() {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.map.output.compress", true);
        // Select the codec used for the map-side output.
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        return conf;
    }
}
6.3 Reduce 输出端采用压缩
package com.djm.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word-count job driver that turns on compression of the reduce (final job) output.
 *
 * <p>{@code args[0]} is used as the job input path and {@code args[1]} as the output path.
 * The mapper and reducer classes ({@code WcMapper}, {@code WcReduce}) are defined elsewhere.
 */
public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job wordCount = Job.getInstance(new Configuration());
        wordCount.setJarByClass(WcDriver.class);

        // Map side: mapper class and its intermediate key/value types.
        wordCount.setMapperClass(WcMapper.class);
        wordCount.setMapOutputKeyClass(Text.class);
        wordCount.setMapOutputValueClass(IntWritable.class);

        // Reduce side: reducer class and the final key/value types.
        wordCount.setReducerClass(WcReduce.class);
        wordCount.setOutputKeyClass(Text.class);
        wordCount.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.setInputPaths(wordCount, input);
        FileOutputFormat.setOutputPath(wordCount, output);

        compressJobOutput(wordCount);

        // Exit code 0 when the job reports success, 1 otherwise.
        int exitCode = wordCount.waitForCompletion(true) ? 0 : 1;
        System.exit(exitCode);
    }

    /** Enables compression of the job's output files and selects BZip2 as the codec. */
    private static void compressJobOutput(Job job) {
        // Switch on compression for the reduce-side output.
        FileOutputFormat.setCompressOutput(job, true);
        // Choose the compression codec.
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    }
}