上ftp://ftp.ncdc.noaa.gov下载,下载下来的目录结构:
每下个文件夹存放了每年全部气象台的气象数据:
每个文件就是一个气象站一年的数据
将上面目录上传到Linux中:
编写如下Shell脚本,将每年的全部不现气象站所产生的文件合并成一个文件,即每一年只有一个文件,并上传到Hadoop系统中:
#!/bin/bash
#将Hadoop权威指南气像数据按每年合并成一个文件,并上传到Hadoop系统中
rm -rf /root/ncdc/all/*
/root/hadoop-1.2.1/bin/hadoop fs -rm -r /ncdc/all/*
#这里的/*/*中第一个*表示年份文件夹,其下面存放的就是每一年不一样气象站的气象文件
for file in /root/ncdc/raw/*/*
do
echo "追加$file.."
path=`dirname $file`
target=${path##*/}
gunzip -c $file >> /root/ncdc/all/$target.all
done
for file in /root/ncdc/all/*
do
echo "上传$file.."
/root/hadoop-1.2.1/bin/hadoop fs -put $file /ncdc/all
done
脚本运行完后,HDFS上的文件以下:
每一个Mapper都须要继承org.apache.hadoop.mapreduce.Mapper类,需重写其map方法:
protectedvoid map(KEYIN key, VALUEIN value, Context context)
每一个Reducer都须要继承org.apache.hadoop.mapreduce.Reducer类,需重写其
protectedvoid reduce(KEYIN key, Iterable<VALUEIN> values, Context context )
publicclassMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT//Map父类中的方法定义以下
/**
* Called once at the beginning of the task.在任务开始执行前会执行一次
*/
protectedvoid setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.会被run()方法循环调用,每对键值都会被调用一次
*/
@SuppressWarnings("unchecked")
protectedvoid map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);//map()方法提供了默认实现,即直接输出,不作处理
}
/**
* Called once at the end of the task.任务结束后会调用一次
*/
protectedvoid cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.map()方法实质上就是被run()循环调用的,咱们能够重写这个方法,加一些处理逻辑
*/
publicvoid run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {//每对键值对都会调用一次map()方法
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
publicclassReducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* Called once at the start of the task.在任务开始执行前会执行一次
*/
protectedvoid setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.reduce()方法会被run()循环调用
*/
@SuppressWarnings("unchecked")
protectedvoid reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);//提供了默认实现,不作处理直接输出
}
}
/**
* Called once at the end of the task.任务结束后会调用一次
*/
protectedvoid cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
publicvoid run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {//每键值对都会调用一次reduce()
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iterinstanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
Reducer的reduce方法每执行完一次,就会产生一个结果文件
reduce方法的输入类型必须匹配map方法的输出类型
map的输出文件名为 part-m-nnnnn ,而reduce的输出文件名为 part-r-nnnnn (nnnnn为分区号,即该文件存放的是哪一个分区的数据,从0开始),其中part文件名能够修改
publicclass Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {,Mapper类有4个范型参数:
KEYIN:Map Key输入类型,若是输入是文本文件,固定为LongWritable,表示每一行文本所在文件的起始位置,从0开始(即第一行起始为位置为0)
publicvoid map(Object key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println("key=" + key + "; value=" + value);
[root@hadoop-master /root]# hadoop fs -get /wordcount/input/wordcount /root/wordcount
换行显示 $('\n'),Tab字符显示^I,^M 是'\r', 回车符
[root@hadoop-master /root]# cat -A /root/wordcount
hello world^M$
hello hadoop
VALUEIN:Map value输入类型,若是输入是文本文件,则通常为Text,表示文本文件中读取到的一行内容(注:Map是以行为单位进行处理的,即每跑一次Map,即处理一行文本,即输入也是以行为单位进行输入的)
KEYOUT, VALUEOUT:为Reduce输出Key与输出Value的类型
//文本文件是按照一行一行传输到Mapper中的
publicclass MaxTemperatureMapper
extends Mapper<LongWritable/*输入键类型:行的起始位置,从0开始*/, Text/*输入值类型:为文本的一行内容*/, Text/*输出键类型:年份*/, IntWritable/*输出值类型:气温*/> {
privatestaticfinalintMISSING = 9999;
@Override
publicvoid map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);//取年份
int airTemperature;
if (line.charAt(87) == '+') { //若是温度值前有加号时,去掉,由于parseInt不支持加号
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);//空气质量
//若是是有效天气,则输出
if (airTemperature != MISSING && quality.matches("[01459]")) {
//每执行一次map方法,可能会输出多个键值对,但这里只输出一次,这些输出合并后传递给reduce做用输入
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
publicclass MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
//reduce的输入即为Map的输出,这里的输入值为一个集合,Map输出后会将相同Key的值合并成一个数组后
//再传递给reduce,因此值类型为Iterable
publicvoid reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
//write出去的结果会写入到输出结果文件
context.write(key, new IntWritable(maxValue));
}
}
publicclass MaxTemperature {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "weather");
// 根据设置的calss找到它所在的JAR任务包,而不须要明确指定JAR文件名
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
//设置map与reduce的输出类型,通常它们的输出类型都相同,若是不一样,则map能够使用setMapOutputKeyClass、setMapOutputValueClass来设置
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// addInputPath除了支持文件、目录,还能够使用文件通匹符?
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1901.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1902.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1903.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1904.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1905.all"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
MapReduce逻辑数据流:
./hadoop jar /root/ncdc/weather.jar ch02.MaxTemperature
若是weather.jar包里的MANIFEST.MF 文件里指定了Main Class:
则运行时能够不用指定主类:
./hadoop jar /root/ncdc/weather.jar
hadoop2里能够这样执行:
./yarn jar /root/ncdc/weather.jar
若是在运行前指定了export HADOOP_CLASSPATH=/root/ncdc/weather.jar,若是设置了HADOOP_CLASSPATH应用程序类路径环境变量,则能够直接运行:
./hadoop MaxTemperature
./yarn MaxTemperature
以上都没有写输入输出文件夹,由于应用程序启动类里写了
map是移动算法而不是数据。在集群上,map任务(算法代码)会移动到数据节点Datanode(计算数据在哪就移动到哪台数据节点上),但reduce过程通常不能避免数据的移动(即不具有本地化数据的优点),单个reduce任务的输入一般来自于全部mapper的输出,所以map的输出会传输到运行reduce任务的节点上,数据在reduce端合并,而后执行用户自定义的reduce方法
reduce任务的完整数据流:
虚线表示节点,虚线箭头表示节点内部数据传输,实线箭头表示不一样节点间的数据传输
有时,map任务(程序)所须要的三台机(假设配置的副本数据为3)正在处理其余的任务时,则Jobtracker就会在这三份副本所在机器的同一机架上找一台空亲的机器,这样数据只会在同一机架上的不一样机器上进行传输,这样比起在不一样机架之间的传输效率要高
数据与map程序可能在同一机器上,可能在同一机架上的不一样机器上,还有多是在不一样机架上的不一样机器上,即数据与map程序分布状况有如下三种:
a(本地数据):同一机器,b(本地机架):同一机架上不一样机器,c(跨机架):不一样机架上不一样机器。显然a这种状况下,执行效率是最高的
从上图来看,应该尽可能让数据与map任务程序在一机器上,这就是为何分片最大的大小与HDFS块大小相同,由于若是分片跨越多个数据块时,而这些块又不在同一机器上时,就须要将其余的块传输到map任务所在节点上,这本地数据相比,这种效率低
为了不计算时不移动数据,TaskTracker是跑在DataName上的
reduce的数量并非由输入数据大小决定的,而是能够单独指定的
若是一个任务有不少个reduce任务,则每一个map任务就须要对输出数据进行分区partition处理,即输入数据交给哪一个reduce进行处理。每一个reduce须要创建一个分区,每一个分区就对应一个reduce,默认的分区算法是根据输出的键哈希法:Key的哈希值 MOD Reduce数量),等到分区号,分区号 小于等于 Reduce数量的整数,从0开始。好比有3个reduce任务,则会分红三个分区。
分区算法也是能够自定义的
在map与reduce之间,还有一个shuffle过程:包括分区、排序、合并
多reduce任务数据流:
一个Map输出数据可能输出到不一样的reduce,一个reduce的输入也可能来自不一样的map输出
一个做业能够没有reduce任务,即无shuffle过程
Hadoop将做业分红若干个小任务进行执行,其中包括两类任务:map任务与reduce任务。
有两类节点控制着任务的执行:一个JobTracker,与若干TaskTracker,JobTracker至关于NameNode的,是用来管理、调度TaskTracker,TaskTracker至关于DataName,须要将任务执行状态报告给JobTracker。
Hadoop将MapReduce的输入数据划分红等长的小数据块,称为输入分片——input split。
每一个分片构建一个map任务,一个map任务就是咱们继承Mapper并重写的map方法
数据分片,能够多个map任务进行并发处理,这样就会缩短整个计算时间,而且分片能够很好的解决负载均衡问题,分片越细(小),则负载均衡越高,但分片过小须要建造不少的小的任务,这样可能会影响整个执行时间,因此,一个合理的分片大小为HDFS块的大小,默认为64M
map任务将其输出结果直接写到本地硬盘上,而不是HDFS中,这是由于map任务输出的是中间结果,该输出传递给reduce任务处理后,就能够删除了,因此没有必要存储在HDFS上
能够为map输出指定一个combiner(就像map经过分区输出到reduce同样),combiner函数的输出做为reduce的输入。
combiner属于优化,没法肯定map输出要调用combiner多少次,有多是0、1、屡次,但无论调用多少次,reduce的输出结果都是同样的
假设1950年的气象数据很大,map前被分红了两片,这样1950的数据就会由两个map任务去执行,假设第一个map输出为:
(1950, 0)
(1950, 20)
(1950, 10)
第二个map任务输出为:
(1950, 25)
(1950, 15)
若是在没有使用combiner时,reducer的输入会是这样的:(1950, [0, 20, 10, 25, 15]),最后输入结果为:(1950, 25);为了减小map的数据输出,这里能够使用combiner函数对每一个map的输出结果进行查找最高气温(第一个map任务最高为20,第二个map任务最高为25),这样一来,最后传递给reducer的输入数据为:(1950, [20, 25]),最后的计算结果也是(1950, 25),这一过程即为:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
上面是找最高气温,并非全部业务需求都具备此特性,如求平均气温时,就不适用combiner,如:
mean(0, 20, 10, 25, 15) = 14
但:
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
combiner与reducer的计算逻辑是同样的,因此不须要重定义combiner类(若是输入类型与reducer不一样,则须要重定义一个,但输入类型必定相同),而是在Job启动内中经过job.setCombinerClass(MaxTemperatureReducer.class);便可,即combiner与reducer是同一实现类
publicclass MaxTemperatureWithCombiner {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "weather");
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1950.all"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在Map端,用户自定义实现的Combine优化机制类Combiner在执行Map端任务的节点自己运行,至关于对map函数的输出作了一次reduce。使用Combine机制的意义就在于使Map端输出更紧凑,使得写到本地磁盘和传给Reduce端的数据更少
Combiner一般被看做是一个Map端的本地reduce函数的实现类Reducer
选用Combine机制下的Combiner虽然减小了IO,可是等于多作了一次reduce,因此应该查看做业日志来判断combine函数的输出记录数是否明显少于输入记录的数量,以肯定这种减小和花费额外的时间来运行Combiner相比是否值得
Combine优化机制执行时机
⑴ Map端spill的时候
在Map端内存缓冲区进行溢写的时候,数据会被划分红相应分区,后台线程在每一个partition内按键进行内排序。这时若是指定了Combiner,而且溢写次数最少为 3(min.num.spills.for.combine属性的取值)时,Combiner就会在排序后输出文件写到磁盘以前运行。 ⑵ Map端merge的时候
在Map端写磁盘完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件,按partition循环处理全部文件,合并会分屡次,这个过程也会伴随着Combiner的运行。
⑶ Reduce端merge的时候
从Map端复制过来数据后,Reduce端在进行merge合并数据时也会调用Combiner来压缩数据。
Combine优化机制运行条件
⑴ 知足交换和结合律[10]
结合律:
(1)+(2+3)+(4+5+6)==(1+2)+(3+4)+(5)+(6)== ...
交换律:
1+2+3+4+5+6==2+4+6+1+2+3== ...
应用程序在知足如上的交换律和结合律的状况下,combine函数的执行才是正确的,由于求平均值问题是不知足结合律和交换律的,因此这类问题不能运用Combine优化机制来求解。
例如:mean(10,20,30,40,50)=30
但mean(mean(10,20),mean(30,40,50))=22.5
这时在求平均气温等相似问题的应用程序中使用Combine优化机制就会出错。
下面全部的内容是针对Hadoop 2.x版本进行说明的,Hadoop 1.x和这里有点不同。
在第一次部署好Hadoop集群的时候,咱们须要在NameNode(NN)节点上格式化磁盘:
[wyp@wyp hadoop-2.2.0]$ $HADOOP_HOME/bin/hdfs namenode -format
格式化完成以后,将会在$dfs.namenode.name.dir/current目录下以下的文件结构
current/
|-- VERSION
|-- edits_*
|-- fsimage_0000000000008547077
|-- fsimage_0000000000008547077.md5
|-- seen_txid
其中的dfs.namenode.name.dir是在hdfs-site.xml文件中配置的,默认值以下:
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name</value>
</property>
hadoop.tmp.dir是在core-site.xml中配置的,默认值以下
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
dfs.namenode.name.dir属性能够配置多个目录,如/data1/dfs/name,/data2/dfs/name,/data3/dfs/name,....。各个目录存储的文件结构和内容都彻底同样,至关于备份,这样作的好处是当其中一个目录损坏了,也不会影响到Hadoop的元数据,特别是当其中一个目录是NFS(网络文件系统Network File System,NFS)之上,即便你这台机器损坏了,元数据也获得保存。
下面对$dfs.namenode.name.dir/current/目录下的文件进行解释。
一、 VERSION文件是Java属性文件,内容大体以下:
#Fri Nov 15 19:47:46 CST 2013
namespaceID=934548976
clusterID=CID-cdff7d73-93cd-4783-9399-0a22e6dce196
cTime=0
storageType=NAME_NODE
blockpoolID=BP-893790215-192.168.24.72-1383809616115
layoutVersion=-47
其中
(1)、namespaceID是文件系统的惟一标识符,在文件系统首次格式化以后生成的;
(2)、storageType说明这个文件存储的是什么进程的数据结构信息(若是是DataNode,storageType=DATA_NODE);
(3)、cTime表示NameNode存储时间的建立时间,因为个人NameNode没有更新过,因此这里的记录值为0,之后对NameNode升级以后,cTime将会记录更新时间戳;
(4)、layoutVersion表示HDFS永久性数据结构的版本信息, 只要数据结构变动,版本号也要递减,此时的HDFS也须要升级,不然磁盘仍旧是使用旧版本的数据结构,这会致使新版本的NameNode没法使用;
(5)、clusterID是系统生成或手动指定的集群ID,在-clusterid选项中能够使用它;以下说明
a、使用以下命令格式化一个Namenode:
$ $HADOOP_HOME/bin/hdfs namenode -format [-clusterId <cluster_id>]
选择一个惟一的cluster_id,而且这个cluster_id不能与环境中其余集群有冲突。若是没有提供cluster_id,则会自动生成一个惟一的ClusterID。
b、使用以下命令格式化其余Namenode:
$ $HADOOP_HOME/bin/hdfs namenode -format -clusterId <cluster_id>
c、升级集群至最新版本。在升级过程当中须要提供一个ClusterID,例如:
$ $HADOOP_PREFIX_HOME/bin/hdfs start namenode --config $HADOOP_CONF_DIR -upgrade -clusterId <cluster_ID>
若是没有提供ClusterID,则会自动生成一个ClusterID。
(6)、blockpoolID:是针对每个Namespace所对应的blockpool的ID,上面的这个BP-893790215-192.168.24.72-1383809616115就是在个人ns1(NameNode节点)的namespace下的存储块池的ID,这个ID包括了 其对应的NameNode节点的ip地址。
二、 $dfs.namenode.name.dir/current/seen_txid很是重要,是存放transactionId的文件,format以后是0,它表明的是namenode里面的edits_*文件的尾数,namenode重启的时候,会按照seen_txid的数字,循序从头跑edits_0000001~到seen_txid的数字。因此当你的hdfs发生异常重启的时候,必定要比对seen_txid内的数字是否是你edits最后的尾数,否则会发生建置namenode时metaData的资料有缺乏,致使误删Datanode上多余Block的资讯。
三、 $dfs.namenode.name.dir/current目录下在format的同时也会生成fsimage和edits文件,及其对应的md5校验文件。fsimage和edits是Hadoop元数据相关的重要文件,请参考Hadoop文件系统元数据fsimage和编辑日志edits。
通常来讲,map函数输入的健/值类型(K1和V1)不一样于输出类型(K2和V2),虽然reduce函数的输入类型必须与map函数的输出类型相同,但reduce函数的输出类型(K3和V3)能够不一样于输入类型
若是使用combine函数,它与reduce函数的形式相同(它也是Reducer的一个实现),不一样之处是它的输出类型是中间的键/值对类型(K2和V2),这些中间值能够输入到reduce函数:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
partition(K2, V2) → integer //将中间键值对分区,返回分区索引号。分区内的键会排序,相同的键的全部值会合并
reduce: (K2, list(V2)) → list(K3, V3)
上面是map、combine、reduce的输入输出格式,如map输入的是单独的一对key/value(值也是值);而combine与reduce的输入也是键值对,只不过它们的值不是单值,而是一个列表即多值;它们的输出都是同样,键值对列表;另外,reduce函数的输入类型必须与map函数的输出类型相同,因此都是K2与V2类型
job.setOutputKeyClass和job.setOutputValueClas在默认状况下是同时设置map阶段和reduce阶段的输出(包括Key与Value输出),也就是说只有map和reduce输出是同样的时候才会这样设置;当map和reduce输出类型不同的时候就须要经过job.setMapOutputKeyClass和job.setMapOutputValueClas来单独对map阶段的输出进行设置,当使用job.setMapOutputKeyClass和job.setMapOutputValueClas后,setOutputKeyClass()与setOutputValueClas()此时则只对reduce输出设置有效了。
一、新API倾向于使用抽像类,而不是接口,这样更容易扩展。在旧API中使用Mapper和Reducer接口,而在新API中使用抽像类
二、新API放在org.apache.hadoop.mapreduce包或其子包中,而旧API则是放在org.apache.hadoop.mapred中
三、新API充分使用上下文对象,使用户很好的与MapReduce交互。如,新的Context基本统一了旧API中的JobConf OutputCollector Reporter的功能,使用一个Context就能够搞定,易使用
四、新API容许mapper和reducer经过重写run()方法控制执行流程。如,便可以批处理键值对记录,也能够在处理完全部的记录以前中止。这在旧API中能够经过写MapRunnable类在mapper中实现上述功能,但在reducer中没法实现
五、新的API中做业是Job类实现,而非旧API中的JobClient类,新的API中删除了JobClient类
六、新API实现了配置的统一。旧API中的做业配置是经过JobConf完成的,它是Configuration的子类。在新API中,做业的配置由Configuration,或经过Job类中的一些辅助方法来完成配置
输出的文件命名方法稍有不一样。在旧的API中map和reduce的输出被统一命名为 part-nnmm,但在新API中map的输出文件名为 part-m-nnnnn,而reduce的输出文件名为 part-r-nnnnn(nnnnn为分区号,即该文件存放的是哪一个分区的数据,从0开始)其中part文件名能够修改
七、
八、新API中的可重写的用户方法抛出ava.lang.InterruptedException异常,这意味着能够使用代码来实现中断响应,从而能够中断那些长时间运行的做业
九、新API中,reduce()传递的值是java.lang.Iterable类型的,而非旧API中使用java.lang.Iterator类型,这就能够很容易的使用for-each循环结构来迭代这些值:for (VALUEIN value : values) { ... }
存放的本地目录是能够经过hdfs-site.xml配置的:
hadoop1:
<property>
<name>dfs.name.dir</name>
<value>${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
should store the name table(fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
directories, for redundancy. </description>
</property>
在《Hadoop NameNode元数据相关文件目录解析》文章中提到NameNode的$dfs.namenode.name.dir/current/文件夹的几个文件:
current/
|-- VERSION
|-- edits_*
|-- fsimage_0000000000008547077
|-- fsimage_0000000000008547077.md5
`-- seen_txid
其中存在大量的以edits开头的文件和少许的以fsimage开头的文件。那么这两种文件究竟是什么,有什么用?下面对这两中类型的文件进行详解。在进入下面的主题以前先来搞清楚edits和fsimage文件的概念:
(1)、fsimage文件实际上是Hadoop文件系统元数据的一个永久性的检查点,其中包含Hadoop文件系统中的全部目录和文件idnode的序列化信息;
(2)、edits文件存放的是Hadoop文件系统的全部更新操做的路径,文件系统客户端执行的全部写操做首先会被记录到edits文件中。
fsimage和edits文件都是通过序列化的,在NameNode启动的时候,它会将fsimage文件中的内容加载到内存中,以后再执行edits文件中的各项操做,使得内存中的元数据和实际的同步,存在内存中的元数据支持客户端的读操做。
NameNode起来以后,HDFS中的更新操做会从新写到edits文件中,由于fsimage文件通常都很大(GB级别的很常见),若是全部的更新操做都往fsimage文件中添加,这样会致使系统运行的十分缓慢,可是若是往edits文件里面写就不会这样,每次执行写操做以后,且在向客户端发送成功代码以前,edits文件都须要同步更新。若是一个文件比较大,使得写操做须要向多台机器进行操做,只有当全部的写操做都执行完成以后,写操做才会返回成功,这样的好处是任何的操做都不会由于机器的故障而致使元数据的不一样步。
fsimage包含Hadoop文件系统中的全部目录和文件idnode的序列化信息;对于文件来讲,包含的信息有修改时间、访问时间、块大小和组成一个文件块信息等;而对于目录来讲,包含的信息主要有修改时间、访问控制权限等信息。fsimage并不包含DataNode的信息,而是包含DataNode上块的映射信息,并存放到内存中,当一个新的DataNode加入到集群中,DataNode都会向NameNode提供块的信息,而NameNode会按期的“索取”块的信息,以使得NameNode拥有最新的块映射。由于fsimage包含Hadoop文件系统中的全部目录和文件idnode的序列化信息,因此若是fsimage丢失或者损坏了,那么即便DataNode上有块的数据,可是咱们没有文件到块的映射关系,咱们也没法用DataNode上的数据!因此按期及时的备份fsimage和edits文件很是重要!
在前面咱们也提到,文件系统客户端执行的因此写操做首先会被记录到edits文件中,那么长此以往,edits会很是的大,而NameNode在重启的时候须要执行edits文件中的各项操做,那么这样会致使NameNode启动的时候很是长!在下篇文章中我会谈到在Hadoop 1.x版本和Hadoop 2.x版本是怎么处理edits文件和fsimage文件的。
在NameNode运行期间,HDFS的全部更新操做都是直接写到edits中,长此以往edits文件将会变得很大;虽然这对NameNode运行时候是没有什么影响的,可是咱们知道当NameNode重启的时候,NameNode先将fsimage里面的全部内容映像到内存中,而后再一条一条地执行edits中的记录,当edits文件很是大的时候,会致使NameNode启动操做很是地慢,而在这段时间内HDFS系统处于安全模式,这显然不是用户要求的。能不能在NameNode运行的时候使得edits文件变小一些呢?实际上是能够的,本文主要是针对Hadoop 1.x版本,说明其是怎么将edits和fsimage文件合并的,Hadoop 2.x版本edits和fsimage文件合并是不一样的。
用过Hadoop的用户应该都知道在Hadoop里面有个SecondaryNamenode进程,从名字看来你们很容易将它看成NameNode的热备进程。其实真实的状况不是这样的。SecondaryNamenode是HDFS架构中的一个组成部分,它是用来保存namenode中对HDFS metadata的信息的备份,并减小namenode重启的时间而设定的!通常都是将SecondaryNamenode单独运行在一台机器上,那么SecondaryNamenode是如何减小namenode重启的时间的呢?来看看SecondaryNamenode的工做状况:
(1)、SecondaryNamenode会按期的和NameNode通讯,请求其中止使用edits文件,暂时将新的写操做写到一个新的文件edit.new上来,这个操做是瞬间完成,上层写日志的函数彻底感受不到差异;
(2)、SecondaryNamenode经过HTTP GET方式从NameNode上获取到fsimage和edits文件,并下载到本地的相应目录下;
(3)、SecondaryNamenode将下载下来的fsimage载入到内存,而后一条一条地执行edits文件中的各项更新操做,使得内存中的fsimage保存最新;这个过程就是edits和fsimage文件合并;
(4)、SecondaryNamenode执行完(3)操做以后,会经过post方式将新的fsimage文件发送到NameNode节点上
(5)、NameNode将从SecondaryNamenode接收到的新的fsimage替换旧的fsimage文件,同时将edit.new替换edits文件,经过这个过程edits就变小了!整个过程的执行能够经过下面的图说明:
在(1)步骤中,咱们谈到SecondaryNamenode会按期的和NameNode通讯,这个是须要配置的,能够经过core-site.xml进行配置,下面是默认的配置:
<property>
<name>fs.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
其实若是当fs.checkpoint.period配置的时间尚未到期,咱们也能够经过判断当前的edits大小来触发一次合并的操做,能够经过下面配置:
<property>
<name>fs.checkpoint.size</name>
<value>67108864</value>
<description>The size of the current edit log (in bytes) that triggers
a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
</description>
</property>
当edits文件大小超过以上配置,即便fs.checkpoint.period还没到,也会进行一次合并。顺便说说SecondaryNamenode下载下来的fsimage和edits暂时存放的路径能够经过下面的属性进行配置:
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
<property>
<name>fs.checkpoint.edits.dir</name>
<value>${fs.checkpoint.dir}</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary edits to merge.
If this is a comma-delimited list of directoires then teh edits is
replicated in all of the directoires for redundancy.
Default value is same as fs.checkpoint.dir
</description>
</property>
从上面的描述咱们能够看出,SecondaryNamenode根本就不是Namenode的一个热备,其只是将fsimage和edits合并。其拥有的fsimage不是最新的,由于在他从NameNode下载fsimage和edits文件时候,新的更新操做已经写到edit.new文件中去了。而这些更新在SecondaryNamenode是没有同步到的!固然,若是NameNode中的fsimage真的出问题了,仍是能够用SecondaryNamenode中的fsimage替换一下NameNode上的fsimage,虽然已经不是最新的fsimage,可是咱们能够将损失减少到最少!
在Hadoop 2.x经过配置JournalNode来实现Hadoop的高可用性,能够参见《Hadoop2.2.0中HDFS的高可用性实现原理》,这样主被NameNode上的fsimage和edits都是最新的,任什么时候候只要有一台NameNode挂了,也能够使得集群中的fsimage是最新状态!关于Hadoop 2.x是如何合并fsimage和edits的,能够参考《Hadoop 2.x中fsimage和edits合并实现》
在《Hadoop 1.x中fsimage和edits合并实现》文章中,咱们谈到了Hadoop 1.x上的fsimage和edits合并实现,里面也提到了Hadoop 2.x版本的fsimage和edits合并实现和Hadoop 1.x彻底不同,今天就来谈谈Hadoop 2.x中fsimage和edits合并的实现。
咱们知道,在Hadoop 2.x中解决了NameNode的单点故障问题;同时SecondaryName已经不用了,而以前的Hadoop 1.x中是经过SecondaryName来合并fsimage和edits以此来减少edits文件的大小,从而减小NameNode重启的时间。而在Hadoop 2.x中已经不用SecondaryName,那它是怎么来实现fsimage和edits合并的呢?首先咱们得知道,在Hadoop 2.x中提供了HA机制(解决NameNode单点故障),能够经过配置奇数个JournalNode来实现HA,如何配置今天就不谈了!HA机制经过在同一个集群中运行两个NN(active NN & standby NN)来解决NameNode的单点故障,在任什么时候间,只有一台机器处于Active状态;另外一台机器是处于Standby状态。Active NN负责集群中全部客户端的操做;而Standby NN主要用于备用,它主要维持足够的状态,若是必要,能够提供快速的故障恢复。
为了让Standby NN的状态和Active NN保持同步,即元数据保持一致,它们都将会和JournalNodes守护进程通讯。当Active NN执行任何有关命名空间的修改(如增删文件),它须要持久化到一半(因为JournalNode最少为三台奇数台,因此最少要存储到其中两台上)以上的JournalNodes上(经过edits log持久化存储),而Standby NN负责观察edits log的变化,它可以读取从JNs中读取edits信息,并更新其内部的命名空间。一旦Active NN出现故障,Standby NN将会保证从JNs中读出了所有的Edits,而后切换成Active状态。Standby NN读取所有的edits可确保发生故障转移以前,是和Active NN拥有彻底同步的命名空间状态(更多的关于Hadoop 2.x的HA相关知识,能够参考本博客的《Hadoop2.2.0中HDFS的高可用性实现原理》)。
那么这种机制是如何实现fsimage和edits的合并?在standby NameNode节点上会一直运行一个叫作CheckpointerThread的线程,这个线程调用StandbyCheckpointer类的doWork()函数,而doWork函数会每隔Math.min(checkpointCheckPeriod, checkpointPeriod)秒来作一次合并操做,相关代码以下:
try {
Thread.sleep(1000 * checkpointConf.getCheckPeriod());
} catch (InterruptedException ie) {}
publiclong getCheckPeriod() {
return Math.min(checkpointCheckPeriod, checkpointPeriod);
}
checkpointCheckPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
上面的checkpointCheckPeriod和checkpointPeriod变量是经过获取hdfs-site.xml如下两个属性的值获得:
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description>The SecondaryNameNode and CheckpointNode will poll the NameNode
every 'dfs.namenode.checkpoint.check.period' seconds to query the number
of uncheckpointed transactions.
</description>
</property>
当达到下面两个条件的状况下,将会执行一次checkpoint:
boolean needCheckpoint = false;
if (uncheckpointed >= checkpointConf.getTxnCount()) {
LOG.info("Triggering checkpoint because there have been " +
uncheckpointed + " txns since the last checkpoint, which " +
"exceeds the configured threshold " +
checkpointConf.getTxnCount());
needCheckpoint = true;
} else if (secsSinceLast >= checkpointConf.getPeriod()) {
LOG.info("Triggering checkpoint because it has been " +
secsSinceLast + " seconds since the last checkpoint, which " +
"exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true;
}
当上述needCheckpoint被设置成true的时候,StandbyCheckpointer类的doWork()函数将会调用doCheckpoint()函数正式处理checkpoint。当fsimage和edits的合并完成以后,它将会把合并后的fsimage上传到Active NameNode节点上,Active NameNode节点下载完合并后的fsimage,再将旧的fsimage删掉(Active NameNode上的)同时清除旧的edits文件。步骤能够归类以下:
(1)、配置好HA后,客户端全部的更新操做将会写到JournalNodes节点的共享目录中,能够经过下面配置
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://XXXX/mycluster</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/export1/hadoop2x/dfs/journal</value>
</property>
(2)、Active Namenode和Standby NameNode从JournalNodes的edits共享目录中同步edits到本身edits目录中;
(3)、Standby NameNode中的StandbyCheckpointer类会按期的检查合并的条件是否成立,若是成立会合并fsimage和edits文件;
(4)、Standby NameNode中的StandbyCheckpointer类合并完以后,将合并以后的fsimage上传到Active NameNode相应目录中;
(5)、Active NameNode接到最新的fsimage文件以后,将旧的fsimage和edits文件清理掉;
(6)、经过上面的几步,fsimage和edits文件就完成了合并,因为HA机制,会使得Standby NameNode和Active NameNode都拥有最新的fsimage和edits文件(以前Hadoop 1.x的SecondaryNameNode中的fsimage和edits不是最新的)
在Hadoop2.0.0以前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每个集群中存在一个NameNode,若是NN所在的机器出现了故障,那么将致使整个集群没法利用,直到NN重启或者在另外一台主机上启动NN守护线程。
主要在两方面影响了HDFS的可用性:
(1)、在不可预测的状况下,若是NN所在的机器崩溃了,整个集群将没法利用,直到NN被从新启动;
(2)、在可预知的状况下,好比NN所在的机器硬件或者软件须要升级,将致使集群宕机。
HDFS的高可用性将经过在同一个集群中运行两个NN(active NN & standby NN)来解决上面两个问题,这种方案容许在机器破溃或者机器维护快速地启用一个新的NN来恢复故障。
在典型的HA集群中,一般有两台不一样的机器充当NN。在任什么时候间,只有一台机器处于Active状态;另外一台机器是处于Standby状态。Active NN负责集群中全部客户端的操做;而Standby NN主要用于备用,它主要维持足够的状态,若是必要,能够提供快速的故障恢复。
为了让Standby NN的状态和Active NN保持同步,即元数据保持一致,它们都将会和JournalNodes守护进程通讯。当Active NN执行任何有关命名空间的修改,它须要持久化到一半(奇数个,通常为3,因此须要持久到2台)以上的JournalNodes上(经过edits log持久化存储),而Standby NN负责观察edits log的变化,它可以读取从JNs中读取edits信息,并更新其内部的命名空间。一旦Active NN出现故障,Standby NN将会保证从JNs中读出了所有的Edits,而后切换成Active状态。Standby NN读取所有的edits可确保发生故障转移以前,是和Active NN拥有彻底同步的命名空间状态。
为了提供快速的故障恢复,Standby NN也须要保存集群中各个文件块的存储位置。为了实现这个,集群中全部的DataNode将配置好Active NN和Standby NN的位置,并向它们发送块文件所在的位置及心跳,以下图所示:
在任什么时候候,集群中只有一个NN处于Active 状态是极其重要的。不然,在两个Active NN的状态下NameSpace状态将会出现分歧,这将会致使数据的丢失及其它不正确的结果。为了保证这种状况不会发生,在任什么时候间,JNs只容许一个NN充当writer。在故障恢复期间,将要变成Active 状态的NN将取得writer的角色,并阻止另一个NN继续处于Active状态。
为了部署HA集群,你须要准备如下事项:
(1)、NameNode machines:运行Active NN和Standby NN的机器须要相同的硬件配置;
(2)、JournalNode machines:也就是运行JN的机器。JN守护进程相对来讲比较轻量,因此这些守护进程能够与其余守护线程(好比NN,YARN ResourceManager)运行在同一台机器上。在一个集群中,最少要运行3个JN守护进程,这将使得系统有必定的容错能力。固然,你也能够运行3个以上的JN,可是为了增长系统的容错能力,你应该运行奇数个JN(3、5、7等),当运行N个JN,系统将最多容忍(N-1)/2个JN崩溃。
在HA集群中,Standby NN也执行namespace状态的checkpoints,因此没必要要运行Secondary NN、CheckpointNode和BackupNode;事实上,运行这些守护进程是错误的。
hadoop fs –help
% hadoop fs -copyFromLocal /input/docs/quangle.txt quangle.txt 将本地文件复制到HDFS中,目的地为相对地址,相对的是HDFS上的/user/root目录,root为用户,不一样的用户执行,则不一样
hadoop fs -copyToLocal quangle.txt quangle.copy.txt 从HDFS中下载文件到本地
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x - tom supergroup 0 2009-04-02 22:41 /user/tom/books
-rw-r--r-- 1 tom supergroup 118 2009-04-02 22:29 /user/tom/quangle.txt
第一列为文件模式,第二列文件的副本数,若是目录则没有;第3、四列表示文件的所属用户和用户组;第五列为文件大小,目录没有。
一个文件的执行权限X没有什么意义,由于你不可能在HDFS系统时执行一个文件,但它对目录是有用的,由于在访问一个目录的子项时是不须要此种权限的
处理超大的文件:一个文件能够是几百M,甚至是TB级文件
流式数据访问:一次写入,屡次读取
廉价的机器
不适用于低延迟数据访问,若是须要能够使用Hbase
不适用于大量的小文件,由于每一个文件存储在DFS中时,都会在NameNode上保存文件的元数据信息,这会会急速加太NameNode节点的内存占用,目前每一个文件的元数据信息大概占用150字节,若是有一百万个文件,则要占用300MB的内存
不支持并发写,而且也不支持文件任意位置的写入,只能一个用户写在文件最末
默认块大小为64M,若是某个文件不足64,则不会占64,而是文件自己文件大小,这与操做系统文件最小存储单元块不一样
分块存储的好处:
一个文件的大小能够大于网络中的任意一个硬盘的容量,若是不分块,则不能存储在硬盘中,当分块后,就能够将这个大文件分块存储到集群中的不一样硬盘中
分块后适合多副本数据备份,保证了数据的安全
能够经过如下命令查看块信息:
hadoop fsck / -files -blocks
一个namenode(管理者),多个datanode(工做者,存放数据)
namenode管理文件系统的命名空间,它维护着文件系统树及整个树里的文件和目录,这些系统以两个文件持久化硬盘上永久保存着:命名空间镜像文件fsimage和编辑日志文件edits
namenode记录了每一个文件中各个块所在的datanode信息,但它并不将这些位置信息持久化硬盘上,由于这些信息会在系统启动时由datanode上报给namenode节点后重建
若是在没有任何备份,namenode若是坏了,则整个文件系统将没法使用,因此就有了secondnamenode辅助节点:
secondnamenode除了备份namenode上的元数据持久信息外,最主要的做用是按期的将namenode上的fsimage、edits两个文件拷贝过来进行合并后,将传回给namenode。secondnamenode的备份并不是namenode上全部信息的彻底备份,它所保存的信息就是滞后于namenode的,因此namenode坏掉后,尽管能够手动从secondnamenode恢复,但也不免丢失部分数据(最近一次合并到当前时间内所作的数据操做)
因为1.X只能有一个namenode,随着文件愈来愈多,namenode的内存就会受到限制,到某个时候确定是存放不了更多的文件了(虽然datanode能够加入新的datanode能够解决存储容量问题),不能够无限在一台机器上加内存。在2.X版本中,引入了联邦HDFS容许系统经过添加namenode进行扩展,这样每一个namenode管理着文件系统命名空间的一部分元数据信息
联邦HDFS只解决了内存数据扩展的问题,但并无解决namenode单节点问题,即当某个namenode坏掉所,因为namenode没有备用,因此一旦毁坏后仍是会致使文件系统没法使用。
HDFS高可用性包括:水平扩展namenode以实现内存扩展、高安全(坏掉还有其余备用的节点)及热切换(坏掉后无需手动切换到备用节点)到备用机
在2.x中增长了高可用性支持,经过活动、备用两台namenode来实现,当活动namenode失效后,备用namenode就会接管安的任务并开始服务于来自客户端的请求,不会有任何明显中断服务,这须要架构以下:
n Namenode之间(活动的与备用的两个节点)之间须要经过共享存储编辑日志文件edits,即这edits文件放在一个两台机器都能访问获得的地方存储,当活动的namenode毁坏后,备用namenode自动切换为活动时,备用机将edits文件恢复备用机内存
n Datanode须要现时向两个namenode发送数据块处理报告
n 客户端不能直接访问某个namenode了,由于一旦某个出问题后,就须要经过另外一备用节点来访问,这须要用户对namenode访问是透明的,不能直接访问namenode,而是经过管理这些namenode集群入口地址透明访问
在活动namenode失效后,备用namenode可以快速(几十秒的时间)实现任务接管,由于最新的状态存储在内存中:包括最新的编辑日志和最新的数据块映射信息
备用切换是经过failover_controller故障转移控制器来完成的,故障转移控制器是基于ZooKeeper实现的;每一个namenode节点上都运行着一个轻量级的故障转移控制器,它的工做就是监视宿主namenode是否失效(经过一个简单的心跳机制实现)并在namenode失效时进行故障切换;用户也能够在namenode没有失效的状况下手动发起切换,例如在进行平常维护时;另外,有时没法确切知道失效的namenode是否已经中止运行,例如在网络异常状况下,一样也可能激发故障转换,但先前的活动着的namenode依然运行着而且依旧是活动的namenode,这会出现其余问题,但高可用实现作了“规避”措施,如杀死行前的namenode进程,收回访问共享存储目录的权限等
伪分布式: fs.default.name=hdfs://localhost:8020;dfs.replication=1,若是设置为3,将会持续收到块副本不中的警告,设置这个属性后就不会再有问题了
Hadoop自己是由Java编写的
org.apache.hadoop.fs.FileSystem是文件系统的抽象类,常见有如下实现类:
文件系统 |
URI scheme |
Java实现类 |
描述 |
Local |
file |
org.apache.hadoop.fs.LocalFileSystem |
使用了客户端校验和的本地文件系统(未使用校验和的本地文件系统请使用RawLocalFileSystem) |
HDFS |
hdfs |
org.apache.hadoop.hdfs.DistributedFileSystem |
Hadoop分布式文件系统 |
HFTP |
hftp |
org.apache.hadoop.hdfs.HftpFileSystem |
经过Http对Hdfs进行只读访问的文件系统,用于实现不一样版本HDFS集群间的数据复制 |
HSFTP |
hsftp |
org.apache.hadoop.hdfs.HsftpFileSystem |
同上,只是https协议 |
|
|
org.apache.hadoop. |
|
获取FileSystem实例有如下几个静态方法:
publicstatic FileSystem get(Configuration conf) throws IOException//获取core-sit.xml中fs.default.name配置属性所配置的URI来返回相应的文件系统,因为core-sit.xml已配置,因此通常调用这个方法便可
publicstatic FileSystem get(URI uri, Configuration conf) throws IOException//根据uri参数里提供scheme信息返回相应的文件系统,即hdfs://hadoop-master:9000,则返回的是hdfs文件系统
publicstatic FileSystem get(URI uri, Configuration conf, String user) throws IOException
有了FileSystem后,就能够调用open()方法获取文件输入流:
public FSDataInputStream open(Path f) throws IOException //默认缓冲4K
publicabstract FSDataInputStream open(Path f, int bufferSize) throws IOException
示例:将hdfs文件系统中的文件内容在标准输出显示
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
publicclass FileSystemCat {
publicstaticvoid main(String[] args) throws Exception {
// 若是为默认端口8020,则能够省略端口
String uri = "hdfs://hadoop-master:9000/wordcount/input/wordcount.txt";
Configuration conf = new Configuration();
// FileSystem fs = FileSystem.get(URI.create(uri), conf);
// 由于get方法的URI参数只须要URI scheme,因此只需指定服务地址便可,无需同具体到某个文件
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
//或者这样使用
// conf.set("fs.default.name", "hdfs://hadoop-master:9000");
// FileSystem fs = FileSystem.get(conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false); //无需使用循环对流进行拷贝,借助于工具类IOUtils便可
} finally {
IOUtils.closeStream(in);//不直接调用输入输出流的close方法,而是使用IOUtils工具类
}
}
}
实际上,FileSystem的open方法返回的是FSDataInputStream类型的对象,而非Java标准输入流,这个类继承了标准输入流DataInputStream:
publicclassFSDataInputStreamextends DataInputStream
implements Seekable, PositionedReadable, Closeable, HasFileDescriptor {
而且FSDataInputStream类实现了Seekable接口,支持随机访问,所以能够从流的任意位置读取数据。
Seekable接口支持在文件中找到指定的位置,并提供了一个查询当前位置至关于文件起始位置偏移量的方法getPos():
publicinterfaceSeekable {
// 定位到文件指定的位置,与标准输入流的InputStream.skip不一样的是,seek能够定位到文件中的任意绝对位置,而
// skip只能相对于当前位置才能定位到新的位置。这里会传递的是相对于文件开头的绝对位置,不能超过文件长度。注:seek开销很高,谨慎调用
void seek(long pos) throws IOException;
// 返回当前相对于文件开头的偏移量
long getPos() throws IOException;
boolean seekToNewSource(long targetPos) throws IOException;
}
示例:改写上面实例,让它输出两次
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
publicclass FileSystemCat {
publicstaticvoid main(String[] args) throws Exception {
String uri = "hdfs://hadoop-master:9000/wordcount/input/wordcount.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
System.out.println("\n");
in.seek(0);//跳到文件的开头
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
FSDataInputStream类还实现了PositionedReadable接口,这能够从一个指定的偏移量处读取文件的一部分:
publicinterfacePositionedReadable {
// 从文件指定的position处读取最多length字节的数据并存入缓冲区buffer的指定偏移量offset处,返回的值是
// 实际读取到的字节数:调用者须要检查这个值,有可能小于参数length
publicint read(long position, byte[] buffer, int offset, int length) throws IOException;
// 与上面方法至关,只是若是读取到文件最末时,被读取的字节数可能不满length,此时则会抛异常
publicvoid readFully(long position, byte[] buffer, int offset, int length) throws IOException;
// 与上面方法至关,只是每次读取的字节数为buffer.length
publicvoid readFully(long position, byte[] buffer) throws IOException;
}
注:上面这些方法都不会修改当前所在文件偏移量
FileSystem类有一系列参数不一样的create建立文件方法,最简单的方法:
publicFSDataOutputStreamcreate(Path f) throws IOException {
还有一系列不一样参数的重载方法,他们最终都是调用下面这个抽象方法实现的:
publicabstract FSDataOutputStream create(Path f,
FsPermission permission, //权限
boolean overwrite, //若是文件存在,传false时会抛异常,不然覆盖已存在的文件
int bufferSize, //缓冲区的大小
short replication, //副本数量
long blockSize, //块大小
Progressable progress) throws IOException; //处理进度的回调接口
通常调用简单方法时,若是文件存在,则是会覆盖,若是不想覆盖,能够指定overwrite参数为false,或者使用FileSystem类的exists(Path f)方法进行判断:
publicbooleanexists(Path f) throws IOException {//能够用来测试文件或文件夹是否存在
进度回调接口,当数据每次写完缓冲数据后,就会回调该接口显示进度信息:
package org.apache.hadoop.util;
publicinterface Progressable {
publicvoid progress();//返回处理进度给Hadoop应用框架
}
另外一种新建文件的方法是使用append方法在一个已有文件末尾追加数据(该方法也有一些重载版本):
public FSDataOutputStream append(Path f) throws IOException {
示例:带进度的文件上传
publicclass FileCopyWithProgress {
publicstaticvoid main(String[] args) throws Exception {
InputStream in = new BufferedInputStream(new FileInputStream("d://1901.all"));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
OutputStream out = fs.create(new Path("hdfs://hadoop-master:9000/ncdc/all/1901.all"), new Progressable() {
publicvoid progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}
}
像FSDataInputStream 同样,FSDataOutputStream类也有一个getPos方法,用来查询当前位置,但与FSDataInputStream不一样的是,不容许在文件中定位,这是由于HDFS只容许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据,安不支持在除文件末尾以外的其余位置进行写入,因此就没有seek定位方法了
publicvoid copyFromLocalFile(Path src, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, Path src, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, boolean overwrite,Path[] srcs, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, boolean overwrite,Path src, Path dst)
delSrc - whether to delete the src是否删除源文件
overwrite - whether to overwrite an existing file是否覆盖已存在的文件
srcs - array of paths which are source 能够上传多个文件数组方式
dst – path 目标路径,若是存在,且都是目录的话,会将文件存入它下面,而且上传文件名不变;若是不存在,则会建立并认为它是文件,即上传的文件名最终会成为dst指定的文件名
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path("c:/t_stud.txt"), new Path("hdfs://hadoop-master:9000/db1/output1"));
fileSystem.rename(src, dst);
形为重命名,实际上该方法还能够移动文件,与上传目的地dst参数同样:若是dst为存在的目录,则会放在它下面;若是不存在,则会建立并认为它是文件,即上传的文件名最终会成为dst指定的文件名
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
fs.rename(new Path("hdfs://hadoop-master:9000/db1/output2"), new Path("hdfs://hadoop-master:9000/db3/output2"));
FileSystem的delete()方法能够用来删除文件或目录
publicabstractboolean delete(Path f, boolean recursive) throws IOException;
若是f是一个文件或空目录,那么recursive的值就会被忽略。只有在recursive值为true时,非空目录及其内容才会被删除(若是删除非空目录时recursive为false,则会抛IOException异常?)
FileSystem提供了建立目录的方法:
publicbooleanmkdirs(Path f) throws IOException {
若是父目录不存在,则也会自动建立,并返回是否成功
一般状况下,咱们不须要调用这个方法建立目录,由于调用create方法建立文件时,若是父目录不存在,则会自动建立
FileStatus类封装了文件系统中的文件和目录的元数据,包括文件长度、大小、副本数、修改时间、全部者、权限等
FileSystem的getFileStatus方法能够获取FileStatus对象:
publicabstract FileStatus getFileStatus(Path f) throws IOException;
示例:获取文件(夹)状态信息
publicclass ShowFileStatus {
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
//out.close();
IOUtils.closeStream(out);
// 文件的状态信息
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
System.out.println(stat.getPath().toUri().getPath());
System.out.println(stat.isDir());//是否文件夹
System.out.println(stat.getLen());//文件大小
System.out.println(stat.getModificationTime());//文件修改时间
System.out.println(stat.getReplication());//副本数
System.out.println(stat.getBlockSize());//文件系统所使用的块大小
System.out.println(stat.getOwner());//文件全部者
System.out.println(stat.getGroup());//文件全部者所在组
System.out.println(stat.getPermission().toString());//文件权限
System.out.println();
// 目录的状态信息
Path dir = new Path("/dir");
stat = fs.getFileStatus(dir);
System.out.println(stat.getPath().toUri().getPath());
System.out.println(stat.isDir());
System.out.println(stat.getLen());//文件夹为0
System.out.println(stat.getModificationTime());
System.out.println(stat.getReplication());//文件夹为0
System.out.println(stat.getBlockSize());//文件夹为0
System.out.println(stat.getOwner());
System.out.println(stat.getGroup());
System.out.println(stat.getPermission().toString());
}
}
除了上面FileSystem的getFileStatus一次只能获取一个文件或目录的状态信息外,FileSystem还能够一次获取多个文件的FileStatus或目录下的全部文件的FileStatus,这能够调用FileSystem的listStatus方法,该方法有如下重载版本:
publicabstract FileStatus[] listStatus(Path f) throws IOException;
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
public FileStatus[] listStatus(Path[] files) throws IOException {
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
当传入的参数是一个文件时,它会简单转成以数组方式返回长度为1的FileStatus对象。当传入的是一个目录时,则返回0或多个FileStatus对象,包括此目录中包括的全部文件和目录
listStatus方法能够列出目录下全部文件的文件状态,因此就能够借助于这个特色列出某个目录下的全部文件(包括子目录):
publicclass ListStatus {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
Path[] paths = new Path[2];
// 目录
paths[0] = new Path("hdfs://hadoop-master:9000/ncdc");
// 文件
paths[1] = new Path("hdfs://hadoop-master:9000/wordcount/input/wordcount.txt");
// 只传一个目录进去。注:listStatus方法只会将直接子目录或子文件列出来,
// 而不会递归将全部层级子目录文件列出
FileStatus[] status = fs.listStatus(paths[0]);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 输出输入目录下的全部文件及目录的路径
System.out.println(p);
}
System.out.println();
// 只传一个文件进去
status = fs.listStatus(paths[1]);
listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 输出输入文件的路径
System.out.println(p);
}
System.out.println();
//传入的为一个数组:包括文件与目录
status = fs.listStatus(paths);
// 将FileStatus数组转换为Path数组
listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 输出全部输入的文件的路径,以及输入目录下全部文件或子目录的路径
System.out.println(p);
}
}
}
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem) fs;
DatanodeInfo[] dns = hdfs.getDataNodeStats();
for (int i = 0, h = dns.length; i < h; i++) {
System.out.println("datanode_" + i + "_name: " + dns[i].getHostName());
}
经过DatanodeInfo能够得到datanode更多的消息
FileSystem提供了两个通配的方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException {
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
pathPattern参数是通配,filter是进一步骤过滤
注:根据通配表达式,匹配到的多是目录,也多是文件,这要看通配表达式是只到目录,仍是到文件。具体示例请参考下面的PathFilter
有时通配模式并不总能多精确匹配到咱们想要的文件,此时此要使用PathFilter参数进行过滤。FileSystem的listStatus() 和 globStatus()方法就提供了此过滤参数
publicinterfacePathFilter {
boolean accept(Path path);
}
示例:排除匹配指定正则表达式的路径
publicclass RegexExcludePathFilter implements PathFilter {
privatefinal String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
publicboolean accept(Path path) {
return !path.toString().matches(regex);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*"));//匹配到文件夹
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*/*30.txt"));//匹配到文件
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter(
"^.*/2007/12/31$"));//过滤掉31号的目录
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
一、 客户端调用DistributedFileSystem(在编程时咱们通常直接调用的是其抽像父类FileSystem的open方法,对于HDFS文件系统来讲,实质上调用的仍是DistributedFileSystem的open方法)的open()方法来打开要读取的文件,并返回FSDataInputStream类对象(该类实质上是对DFSInputStream的封装,由它来处理与datanode和namenode的通讯,管理I/O)
二、 DistributedFileSystem经过使用RPC来调用namenode,查看文件在哪些datanode上,并返回这些datanode的地址(注:因为同一文件块副本的存放在不少不一样的datanode节点上,返回的都是网络拓扑中距离客户端最近的datanode节点地址,距离算法请参考后面)
三、 客户端调用FSDataInputStream对象的read()方法
四、 FSDataInputStream去相应datanode上读取第一个数据块(这一过程并不须要namenode的参与,该过程是客户端直接访问datanote)
五、 FSDataInputStream去相应datanode上读取第二个数据块…如此读完全部数据块(注:数据块读取应该是同时并发读取,即在读取第一块时,也同时在读取第二块,只是在拼接文件时须要按块顺序组织成文件)
六、 客户端调用FSDataInputStream的close()方法关闭文件输入流
假设有数据中心d1机架r1中的n1节点表示为 /d1/r1/n1。
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)同一节点
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)同一机架上不一样节点
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)同一数据中心不一样机架上不一样节点
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)不一样数据中心
哪些节点是哪些机架上是经过配置实现的,具体请参考后面的章节
一、 客户端调用DistributedFileSystem的create()建立文件,并向客户端返回FSDataOutputStream类对象(该类实质上是对DFSOutputStream的封装,由它来处理与datanode和namenode的通讯,管理I/O)
二、 DistributedFileSystem向namenode发出建立文件的RPC调用请求,namenode会告诉客户端该文件会写到哪些datanode上
三、 客户端调用FSDataOutputStream的write方法写入数据
四、 FSDataOutputStream向datanode写数据
五、 当数据块写完(要达到dfs.replication.min副本数)后,会返回确认写完的信息给FSDataOutputStream。在返回写完信息的后,后台系统还要拷贝数据副本要求达到dfs.replication设置的副本数,这一过程是在后台自动异步复制完成的,并不须要等全部副本都拷贝完成后才返回确认信息到FSDataOutputStream
六、 客户端调用FSDataOutputStream的close方法关闭流
七、 DistributedFileSystem发送文件写入完成的信息给namenode
数据存储在哪些datanode上,这是有默认布局策略的:
在客户端运行的datanode节点上放第一个副本(若是客户端是在集群外的机器上运行的话,会随机选择一个空闲的机器),第二个副本则放在与第一个副本不在同一机架的节点上,第三个副本则放在与第二个节点同一机架上的不一样节点上,超过3个副本的,后继会随机选择一台空闲机器放后继其余副本。这样作的目的兼顾了安全与效率问题
当新建一个文件后,在文件系统命名空间当即可见,但数据不必定能当即可见,即便数据流已刷新:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
当写入数据超过一个块后,第一个数据块对新的reader就是可见的,以后的块也是同样,当后面的块写入后,前面的块才能可见。总之,当前正在写入的块对其余reader是不可见的
FSDataOutputStream提供了一个方法sync()来使全部缓存与数据节点强行同步,当sync()方法调用成功后,对全部新的reader而言均可见:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
注:若是调用了FSDataOutputStream的close()方法,该方法也会调用sync()
文件压缩有两大好处:减小存储文件所须要的磁盘空间,并加速数据在网络和磁盘上的传输
全部的压缩算法要权衡时间与空间,压缩时间越短,压缩率超低,压缩时间越长,压缩率超高。上表时每一个工具都有9个不一样的压缩级别:-1为优化压缩速度,-9为优化压缩空间。以下面命令经过最快的压缩方法建立一个名为file.gz的压缩文件:
gzip -1 file
不一样压缩工具备不一样的压缩特性。gzip是一个通用的压缩工具,在空间与时间比较均衡。bzip2压缩能力强于gzip,但速度会慢一些。另外,LZO、LZ4和Snappy都优化了压缩速度,比gzip快一个数量级,但压缩率会差一些(LZ4和Snappy的解压速度比LZO高不少)
上表中的“是否可切分”表示数据流是否能够搜索定位(seek)。
上面这些算法类都实现了CompressionCodec接口。
CompressionCodec接口包含两个方法,能够用于压缩和解压。若是要对数据流进行压缩,能够调用createOutputStream(OutputStream out)方法获得CompressionOutputStream输出流;若是要对数据流进行解压,能够调用createInputStream(InputStream in)方法获得CompressionInputStream输入流
CompressionOutputStream与CompressionInputStream相似java.util.zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream,只不过前二者可以重置其底层的压缩与解压算法
示例:压缩从标准输入读取的数据,而后将其写到标准输出
publicstaticvoid main(String[] args) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream("测试".getBytes("GBK"));
Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);// 压缩流,构造时会输出三字节的头信息:31-117 8
//1F=16+15=31;负数是以补码形势存储的,8B的二进制为10001011,先减一获得10001010,再除符号位各们取反获得原码11110101,即获得 -117
System.out.println();
IOUtils.copyBytes(bais, out, 4096, false);// 将压缩流输出到标准输出
out.finish();
System.out.println();
bais = new ByteArrayInputStream("测试".getBytes("GBK"));
ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
out = codec.createOutputStream(baos);
IOUtils.copyBytes(bais, out, 4096, false);// 将压缩流输出到缓冲
out.finish();
bais = new ByteArrayInputStream(baos.toByteArray());
CompressionInputStream in = codec.createInputStream(bais);// 解压缩流
IOUtils.copyBytes(in, System.out, 4096, false);// 将压缩流输出到标准输出
// ---------将压缩文件上传到Hadoop中
// 注:hadoop默认使用的是UTF-8编码,若是使用GBK上传,使用 hadoop fs -text /gzip_test 命令
// 在Hadoop系统中查看时显示不出来,但Down下来后能够
bais = new ByteArrayInputStream("测试".getBytes("UTF-8"));
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
out = codec.createOutputStream(fs.create(new Path("/gzip_test.gz")));
IOUtils.copyBytes(bais, out, 4096);
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeStream(fsout);
}
在读取一个压缩文件时,能够经过文件扩展名推断须要使用哪一个codec,如以.gz结尾,则使用GzipCodec来读取。能够经过调用CompressionCodecFactory的getCodec()方法根据扩展名来获得一个CompressionCodec
示例:根据文件扩展名自动选取codec解压文件
publicclass FileDecompressor {
publicstaticvoid main(String[] args) throws Exception {
String uri = "hdfs://hadoop-master:9000/gzip_test.gz";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// 根据文件的扩展名自动找到对应的codec
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
// 将解压出的文件放在hdoop上的同一目录下
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
CompressionCodecFactory从io.compression.codecs(core-site.xml配置文件里)配置属性里定义的列表中找到codec:
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
<description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>
运行上面示例时,会报如下警告:
WARN [main] org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded
hdfs://hadoop-master:9000/gzip_test
WARN [main] org.apache.hadoop.io.compress.zlib.ZlibFactory - Failed to load/initialize native-zlib library
这是由于程序是在Windows上运行的,在本地没有搜索到native类库,而使用Java实现来进行压缩与解压。若是将程序打包上传到Linux上运行时,第二个警告会消失:
[root@hadoop-master /root/tmp]# hadoop jar /root/tmp/FileDecompressor.jar
16/04/26 11:13:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/04/26 11:13:31 WARN snappy.LoadSnappy: Snappy native library not loaded
16/04/26 11:13:31 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
但第一个警告仍是有,缘由是Linux系统上没有安装snappy,下面安装:
1、安装snappy
yum install snappy snappy-devel
2、使得Snappy类库对Hadoop可用
ln -sf /usr/lib64/libsnappy.so /root/hadoop-1.2.1/lib/native/Linux-amd64-64
再次运行:
[root@hadoop-master /root/hadoop-1.2.1/lib/native/Linux-amd64-64]# hadoop jar /root/tmp/FileDecompressor.jar
16/04/26 11:42:19 WARN snappy.LoadSnappy: Snappy native library is available
16/04/26 11:42:19 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/04/26 11:42:19 INFO snappy.LoadSnappy: Snappy native library loaded
16/04/26 11:42:19 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
与内置的Java实现相比,原生的gzip类库能够减小约束一半的解压时间与约10%的压缩时间,下表列出了哪些算法有Java实现,哪些有本地实现:
默认状况下,Hadoop会根据自身运行的平台搜索原生代码库,若是找到则自加载,因此无需为了使用原生代码库而修改任何设置,可是,若是不想使用原生类型,则能够修改hadoop.native.lib配置属性(core-site.xml)为false:
<property>
<name>hadoop.native.lib</name>
<value>false</value>
<description>Should native hadoop libraries, if present, be used.</description>
</property>
如何使用的是代码库,而且须要在应用中执行大量压缩与解压操做,能够考虑使用CodecPool,它支持反复使用压缩秘解压,减小建立对应的开销
publicstaticvoid main(String[] args) throws Exception {
//注:这里使用GBK,若是使用UTF-8,则输出到标准时会乱码,缘由操做系统标准输出为GBK解码
ByteArrayInputStream bais = new ByteArrayInputStream("测试".getBytes("GBK"));
ByteArrayOutputStream bois = new ByteArrayOutputStream();
Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = null;
CompressionInputStream in = null;
Compressor cmpressor = null;// 压缩实例
Decompressor decompressor = null;// 解压实例
try {
// 从池中获取或新建一个Compressor压缩实例
cmpressor = CodecPool.getCompressor(codec);
// 从池中获取或新建一个Compressor解压缩实例
decompressor = CodecPool.getDecompressor(codec);
out = codec.createOutputStream(bois, cmpressor);
System.out.println();
IOUtils.copyBytes(bais, out, 4096, false);// 将压缩流输出到缓冲
out.finish();
bais = new ByteArrayInputStream(bois.toByteArray());
in = codec.createInputStream(bais, decompressor);// 解压压缩流
IOUtils.copyBytes(in, System.out, 4096, false);// 解压后标准输出
} finally {
IOUtils.closeStream(out);
CodecPool.returnCompressor(cmpressor);// 用完以后返回池中
CodecPool.returnDecompressor(decompressor);
}
}
若是压缩数据超过块大小后,会被分红多块,若是每一个片段数据单独做传递给不一样的Map任务,因为gzip数据是不能单独片段进行解压的,因此会出问题。但实际上Mapreduce任务仍是能够处理gzip文件的,只是若是发现(根据扩展名)是gz,就不会进行文件任务切分(其余算法也同样,只要不支持单独片段解压的,都会交给同一Map进行处理),而将这个文件块都交个同一个Map任务进行处理,这样会影响性能问题。
只有bzip2压缩格式的文件支持数据任务的切分,哪些压缩能切分请参考这里
要想压缩mapreduce做业的输出(即这里讲的是对reduce输出压缩),应该在mapred-site.xml配置文件的配置项mapred.output.compress设置为true,mapred.output.compression.code设置为要使用的压缩算法:
<property>
<name>mapred.output.compress</name>
<value>false</value>
<description>Should the job outputs be compressed?
</description>
</property>
<property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.DefaultCodec</value>
<description>If the job outputs are compressed, how should they be compressed?
</description>
</property>
也能够直接在做业启动程序里经过FileOutputFormat进行设置:
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "MaxTemperatureWithCompression");
job.setJarByClass(MaxTemperatureWithCompression.class);
//map的输入能够是压缩格式的,也可直接是未压缩的文本文件,输入map前会自动根据文件后缀来判断是否须要解压,不须要特殊处理或配置
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop-master:9000/ncdc/1901_1902.txt.gz"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/MaxTemperatureWithCompression"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//与mapred-site.xml配置文件里的mapred.output.compress配置属性等效:job输出是否压缩,即对reduce输出是否采用压缩
FileOutputFormat.setCompressOutput(job, true);
//与mapred-site.xml配置文件里的mapred.output.compression.codec配置属性等效
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
若是Job输出生成的是顺序文件(sequence file),则能够设置mapred.output.compression.type(mapred-site.xml)来控制限制使用压缩格式,默认值为RECORD,表示针对每一条记录进行压缩。若是将其必为BLOCK,将针对一组记录进行压缩,这也是推荐的压缩策略,由于它的压缩效率更高
<property>
<name>mapred.output.compression.type</name>
<value>RECORD</value>
<description>If the job outputs are to compressed as SequenceFiles, how should
they be compressed? Should be one of NONE, RECORD or BLOCK.
</description>
</property>
该属性还能够直接在JOB启动任务程序里经过SequenceFileOutputFormat的setOutputCompressionType()来设定
mapred-site.xml配置文件里能够对Job做业输出压缩进行配置的三个配置项:
若是对map阶段的中间输出进行压缩,能够得到很多好处。因为map任务的输出须要写到磁盘并经过网络传输到reducer节点,因此若是使用LZO、LZ4或者Snappy这样的快速压缩方式,是能够得到性能提高的,由于要传输的数据减小了。
启用map任务输出压缩和设置压缩格式的三个配置属性以下(mapred-site.xml):
也可在程序里设定(新的API设置方式):
Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class,
CompressionCodec.class);
Job job = new Job(conf);
旧API设置方式,经过conf对象的方法设置:
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
publicinterface Writable {
void write(DataOutput out) throws IOException;//序列化:即将实例写入到out输出流中
void readFields(DataInput in) throws IOException;//反序列化:即从in输出流中读取实例
}
Hadoop中可序列化的类都实现了Writable这个接口,好比数据类型类BooleanWritable、ByteWritable、DoubleWritable、FloatWritable、IntWritable、LongWritable、Text
publicstaticvoid main(String[] args) throws IOException {
IntWritable iw = new IntWritable(163);
// 序列化
byte[] bytes = serialize(iw);
// Java里整型占两个字节
System.out.println(StringUtils.byteToHexString(bytes).equals("000000a3"));//true
// 反序列化
IntWritable niw = new IntWritable();
deserialize(niw, bytes);
System.out.println(niw.get() == 163);//true
}
// 序列化
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);//最终仍是借助于Java API中的ByteArrayOutputStream 与 DataOutputStream 来完成序列化:即将基本类型的值(这里为整数)转换为二进制的过程
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
// 反序列化
publicstaticvoid deserialize(Writable writable, byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(in); //最终仍是借助于Java API中的ByteArrayInputStream与 DataInputStream来完成反序列化:即将二进制转换为基本类型的值(这里为整数)的过程
writable.readFields(dataIn);
dataIn.close();
}
IntWritable类的序列化与反序列化实现:
publicclass IntWritable implements WritableComparable<IntWritable> {
privateintvalue;
@Override
publicvoidreadFields(DataInput in) throws IOException {
value = in.readInt();
}
@Override
publicvoidwrite(DataOutput out) throws IOException {
out.writeInt(value); //实质上最后就是将整型值以二进制存储起来了
}
...
}
IntWritable实现了WritableComparable接口,而WritableComparable接口继承了Writable接口与java.lang.Comparable接口
publicclassIntWritableimplements WritableComparable {
publicinterfaceWritableComparable<T> extendsWritable, Comparable<T> {
publicinterface Comparable<T> {
publicintcompareTo(T o);
}
IntWritable实现了Comparable的compareTo方法,具体实现:
/** Compares two IntWritables. */
publicint compareTo(Object o) {
int thisValue = this.value;
int thatValue = ((IntWritable)o).value;
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
除了实现了Comparable比较能力接口,Hadoop提供了一个优化接口是继承自java.util.Comparator比较接口的RawComparator接口:
publicinterfaceRawComparator<T> extendsComparator<T> {
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
RawComparator:原生比较,即基于字节的比较
publicinterface Comparator<T> {
intcompare(T o1, T o2);
boolean equals(Object obj);
}
为何说是优化接口呢?由于该接口中的比较方法能够直接对字节进行比较,而不须要先反序列化后再比(由于是静态内部类实现:
/** A WritableComparable for ints. */
publicclass IntWritable implements WritableComparable {
...
/** A Comparator optimized for IntWritable. */
publicstaticclass Comparator extends WritableComparator {
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
...
}
}
...
}
),这样就避免了新建对象(即不须要经过反序列化重构Writable对象后,才能调用该对象的compareTo()比较方法)的额外开销,而Comparable接口比较时是基于对象自己的(属于非静态实现):
/** A WritableComparable for ints. */
publicclass IntWritable implements WritableComparable {
...
/** Compares two IntWritables. */
publicint compareTo(Object o) {
...
}
...
}
),因此比较前须要对输入流进行反序列重构成Writable对象后再比较,因此性能不高。如IntWritable的内部类IntWritable.Comparator就实现了RawComparator原生比较接口,性能比IntWritable.compareTo()比较方法高:
publicstaticclassComparatorextends WritableComparator {
public Comparator() {
super(IntWritable.class);
}
//这里实现的其实是重写WritableComparator里的方法。注:虽然WritableComparator已经提供了该方法的默认实现,但不要直接使用,由于父类WritableComparator提供的默认实现也是先反序列化后,再经过回调IntWritable里的compareTo()来完成比较的,因此咱们在为自定义Key时,必定要本身重写WritableComparator里提供的默认实现
@Override
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);// readInt为父类WritableComparator中的方法,将字节数组转换为整型(具体请参考后面),这样不须要将字节数组反序列化成IntWritable后再进行大小比对,而是直接对IntWritable里封装的int value进行比对
int thatValue = readInt(b2, s2);
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
而WritableComparator又是实现了RawComparator接口中的compare()方法,同时还实现了Comparator类中的:
publicclass WritableComparator implements RawComparator{
publicint compare(byte[] b1, ints1, intl1, byte[] b2, ints2, intl2) {//该方法实现的是RawComparator接口里的方法
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
buffer.reset(null, 0, 0); // clean up reference
} catch (IOException e) {
thrownew RuntimeException(e);
}
return compare(key1, key2); // compare them
}
@SuppressWarnings("unchecked")//该方法被上下两个方法调用,是WritableComparator里本身定义的方法,不是重写或实现
publicint compare(WritableComparable a, WritableComparable b) {
returna.compareTo(b);
}
@Override//该方法实现的是Comparator的compare(T o1, T o2)方法
publicint compare(Object a, Object b) {
return compare((WritableComparable)a, (WritableComparable)b);
}
}
WritableComparable是一个接口;而WritableComparator 是一个类。WritableComparator提供一个默认的基于对象(非字节)的比较方法compare(如上面所贴),这与实现Comparable接口的比较方法是同样的:都是基于对象的,因此性能也不高
获取IntWritable的内部类Comparator的实例:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
这样能够取到RawComparator实例,缘由是在IntWritable实现里注册过
static { // register this comparator
WritableComparator.define(IntWritable.class, newComparator());
}
这个comparator实例能够用于比较两个IntWritable对象:
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));// comparator.compare(w1, w2)会回调IntWritable.compareTo方法
或是IntWritable对象序列化的字节数组:
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),greaterThan(0));//这里才真正调用IntWritable.Comparator.compare()方法进行原生比较
上面分析的是IntWritable类型,其余类型基本上也是这样
Key在Map的shuffle过程当中是须要进行排序的,这就要求Key是实现WritableComparable的类,或者若是不实现WritableComparable接口时,须要经过Job指定比较类,他们的优先选择顺序以下:
一、 若是配置了mapred.output.key.comparator.class比较类,或明确地经过job的setSortComparatorClass(Class<? extends RawComparator> cls)方法(旧API为setOutputKeyComparatorClass() on JobConf)指定过,则使用指定类(通常从WritableComparator继承)的实例进行排序(这种状况要不须要WritableComparable,而只需实现Writable便可)
二、 不然,Key必须是实现了WritableComparable的类(由于在实现内部静态比较器继承时须要继承WritableComparator,其构造函数须要传进一个实现了WritableComparable的Key,并在WritableComparator类里提供的默认比较会回调Key类所实现的compareTo()方法,因此须要实现WritableComparable类),而且若是该Key类内部经过静态块(WritableComparator.define(Class c, WritableComparator comparator))注册过基于字节比较的类WritableComparator(实现RawComparator的抽象类,RawComparator又继承了Comparator接口),则使用字节比较方式进行排序(通常使用这种)
三、 不然,若是没有使用静态注册过内部实现WritableComparator,则使用WritableComparable的compareTo()进行对象比较(这须要先反序列化成对象以后)(注:此状况下Key也必须是实现WritableComparable类)
Writable不少的实现类实质上是对Java基本类型(但除char没有对应的Writable实现类外,char能够存放在IntWritable中)的再一次封装,get()、set()方法就是对这些封装的基本值的读取与设定:
从上表能够看出,VIntWritable(1~5)与 VLongWritable(1~9)为变长。若是数字在-112~127之间时,变长格式就只用一个字节进行编码;不然,使用第一个字节来存放正负号,其余字节就存放值(究竟须要多少字节来存放,则是看数字的大小,如int类型的值须要1~4个字节不等)。如 值为163须要两个字节,而不是4个字节:第一个字节存符号为(不一样长度的数这个字节存储的不太同样),第二个字节存放的是值;而257则须要三个字节来存放了;
可变长度类型有点像UTF-8同样,编码是变长的,若是传输内容中的数字都是比较小的数时(若是内容都是英文的字符,UTF-8就会大大缩短编码长度),用可变长度则能够减小数据量,这些数的范围:-65536 =< VIntWritable =< 65535此范围最多只占3字节,包括符号位;-281474976710656L =< VLongWritable =< 28147497671065L此范围最多只占7字节,包括符号位,若是超过了这些数,建议使用定长的,由于此时定长的所占字节还少,由于在接近最大Int或Long时,变长的VintWritable达到5个字节(如2147483647就占5字节),VlongWritable达到9个字节(如9223372036854775807L就占9字节),而定长的最多只有4字节与8字节
另外,同一个数用VintWritable或VlongWritable最后所占有字节数是同样的,好比2147483647这个数,都是8c7fffffff,占5字节,既然同一数字的编码长度都同样,因此优先推荐使用 VlongWritable,由于他存储的数比VintWritable更大,有更好的扩展
虽然VintWritable与VlongWritable所占最大字节可能分别达到5或9位,但它们容许的最大数的范围也 基本类型 int、long是同样的,即VintWritable容许的数字范围:-2147483648 =< VintWritable =< 2147483647;VlongWritable容许的数字范围:-9223372036854775808L =< VlongWritable =< 9223372036854775807L,由于它们的构造函数参数的类型就是基本类型int、long:
public VIntWritable(int value) { set(value); }
public VLongWritable(long value) { set(value); }
提供了序列化、反序列化和在字节级别上比较文本的方法。它的长度类型是整型,采用0压缩序列化格式。另外,它还支持在不将字符数组转换为字符串的状况下进行字符串遍历
至关于Java中的String类型,采用UTF-8编解码,它是对 byte[] 字节数组的封装,而不直接是String:
length存储了字符串所占的字节数,为int类型,因此最大可达2GB。
getLength():返回的是字节数组bytes的所存储内容的有效长度,而非字符串个数,取长度时不要直接经过getBytes().length来获取,由于在经过set()方法重置Text后,有时数组整个长度会大于所存内容的长度
getBytes():返回字符串原生字节数组,但数据的有效长度到getLength()
与String不一样的是,Text是可变的,能够经过set()方法重用它
Text索引位置都是以字节为单位进行索引的,并不像String那样是以字符为单位进行索引的
Text与IntWritable同样,也是可序列化与可比较的
因为Text在内存中使用的是UTF-8编码的字节码,而Java中的String则是Unicode编码,因此是有区别的
Text t = new Text("江正军");
//字符所占字节数,而非字符个数
System.out.println(t.getLength());// 9 UTF-8编码下每一个中文占三字节
//取单个字符,charAt()返回的是Unicode编码
System.out.println((char) t.charAt(0));
System.out.println((char) t.charAt(3));// 第二个字符,注意:传入的是byte数组中的索引,不是字符位置索引
System.out.println((char) t.charAt(6));
//转换成String
System.out.println(t.toString());// 江正军
ByteBuffer buffer = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
int cp;
// 遍历每一个字符
while (buffer.hasRemaining() && (cp = Text.bytesToCodePoint(buffer)) != -1) {
System.out.println((char) cp);
}
// 在末尾附加字符
t.append("江".getBytes("UTF-8"), 0, "江".getBytes("UTF-8").length);
System.out.println(t.toString());// 江正军江
// 查找字符:返回第一次出现的字符位置(也是在字节数组中的偏移量,而非字符位置),相似String的indexOf,注:这个位置指字符在UTF-8字节数组的索引位置,而不是指定字符所在位置
System.out.println(t.find("江"));// 0
System.out.println(t.find("江", 1));// 9 从第2个字符开始向后查找
Text t2 = new Text("江正军江");
//比较Text:若是相等,返回0
System.out.println(t.compareTo(t2));// 0
System.out.println(t.compareTo(t2.getBytes(), 0, t2.getLength()));//0
下表列出Text字符(实为UTF-8字符)与String(实为Unicode字符)所占字节:若是是拉丁字符如大写字母A,则存放在Text中只占一个字节,而String占用两字节;大于127的都占有两字节;汉字时Text占有三字节,String占两字节;后面的U+10400不知道是什么扩展字符?反正表示一个字符,但都占用了4个字节:
@Test
publicvoid string() throws UnsupportedEncodingException {
String s = "\u0041\u00DF\u6771\uD801\uDC00";
assertThat(s.length(), is(5));
assertThat(s.getBytes("UTF-8").length, is(10));
assertThat(s.indexOf("\u0041"), is(0));
assertThat(s.indexOf("\u00DF"), is(1));
assertThat(s.indexOf("\u6771"), is(2));
assertThat(s.indexOf("\uD801\uDC00"), is(3));
assertThat(s.charAt(0), is('\u0041'));
assertThat(s.charAt(1), is('\u00DF'));
assertThat(s.charAt(2), is('\u6771'));
assertThat(s.charAt(3), is('\uD801'));
assertThat(s.charAt(4), is('\uDC00'));
assertThat(s.codePointAt(0), is(0x0041));
assertThat(s.codePointAt(1), is(0x00DF));
assertThat(s.codePointAt(2), is(0x6771));
assertThat(s.codePointAt(3), is(0x10400));
}
@Test
publicvoid text() {
Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
assertThat(t.getLength(), is(10));
assertThat(t.find("\u0041"), is(0));
assertThat(t.find("\u00DF"), is(1));
assertThat(t.find("\u6771"), is(3));
assertThat(t.find("\uD801\uDC00"), is(6));
assertThat(t.charAt(0), is(0x0041));
assertThat(t.charAt(1), is(0x00DF));
assertThat(t.charAt(3), is(0x6771));
assertThat(t.charAt(6), is(0x10400));
}
与Text同样,BytesWritable是对二进制数据的封装
序列化时,前4个字节存储了字节数组的长度:
publicstaticvoid main(String[] args) throws IOException {
BytesWritable b = new BytesWritable(newbyte[] { 3, 5 });
byte[] bytes = serialize(b);
System.out.println((StringUtils.byteToHexString(bytes)));//000000020305
}
// 序列化
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
BytesWritable也是可变的,能够经过set()方法进行修改。与Text同样,BytesWritable的getBytes()返回的是字节数组长——容量——也能够没法体现所存储的实际大小,能够经过getLength()来肯定实际大小,能够经过 setCapacity(int new_cap) 方法重置缓冲大小
它是一个Writable特殊类,它序列化长度为0,即不从数据流中读取数据,也不写入数据,充当占位符。如在MapReduce中,若是你不须要使用键或值,你就能够将键或值声明为NullWritable
它是一个单例,能够经过NullWritable.get()方法获取实例
ObjectWritable是对Java基本类型、String、enum、Writable、null或这些类型组成的一个通用封装:
当一个字段中包含多个类型时(好比在map输出多种类型时),ObjectWritable很是有用,例如:若是SequenceFile中的值包含多个类型,就能够将值类型声明为ObjectWritable。
能够经过getDeclaredClass()获取ObjectWritable封装的类型
ObjectWritable在序列会时会将封装的类型名一并输出,这会浪费空间,咱们能够使用GenericWritable来解决这个问题:若是封装的类型数量比较少而且可以提交知道须要封装哪些类型,那么就能够继承GenericWritable抽象类,并实现这个类将要对哪些类型进行封装的抽象方法:
abstractprotected Class<? extends Writable>[] getTypes();
这们在序列化时,就没必要直接输出封装类型名称,而是这些类型的名称的索引(在GenericWritable内部会它他们分配编号),这样就减小空间来提升性能
class MyWritable extendsGenericWritable{
MyWritable(Writable writable) {
set(writable);
}
publicstatic Class<? extends Writable>[] CLASSES = new Class[] { Text.class };
@Override
protected Class<? extends Writable>[] getTypes() {
returnCLASSES;
}
publicstaticvoid main(String[] args) throws IOException {
Text text = new Text("\u0041\u0071");
MyWritable myWritable = new MyWritable(text);
System.out.println(StringUtils.byteToHexString(serialize(text)));// 024171
System.out.println(StringUtils.byteToHexString(serialize(myWritable)));// 00024171
ObjectWritable ow = new ObjectWritable(text); //00196f72672e6170616368652e6861646f6f702e696f2e5465787400196f72672e6170616368652e6861646f6f702e696f2e54657874024171 红色前面都是类型名序列化出来的结果,占用了很大的空间
System.out.println(StringUtils.byteToHexString(serialize(ow)));
}
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
}
GenericWritable的序列化只是把类型在type数组里的索引放在了前面,这样就比ObjectWritable节省了不少空间,因此推荐你们使用GenericWritable
6种集合类:ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable,SortedMapWritable, EnumSetWritable.
ArrayWritable与TwoDArrayWritable是对Writable的数组和二维数据(数组的数组)的实现:
ArrayWritable与TwoDArrayWritable中全部元素必须是同一类型的实例(在构造函数中指定):
ArrayWritable writable = new ArrayWritable(Text.class);
TwoDArrayWritable writable = new TwoDArrayWritable(Text.class);
ArrayWritable与TwoDArrayWritable都有get、set、toArray方法,注:toArray方法是获取的数组(或二维数组)的副本(浅复制,虽然数组壳是复制了一份,只里面存放的元素未深度复制)
publicvoid set(Writable[] values) { this.values = values; }
publicWritable[] get() { returnvalues; }
publicvoid set(Writable[][] values) { this.values = values; }
publicWritable[][] get() { returnvalues; }
ArrayPrimitiveWritable是对Java基本数组类型的一个封装,调用set()方法时能够识别相应组件类型,所以无需经过继承来设置类型
MapWritable 与 SortedMapWritable分别实现了java.util.Map<Writable,Writable> 与 java.util.SortedMap<WritableComparable, Writable>接口。它们在序列化时,类型名称也是使用索引来替代一块儿输出,若是存入的是自定义Writable内,则不要超过127个,因它这两个类里面是使用一个byte来存放自定义Writable类型名称索引的,而那些标准的Writable则使用-127~0之间的数字来编号索引
对集合的枚举类型能够采用EnumSetWritable。对于单类型的Writable列表,使用ArrayWritable就足够了。但若是须要把不一样的Writable类型存放在单个列表中,能够使用GenericWritable将元素封装在一个ArrayWritable中
Hadoop中提供的现有的一套标准Writable是能够知足咱们决大多数需求的。但在某些业务下需咱们定义具备本身数据结构的Writable。
定制的Writable能够彻底控制二进制表示和排序顺序。因为Writable是MapReduce数据路径的核心,因此调整二进制表示能对性能产生显著效果。虽然Hadoop自带的Writable实现已通过很好的性能调优,但若是但愿将结构调整得更好,更好的作法就是新建一个Writable类型
示例:存储一对Text对象的自定义Writable,若是是Int整型,能够参考后面示例IntPair,若是复合键若是由整型与字符型组成,则可能同时参考这两个类来定义:
publicclassTextPairimplements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
publicvoid set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
returnfirst;
}
public Text getSecond() {
returnsecond;
}
@Override
publicvoid write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
publicvoid readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
/*
* HashPartitioner(MapReuce中的默认分区类)一般用hashcode()方法来选择reduce分区,所
* 以应该确保有一个比较好的哈希函数来保证每一个reduce数据分区大小类似
*/
@Override
publicint hashCode() {
returnfirst.hashCode() * 163 + second.hashCode();
}
@Override
publicboolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
returnfirst.equals(tp.first) && second.equals(tp.second);
}
returnfalse;
}
/*
* TextOutputFormat将键或值输出时,会调用此方法,因此也需重写
*/
@Override
public String toString() {
returnfirst + "\t" + second;
}
/*
* 除VIntWritable、VLongWritable这两个Writable外,大多数的Writable类自己都实现了
* Comparable比较能力的接口compareTo()方法,而且又还在Writable类静态的实了Comparator
* 比较接口的compare()方法,这两个方法在Writable中的实现的性能是不同的:Comparable.
* compareTo()方法在比较前,须要将字节码反序列化成相应的Writable实例后,才能调用;而
* Comparator.compare()比较前是不须要反序列化,它能够直接对字节码(数组)进行比较,所
* 以这个方法的性能比较高,属于原生比较
*
* VIntWritable、VLongWritable这两个类里没有静态的实现Comparator接口,多是由于
* 变长的缘由,
*
*/
@Override//WritableComparator里自定义比较方法 compare(WritableComparable a, WritableComparable b) 会回调此方法
publicintcompareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {//先按第一个字段比,若是相等,再比较第二个字段
return cmp;
}
returnsecond.compareTo(tp.second);
}
//(整型类型IntWritable基于字节数组原生比较请参考这里)
publicstaticclassComparatorextends WritableComparator {
privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
//或者这样来获取Text.Comparator实例?
// RawComparator<IntWritable> comparator = WritableComparator.get(Text.class);
public Comparator() {
super(TextPair.class);
}
/*
* 这个方法(下面注释掉的方法)从Text.Comparator.compare()方法拷过来的 l1、l2表示字节数有效的长度
*
* 因为Text在序列化时(这一序列化过程可参照Text的序列化方法write()源码来了解):首先是将Text的有效字节数 length
* 以VIntWritable方式序列化(即length在序列化时所在字节为 1~5), 而后再将整个字节数组序列化
* (字节数组序列化时也是先将字节有效长度输出,不过此时为Int,而非VInt,请参考后面贴出的源码)
* 下面是Text的序列化方法源码:
* public void write(DataOutput out) throws IOException {
* WritableUtils.writeVInt(out, length);
* out.write(bytes,0, length);
* }
*
* 下面是BytesWritable的序列化方法源码:
* public void write(DataOutput out) throws IOException {
* out.writeInt(size);
* out.write(bytes, 0, size);
* }
*
* WritableUtils.decodeVIntSize(b1[s1]):读出Text序列化出的串前面多少个字节是用來表示Text的长度的,
* 这样在取Text字節內容時要跳過長度信息串。传入时只需传入字节数组的第一个字节便可
*
* compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2):此方法才是真正按一個個字節進行大小比較
* b1从s1 + n1开始l1 - n1个字节才是Text真正字节内容
*
*/
// public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {//此方法是从Text.Comparator中拷出来的
// int n1 = WritableUtils.decodeVIntSize(b1[s1]);//序列化串中前面多少个字节是长度信息
// int n2 = WritableUtils.decodeVIntSize(b2[s2]);
// return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
// }
@Override
publicintcompare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
//WritableUtils.decodeVIntSize(b1[s1])表示Text有效长度序列化输出占几个字节
//readVInt(b1, s1):将Text有效字节长度是多少读取出来。
//最后firstL1 表示的就是第一个Text属性成员序列化输出的有效字节所占长度
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
//比较第一个Text:即first属性。自己Text里就有Comparator的实现,这里只须要将first
//与second所对应的字节截取出来,再调用Text.Comparator.compare()即根据字节进行比较
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}//若是第一个Text first 不等,则比较第二个Test:即second属性
//s1 + firstL1为第二个Text second的起始位置,l1 - firstL1为第二个Text second的字节数
returnTEXT_COMPARATOR
.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
thrownew IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
}
SequenceFile:顺序文件、或叫序列文件。它是一种具备必定存储结构的文件,数据以在内存中的二进制写入。Hadoop在读取与写入这类文件时效率会高
顺序文件——相对于MapFile只能顺序读取,因此称顺序文件
序列文件——写入文件时,直接将数据在内存中存储的二进写入到文件,因此写入后使用记事本没法直接阅读,但使用程序反序列化后或经过Hadoop命令能够正常阅读显示:hadoop fs -text /sequence/seq1
SequenceFile类提供了Writer,Reader 和 SequenceFile.Sorter 三个类用于完成写,读,和排序
publicclass SequenceFileWriteDemo {
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
/*
* 该方法有不少的重载版本,但都须要指定FileSystem+Configuration(或FSDataOutputStream+Configuration)
* 、键的class、值的class;另外,其余可选参数包括压缩类型CompressionType以及相应的CompressionCodec
* 、 用于回调通知写入进度的Progressable、以及在Sequence文件头存储的Metadata实例
*
* 存储在SequenceFile中的键和值并不必定须要Writable类型,只要能被Serialization序列化和反序列化
* ,任何类型均可以
*/
// 经过静态方法获取顺序文件SequenceFile写实例SequenceFile.Writer
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < 10; i++) {
key.set(10 - i);
value.set(DATA[i % DATA.length]);
// getLength()返回文件当前位置,后继将今后位置接着写入(注:当SequenceFile刚建立时,就已
// 写入元数据信息,因此刚建立后getLength()也是非零的
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
/*
* 同步点:用来快速定位记录(键值对)的边界的一个特殊标识,在读取SequenceFile文件时,能够经过
* SequenceFile.Reader.sync()方法来搜索这个同步点,便可快速找到记录的起始偏移量
*
* 加入同步点的顺序文件能够做为MapReduce的输入,因为访类顺序文件容许切分,因此该文件的不一样部分能够
* 由不一样的map任务单独处理
*
* 在每条记录(键值对)写入前,插入一个同步点,这样是方便读取时,快速定位每记录的起始边界(若是读取的
* 起始位置不是记录边界,则会抛异常SequenceFile.Reader.next()方法会抛异常)
*
* 在真正项目中,可能不是在每条记录写入前都加上这个边界同步标识,而是以业务数据为单位(多条记录)加入
* ,这里只是为了测试,因此每条记录前都加上了
*/
writer.sync();
// 只能在文件末尾附加健值对
writer.append(key, value);
}
} finally {
// SequenceFile.Writer实现了java.io.Closeable,能够关闭流
IOUtils.closeStream(writer);
}
}
}
写入后在操做系统中打开显示乱的:
从上面能够看出这种文件的前面会写入一些元数据信息:键的Class、值的Class,以及压缩等信息
若是使用Hadoop来看,则仍是能够正常显示的,由于该命令会给咱们反序列化后再展现出来:
publicclass SequenceFileReadDemo {
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
// 经过SequenceFile.Reader实例进行读
reader = new SequenceFile.Reader(fs, path, conf);
/*
* 经过reader.getKeyClass()方法从SequenceFile文件头的元信息中读取键的class类型
* 经过reader.getValueClass()方法从SequenceFile文件头的元信息中读取值的class类型
* 而后经过ReflectionUtils工具反射获得Key与Value类型实例
*/
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
// 返回当前位置,今后位置读取下一健值对
long position = reader.getPosition();
// 读取下一健值对,并分别存入key与value变量中,若是到文件尾,则返回false
while (reader.next(key, value)) {
// 若是读取的记录(键值对)前有边界同步特殊标识时,则打上*
String syncSeen = reader.syncSeen() ? "*" : "";
// position为当前输入键值对的起始偏移量
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next
// record下一对健值对起始偏移量
}
System.out.println();
//设置读取的位置,注:必定要是键值对起始偏移量,即记录的边界位置,不然抛异常
reader.seek(228);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
//这个方法与上面seek不一样,传入的位置参数不须要是记录的边界起始偏移的准确位置,根据边界同步特殊标记能够自动定位到记录边界,这里从223位置开始向后搜索第一个同步点
reader.sync(223);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
} finally {
IOUtils.closeStream(reader);
}
}
}
hadoop fs –text命令除能够显示纯文本文件,还能够以文本形式显示SequenceFile文件、MapFile文件、gzip压缩文件,该命令能够自动力检测出文件的类型,根据检测出的类型将其转换为相应的文本。
对于SequenceFile文件、MapFile文件,会调用Key与Value的toString方法来显示成文本,因此要重写好自定义的Writable类的toString()方法
MapReduce是对一个或多个顺序文件进行排序(或合并)最好的方法。MapReduce自己是并行的,并就能够指定reducer的数量(即分区数),如指定1个reducer,则只会输出一个文件,这样就能够将多个文件合并成一个排序文件了。
除了本身写这样一个简单的排序合并MapReduce外,咱们能够直接使用Haddop提供的官方实例来完成排序合并,如将前面写章节中产生的顺序文件从新升级排序(原输出为降序):
[root@hadoop-master /root/hadoop-2.7.2/share/hadoop/mapreduce]# hadoop jar ./hadoop-mapreduce-examples-2.7.2.jar sort-r 1 -inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /sequence/seq1 /sequence/seq1_sorted
[root@hadoop-master /root/hadoop-2.7.2/share/hadoop/mapreduce]# hadoop fs -text /sequence/seq1_sorted/part-r-00000
1 Nine, 九
2 Seven, 七
3 Five, 五
4 Three, 三
5 One, 一
6 Nine, 九
7 Seven, 七
8 Five, 五
9 Three, 三
10 One, 一
System.out.println("sort [-r <reduces>] " + //reduces的数量
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
"[-outValue <output value class>] " +
"[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
注:官方提供的Sort示例除了排序合并顺序文件外,还能够合并普通的文本文件,下面是它的部分源码:
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(num_reduces);
job.setInputFormatClass(inputFormatClass);
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);
顺序文件由文件头Header、随后的一条或多条记录Record、以及记录间边界同步点特殊标识符Sync(可选):
此图为压缩前和记录压缩Record compression后的顺序文件的内部结构
顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号,文件头还包括其余字段,例如键和值的名称、数据压缩细节、用户定义的元数据,此外,还包含了一些同步标识,用于快速定位到记录的边界。
每一个文件都有一个随机生成的同步标识,存储在文件头中。同步标识位于顺序文件中的记录与记录之间,同步标识的额外存储开销要求小于1%,因此没有必要在每条记录末尾添加该标识,特别是比较短的记录
记录的内部结构取决因而否启用压缩,SeqeunceFile支持两种格式的数据压缩,分别是:记录压缩record compression和块压缩block compression。
record compression如上图所示,是对每条记录的value进行压缩
默认状况是不启用压缩,每条记录则由记录长度(字节数)Record length、健长度Key length、键Key和值Value组成,长度字段占4字节
记录压缩(Record compression)格式与无压缩状况基本相同,只不过记录的值是用文件头中定义的codec压缩的,注,键没有被压缩(指记录压缩方式的Key是不会被压缩的,而若是是块压缩方式的话,整个记录的各个部分信息都会被压缩,请看下面块压缩)
块压缩(Block compression)是指一次性压缩多条记录,由于它能够利用记录间的类似性进行压缩,因此比单条记录压缩方式要好,块压缩效率更高。block compression是将一连串的record组织到一块儿,统一压缩成一个block:
上图:采用块压缩方式以后,顺序文件的内部结构,记录的各个部分都会被压缩,不仅是Value部分
能够不断向数据块中压缩记录,直到块的字节数不小于io.seqfile.compress.blocksize(core-site.xml)属性中设置的字节数,默认为1MB:
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
<description>The minimum block size for compression in block compressed SequenceFiles.
</description>
</property>
每个新块的开始处都须要插入同步标识,block数据块的存储格式:块所包含的记录数(vint,1~5个字节,不压缩)、每条记录Key长度的集合(Key长度集合表示将全部Key长度信息是放在一块儿进行压缩)、每条记录Key值的集合(全部Key放在一块儿再起压缩)、每条记录Value长度的集合(全部Value长度信息放在一块儿再进行压缩)和每条记录Value值的集合(全部值放在一块儿再进行压缩)
MapFile是已经排过序的SequenceFile,它有索引,索引存储在另外一单独的index文件中,因此能够按键进行查找,注:MapFile并未实现java.util.Map接口
MapFile是对SequenceFile的再次封装,分为索引与数据两部分:
publicclass MapFile {
/** The name of the index file. */
publicstaticfinal String INDEX_FILE_NAME = "index";
/** The name of the data file. */
publicstaticfinal String DATA_FILE_NAME = "data";
publicstaticclass Writer implements java.io.Closeable {
private SequenceFile.Writer data;
private SequenceFile.Writer index;
/** Append a key/value pair to the map. The key must be greater or equal
* to the previous key added to the map. 在Append时,Key的值必定要大于或等于前面的已加入的值,即升序,不然抛异常*/
publicsynchronizedvoid append(WritableComparable key, Writable val)
throws IOException {
...
publicstaticclass Reader implements java.io.Closeable {
// the data, on disk
private SequenceFile.Reader data;
private SequenceFile.Reader index;
...
与SequenceFile同样,也是使用append方法在文件末写入,并且键要是WritableComparable类型的具备比较能力的Writable,值与SequenceFile同样也是Writable类型便可
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
IntWritable key = new IntWritable();
Text value = new Text();
MapFile.Writer writer = null;
try {
/*
* 注:在建立writer时与SequenceFile不太同样,这里传进去的是URI,而不是具体文件的Path,
* 这是由于MapFile会生成两个文件,一个是data文件,一个是index文件,能够查看MapFile源码:
* //The name of the index file.
* public static final String INDEX_FILE_NAME = "index";
* //The name of the data file.
* public static final String DATA_FILE_NAME = "data";
*
* 因此不须要具体的文件路径,只传入URI便可,且传入的URI只到目录级别,即便包含文件名也会看做目录
*/
writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
for (int i = 0; i < 1024; i++) {
key.set(i);
value.set(DATA[i % DATA.length]);
// 注:append时,key的值要大于等前面已加入的键值对
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
[root@localhost /root]# hadoop fs -ls /map
Found 2 items
-rw-r--r-- 3 Administrator supergroup 430 2016-05-01 10:24 /map/data
-rw-r--r-- 3 Administrator supergroup 203 2016-05-01 10:24 /map/index
会在map目录下建立两个文件data与index文件,这两个文件都是SequenceFile
[root@localhost /root]# hadoop fs -text /map/data | head
0 One, 一
1 Three, 三
2 Five, 五
3 Seven, 七
4 Nine, 九
5 One, 一
6 Three, 三
7 Five, 五
8 Seven, 七
9 Nine, 九
[root@localhost /root]# hadoop fs -text /map/index
0 128
128 4013
256 7918
384 11825
512 15730
640 19636
768 23541
896 27446
Index文件存储了部分键(上面显示的第一列)及在data文件中的起使偏移量(上面显示的第二列)。从index输出能够看到,默认状况下只有每隔128个键才有一个包含在index文件中,固然这个间隔是能够调整的,可调用MapFile.Writer实例的setIndexInterval()方法来设置(或者经过io.map.index.interval属性配置也可)。增长索引间隔大小能够有效减小MapFile存储索引所须要的内存,相反,若是减少间隔则能够提升查询效率。由于索引index文件只保留一部分键,因此MapFile不可以提供枚举或计算全部的键的方法,惟一的办法是读取整个data文件
下面能够根据index的索引seek定位到相应位置后读取相应记录:
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map/data";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.seek(4013);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
} finally {
IOUtils.closeStream(reader);
}
}
[4013]128 Seven, 七 [4044]
MapFile遍历文件中全部记录与SequenceFile同样:先建一个MapFile.Reader实例,而后调用next()方法,直到返回为false到文件尾:
/** Read the next key/value pair in the map into <code>key</code> and
* <code>val</code>. Returns true if such a pair exists and false when at
* the end of the map */
publicsynchronizedboolean next(WritableComparable key, Writable val)
throws IOException {
经过调用get()方法能够随机访问文件中的数据:
/** Return the value for the named key, or null if none exists. */
publicsynchronized Writable get(WritableComparable key, Writable val)
throws IOException {
根据指定的key查找记录,若是返回null,说明没有相应的条目,若是找到相应的key,则将该键对应的值存入val参变量中
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map/data";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
MapFile.Reader reader = null;
try {
//构造时,路径只须要传入目录便可,不能到data文件
reader = new MapFile.Reader(fs, "hdfs://hadoop-master:9000/map", conf);
IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
key.set(255);
//根据给定的key查找相应的记录
reader.get(key, value);
System.out.println(key + " " + value);// 255 One, 一
} finally {
IOUtils.closeStream(reader);
}
}
get()时,MapFile.Reader首先将index文件读入内存,接着对内存中的索引进行二分查找,最后在index中找到小于或等于搜索索引的键255,这里即为128,对应的data文件中的偏移量为4013,而后从这个位置顺序读取每条记录,拿出Key一个个与255进行对比,这里很不幸运,须要比较128(由io.map.index.interval决定)次直到找到键255为止。
getClosest()方法与get()方法相似,只不过它返回的是与指定键匹配的最接近的键,而不是在不匹配的返回null,更准确地说,若是MapFile包含指定的键,则返回对应的条目;不然,返回MapFile中的第一个大于(或小于,由相应的boolean参数指定)指定键的键
大型MapFile的索引全加载到内存会占据大量内存,若是不想将整个index加载到内存,不须要修改索引间隔以后再重建索引,而是在读取索引时设置io.map.index.skip属性(编程时可经过Configuration来设定)来加载必定比例的索引键,该属性一般设置为0,意味着加载index时不跳过索引键所有加载;若是设置为1,则表示加载index时每次跳过索引键中的一个,这样索引会减半;若是设置为2,则表示加载index时每次读取索引时跳过2个键,这样只加载索引的三分一的键,以此类推,设置的值越大,节省大量内存,但增长搜索时间
l SetFile是一个特殊的MapFile,用于只存储Writable键的集合,键必须升序添加:
publicclass SetFile extends MapFile {
publicstaticclass Writer extends MapFile.Writer {
/** Append a key to a set. The key must be strictly greater than the
* previous key added to the set. */
publicvoid append(WritableComparable key) throws IOException{
append(key, NullWritable.get());//只存键。因为调用MapFile.Writer.append()方法实现,因此键也只能升序添加
}
. . .
/** Provide access to an existing set file. */
publicstaticclass Reader extends MapFile.Reader {
/** Read the next key in a set into <code>key</code>. Returns
* true if such a key exists and false when at the end of the set. */
publicboolean next(WritableComparable key)
throws IOException {
return next(key, NullWritable.get());//也只读取键
}
l ArrayFile也是一个特殊的MapFile,键是一个整型,表示数组中的元素索引,而值是一个Writable值
publicclass ArrayFile extends MapFile {
/** Write a new array file. */
publicstaticclass Writer extends MapFile.Writer {
private LongWritable count = new LongWritable(0);
/** Append a value to the file. */
publicsynchronizedvoid append(Writable value) throws IOException {
super.append(count, value); // add to map 键是元素索引
count.set(count.get()+1); // increment count 每添加一个元素后,索引加1
}
. . .
/** Provide access to an existing array file. */
publicstaticclass Reader extends MapFile.Reader {
private LongWritable key = new LongWritable();
/** Read and return the next value in the file. */
publicsynchronized Writable next(Writable value) throws IOException {
return next(key, value) ? value : null;//只返回值
}
/** Returns the key associated with the most recent call to {@link
* #seek(long)}, {@link #next(Writable)}, or {@link
* #get(long,Writable)}. */
publicsynchronizedlong key() throws IOException {//若是知道是第几个元素,则是能够调用此方法
returnkey.get();
}
/** Return the <code>n</code>th value in the file. */
publicsynchronized Writable get(long n, Writable value)//根据数组元素索引取值
throws IOException {
key.set(n);
return get(key, value);
}
l BloomMapFile文件构建在MapFile的基础之上:
publicclass BloomMapFile {
publicstaticfinal String BLOOM_FILE_NAME = "bloom";
publicstaticclass Writer extends MapFile.Writer {
惟一不一样之处就是,除了data与index两个文件外,还增长了一个bloom文件,该bloom文件主要包含一张二进制的过滤表,该过滤表能够提升key-value的查询效率。在每一次写操做完成时,会更新这个过滤表,其实现源代码以下:
publicclass BloomMapFile {
publicstaticclass Writer extends MapFile.Writer {
publicsynchronizedvoid append(WritableComparable key, Writable val)
throws IOException {
super.append(key, val);
buf.reset();
key.write(buf);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
bloomFilter.add(bloomKey);
}
它有两个调优参数,一个是io.mapfile.bloom.size,指出map文件中大概有多少个条目;另外一个是io.mapfile.bloom.error.rate , BloomMapFile中使用布隆过滤器失败比率. 若是减小这个值,使用的内存会成指数增加。
VERSION: 过滤器的版本号;
nbHash: 哈希函数的数量;
hashType: 哈希函数的类型;
vectorSize: 过滤表的大小;
nr: 该BloomFilter可记录key的最大数量;
currentNbRecord: 最后一个BloomFilter记录key的数量;
numer: BloomFilter的数量;
vectorSet: 过滤表;
前提是SequenceFile里是按键升序存放的,这样才能够为它建立index文件
publicclass MapFileFixer {
publicstaticvoid main(String[] args) throws Exception {
String mapUri = "hdfs://hadoop-master:9000/sequence2map";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
Path map = new Path(mapUri);
//若是data文件名不是data也是能够的,但这里为默认的data,因此指定MapFile.DATA_FILE_NAME便可
Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
// Get key and value types from data sequence file
SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
Class keyClass = reader.getKeyClass();
Class valueClass = reader.getValueClass();
reader.close();
// Create the map file index file
long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
System.out.printf("Created MapFile %s with %d entries\n", map, entries);
}
}
fix()方法一般用于重建已损坏的索引,若是要将某个SequenceFile转换为MapFile,则通常通过如下几步:
一、 保证SequenceFile里的数据是按键升序存放的,不然使用MapReduce任务对文件进行一次输入输出,就会自动排序合并,如:
//建立两个SequenceFile
publicclass SequenceFileCreate {
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
Path path2 = new Path("hdfs://hadoop-master:9000/sequence/seq2");
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null, writer2 = null;
try {
//建立第一个SequenceFile
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < 10; i++) {
key.set(10 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
//建立第二个SequenceFile
writer2 = SequenceFile.createWriter(fs, conf, path2, key.getClass(), value.getClass());
for (int i = 10; i < 20; i++) {
key.set(30 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer2.getLength(), key, value);
writer2.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
IOUtils.closeStream(writer2);
}
}
}
//将前面生成的两个SequenceFile排序合并成一个SequenceFile文件
publicclass SequenceFileCovertMapFile {
publicstaticclass Mapper extends
org.apache.hadoop.mapreduce.Mapper<IntWritable, Text, IntWritable, Text> {
@Override
publicvoid map(IntWritable key, Text value, Context context) throws IOException,
InterruptedException {
context.write(key, value);
System.out.println("key=" + key + " value=" + value);
}
}
publicstaticclass Reducer extends
org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, IntWritable, Text> {
@Override
publicvoid reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
}
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "SequenceFileCovert");
job.setJarByClass(SequenceFileCovertMapFile.class);
job.setJobName("SequenceFileCovert");
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
// 注意这里要设置输入输出文件格式为SequenceFile
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);//默认就是1,一个Reduce就只输出一个文件,这样就将多个输入文件合并成一个文件了
SequenceFileInputFormat.addInputPath(job, new Path("hdfs://hadoop-master:9000/sequence"));
SequenceFileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop-master:9000/sequence2map"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
二、 将SequenceFile文件名修改成data(hadoop fs -mv /sequence2map/part-r-00000 /sequence2map/data)
三、 使用最前面的MapFileFixer程序建立index
org.apache.hadoop.conf.Configuration类是用来读取特定格式XML配置文件的
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final> <!--该属性不能被后面加进来的同名属性覆盖-->
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value><!—配置属性能够引用其余属性或系统属性-->
<description>Size and weight</description>
</property>
</configuration>
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");//若有多个XML,能够多添调用此方法添加,相同属性后面会覆盖前面的,除非前面是final属性
System.out.println(conf.get("color"));//yellow
System.out.println(conf.get("size"));//10
System.out.println(conf.get("breadth", "wide"));//wide 若是不存在breadth配置项,则返回后面给定的wide默认值
System.out.println(conf.get("size-weight"));//10,heavy
系统属性的优先级高于XML配置文件中定义的属性,但仍是不能覆盖final为true的属性:
System.setProperty("size", "14");//系统属性
System.out.println(conf.get("size-weight"));//14,heavy
系统属性还能够经过JVM参数 -Dproperty=value 来设置
虽然能够经过系统属性来覆盖XML配置文件中非final属性,但若是XML中不存在该属性,则仅配置系统属性后,经过Configuration