这个问题的答案来自于计算机硬盘的发展趋势:寻址时间远远比不上传输速率的提高。寻址是将磁头移动到特定的硬盘位置进行读或者写操做,而传输速率取决于硬盘的带宽。而寻址就是致使磁盘操做延迟的主要缘由。 当数据量很小时,传统的数据库依靠B树(一种数据结构,主要用于关系型数据库索引),能够实现快速的读取和更新;可是当数据量很大时,由于须要不少的“排序/合并”操做,传统的数据库系统就明显落后于MapReduce了。 java
简而言之,他们认为:1. MapReduce放弃了 不少那些通过历代数据库专家优化提出的高性能数据库技术,好比批量导入、索引、视图、更新、事物等;2. MapReduce是一个粗糙的实现,它没有索引,依靠蛮力做为处理选项;3. MapReduce并不稀奇,以前就已经有人在使用类似的概念而且作出产品来了。 node
结构化数据是指具备既定格式的实体化数据,好比XML文档或者知足特定预约义格式的数据表。这是RDBMS(关系型数据库)包含的技术。 半结构化数据比较松散,它可能有格式,可是常常被忽略,因此通常只能做为对数据结构的通常性指导。好比Excel电子表格,它在结构上是单元格组成的网格,可是你能够在单元格中保存任何格式的数据。 非结构化数据没有什么特别的内部结构,例如纯文本或者图像数据。 Hadoop对半结构化和非结构化数据很是有效,由于它是在处理数据时才对数据进行解释(所谓的“读时模式”)。这种模式在提供灵活的同时,避免了RDBMS在数据加载阶段带来的高开销,由于这在Hadoop中仅仅是一个简单的文件拷贝操做。(这也是为何Hadoop能够进行高速流式读/写的缘由) 关系型数据每每是规范的,,这主要是为了数据的完整性且不含冗余。可是规范性会给Hadoop处理带来麻烦,由于它使记录变成非本地操做,而Hadoop的核心假设之一恰恰就是能够进行(高速的)流读/写操做。(暂时还没理解这句话的意思)而WEB日志就是典型的非规范化数据,例如每次都要记录客户端主机的名字和IP,这会致使同一个客户端的全名可能出现屡次。 MapReduce以及Hadoop中其余的处理模型都是能够随着数据规模现行伸缩的(可扩展性)。对数据分区后,函数原语(如map程序和reduce程序)可以在各个分区上面并行工做。这意味着,若是输入的数据量是原来的两倍,那么做业的运行时间也须要两倍。可是若是集群的规模扩展为原来的两倍,那么做业的速度依然能够变得和原来同样快。(分数不够,拿钱来凑...)可是SQL查询不具有这样的特性。 2. 高性能计算和Hadoop计算web
高性能计算相关的组织多年以来一直研究大规模的数据处理,主要使用相似于消息传递接口MPI的相关API。 从广义上讲,高性能计算采用的方法是将做业分散到集群的各台机器上,这些机器访问存储区域网络(SAN)组成的共享文件系统。这比较适合计算密集型的做业。可是若是节点须要访问的数据量更庞大,不少计算节点就会由于网络带宽的瓶颈而不得不闲下来等数据。 而对于Hadoop而言,它尽可能在计算节点上存储数据,以实现数据的本地快速访问。这在Hadoop中称为数据本地化。意识到网络带宽是数据中心环境最珍贵的资源(处处复制数据很容易耗尽网络带宽)以后,Hadoop采用显式网络拓扑结构来保留网络带宽(将网络当作一棵树,两个节点之间的距离使他们到共同的祖先的距离综合)。 3. MapReduce数据流(P31 2.4 横向扩展) 首先定义一些术语:MapReduce做业(Job)是客户端须要执行的一个工做单元,它包括输入数据、MapReduce程序和配置信息。而Hadoop将做业分红若干任务(task)来执行,任务有两类:map任务和reduce任务。这些任务运行在集群的节点上,并经过Yarn调度。若是一个任务失败,它Yarn将在另一个不一样的节点上自动从新调度运行。Hadoop将MapReduce的输入数据划分红等长的数据块,称为“输入分片”,简称“分片”。 算法
这个数据块的大小应该不能超过HDFS的块大小,缘由是:1. 数据本地化。Hadoop在存储有HDFS数据的节点上运行Map任务能够得到最佳性能,由于它无需使用宝贵的集群带宽资源;若是在本地找不到,Yarn就会调度同一个机架上的空闲slot来运行该任务,此时属于同机架内不一样节点间调度,有时候还会跨机架调度,可是这种状况几乎见不到。2. 若是分片后的数据跨过了两个HDFS块,因为Hadoop的容灾机制,任何一个节点都不可能同时存储这两个数据块,所以不免会跨节点甚至是跨机架,效率会变得很低。 shell
由于map的输出是中间结果,这个中间结果还要通过reduce处理后才能产生最终结果。并且一旦map做业成功完成,这个中间结果就会被删除;一旦执行失败,就会被从新调度运行。若是把这个中间结果存储在HDFS上并实现备份,未免有点小题大作。 数据库
集群的带宽资源很宝贵,所以尽可能避免map任务和reduce任务之间的数据传输是有必要的。Hadoop容许用户针对map的输出指定一个combiner(就像map同样),combiner的输入来自map,输出做为reduce的输入。combiner属于优化方案,因此有没有combiner、或者调用多少次combiner,reduce输出的结果都应该是同样的。 简单理解,combiner至关于对map的输出结果进行一个相似于reduce的补充运算,以此来减小mapper和reducer之间的数据传输量。举一个例子,咱们的目的是统计1950年全国最高气温,第一个map的输出为((1950,0),(1950,10),(1950,20)),第二个map的输出为((1950,25),(1950,20)),没有combiner的时候,reduce的输入为(1950,[0,10,20,25,20]),最后的输出为(1950,25)。当咱们使用combiner以后,第一个map的输出为(1950,20),第二个map的输出为(1950,25),那么reduce的输入为(1950,[20,25]),最后仍是输出(1950,25)。更简单的说,咱们能够经过下面的表达式来讲明这个计算过程: 没有使用combiner:max(0,10,20,25,20)=25。 没有使用combiner:max(max(0,10,20),max(25,20))=max(20,25)=25。 4. 使用本身喜欢的语言实现MapReduce程序(P37 2.5 HadoopStreaming)apache
Hadoop提供了MapReduce的API,容许编程人员使用其余的非Java语言来写本身的map函数和reduce函数。Hadoop Streaming 使用 Unix标准输入输出流做为Hadoop和应用程序之间的接口,因此,任何能进行Unix输入与输出的编程语言都能实现MapReduce程序。 map函数的输入来自标准输入,输出结果写到标准输出。map输出的键-值对是以一个制表符(\t)分割的行,而且是通过对键排序的。 reduce的输入来自于标准输入,输出结果写到标准输出(Hadoop能够接受此标准输出,并将其持久化)。reduce的输入与map的输出相同。 5. HDFS(P42 3.1 HDFS的设计)编程
刚开始我对这块也很好奇:为何该接口不在确认存入第一个节点以后就返回结果,以后节点之间采用异步的方式将副本同步?后面我看到了这句话:只要写入了dfs.namenode.replication.min的副本数(默认为1),写操做就会成功,而且这个块能够在集群中异步复制,直到达到其目标副本数(dfs.replication的默认值为3)。 namenode选择在哪一个datanode上存储副本,要从可靠性、写入带宽、读取带宽之间进行权衡。通常的作法是:第一个副本放在运行客户端的节点上(若是运行客户端的节点不是datanode,那么就随机选择一个不那么忙也不那么满的节点);第二个节点存放在与第一个节点不一样的机架上的任意一个节点;第三个节点存放在与第二个节点相同机架的另外一任意节点。 6. HDFS的I/O操做(P96 5.1)bootstrap
写数据时:由DataNode负责对所要存储的数据的验证操做。写数据的客户端将数据以及校验和发送给一系列DataNode组成的管线,管线中的最后一个DataNode负责校验操做。若是最后一个DataNode检测到错误,客户端就会收到一个IOException。 读数据时:由客户端进行验证操做。每个DataNode都会保存一个用于验证的校验和日志,里面记录每个数据块的最后一个验证时间。客户端收到数据以后进行验证,若是成功,则修改该日志;若是失败,会执行下面的操做:1. 向NameDode汇报出错的数据块block以及这个数据块所在的DataNode,同时抛出ChecksumException异常;2. NameNode将这个block标记为已损坏,这样它就不会再将一样的客户端请求发送到这个节点,同时尝试将这个block的一个副本复制到另外的DataNode,使得数据块的副本因子回到指望值;3. 将这个已经损坏的block删除。 - 序列化操做?bash
序列化是指将结构化对象转换为字节流以便在网络上传输或者写入到磁盘中进行永久存储的过程。反序列化是指将字节流转回结构化对象的逆过程。 序列化在分布式数据处理有两大应用场景:进程间通讯和永久存储。 在Hadoop生态中,系统中多个节点上的进程之间的通讯是经过**RPC(Remote Produce Call,远程过程调用)**实现的。RPC协议将消息序列化为二进制流以后发送到远端节点,远端节点接收到二进制流以后将其反序列化为原始消息。一般状况下,RPC序列化有如下特色:紧凑、快速、可扩展、支持互操做。Hadoop编程中使用Writable接口实现序列化。
具体由输入文件数目、输入文件大小、配置的参数决定的。 首先了解配置参数:
mapreduce.input.fileinputformat.split.minsize // 启动map的最小split size ,默认为0
mapreduce.input.fileinputformat.split.maxsize // 启动map的最大split size ,默认为256
dfs.block.size // Hadoop系统中的block块大小,默认为128M
splitsize = Math.max(minsize, Math.min(maxSize,blockSize))
复制代码
例如:默认状况下,一个输入文件800M,那么mapper的数量应该为7个,其中6个大小为128M,1个大小为32M; 再例如:一个目录下有三个文件,大小分别为5M、10M和150M,那么这个时候会产生四个mapper,它们所处理的数据大小分别为5M、10M、128M和22M; 固然咱们也能自定义mapper的数量。好比使上面的mapper数量变成2,一个处理大小为128M、另外一个处理大小为37M(5 + 10 + 22),具体实现能够经过设置具体的参数。 2. 环形缓冲区细节:
map task的任务输出首先会进入到一个缓冲区内,这个缓冲区默认大小为100M,当缓冲区达到容量的80%的时候,一个单独的守护进程——spill(溢写)进程会将缓冲区的内容溢写(spill)到本地磁盘。溢写是一个单独的进程,不会影响map端的继续输出,可是当溢写的过程当中写入速度过快致使缓冲区满,那么map的写操做就会被阻塞,直到溢写完成。 3. partition细节:
首先要知道的是,默认状况下,无论有多少个map任务,一个reduce任务只会产生一个输出文件。可是有时候咱们须要将最终的输出数据分散到不一样的文件中去,好比按照省份划分,将同一个省份的数据写入同一个文件中,最终有多个文件。而最终的数据来源于reduce,也就是说,若是要获得多个文件,意味着须要一样数量的reducer运行。而reduce的数据输入来自于map,也就是说,咱们要实现多个reduce,得根据map的不一样输出作手脚,将不一样的map输出按照自定义规则分配给不一样的reduce任务。而map任务划分数据的过程称为partition。 partition就是提早对输入进行处理,根据自定义的reduce进行分区,到了reduce处理的时候,只须要处理对应的分区数据就好了。 默认的partition方法以下所示:
public int getPartition(k Key, v Value, int numReduceTasks){
return (key.hashCode() & Inter.MAX_VALUE) % numReduceTasks;
}
复制代码
前面括号内的结果表示将key的hash值变成一个非负值。numReduceTasks指的是reducer的数量,默认值是1。由于任何一个非负整数除以1的结果是0,也就是说,在默认状况下,getPartition方法的返回值老是1。也就是说,mapper的任务输出老是送给一个reducer,最终只能输出到一个文件中去。这里须要注意的是,若是numReduceTasks的数量为0,那么map会将结果直接刷写到HDFS上去做进一步处理。 固然咱们能够重载此方法,进而实现自定义的partition。 对map输出的每个键值对,系统都会经过这个方法给定一个partition。若是一个键值对的partition值为0,那么它将会交给第1个reducer去处理,partition值为1,会交给第二个reducer去处理。 4. sort和spill细节:
当溢写线程启动的时候,须要对缓冲区中80%的数据作排序。这里的排序是针对序列化的字节根据键值作排序,使用Hadoop本身定义的排序算法,具体实现细节为快速排序法(先按照分区编号partition进行排序,再按照key进行排序),这样的结果是,数据以分区汇集在一块儿,而且同一个分区中的数据按照key有序。 溢写的时候,按照分区编号由小到大依次将每一个分区中的数据写入到任务工做目录的临时文件output/spillN.out(N表示当前溢写的次数)中。 若是设置了combiner,那么这个时候也会进行combiner操做。combiner会将形同key的key/value对的value按照设定的操做,好比加起来,减小溢写到磁盘的数据量。combiner会优化map的输出结果,可是不能对reduce的输出结果产生影响,在这个前提下,combiner是个好东西,会在模型中屡次使用。 在优化的时候,为了尽可能减小对磁盘文件的I/O操做,能够在这一步对spill的文件进行压缩,使其编程IFile格式,这也将有利于后面的merge操做,由于merge的时候须要根据spill后的文件大小进行排序操做。 5. merge细节:
每一次的spill都会产生一个spill文件,因此map task计算的时候会不断产生不少的spill文件,在map task结束以前会对这些spit文件进行合并造成一个已分区而且排序的输出文件(能够控制一次可以合并多少流,默认是10)0,最终的文件也只有一个,这个过程就是merge。 让每个map task最终只生成一个数据文件,能够避免同时打开大量文件金额同时读取大量小文件产生的随机读取带来的开销。 每个map task都有一个缓冲区,存储着map的输出结果,当缓冲区快满的时候须要将缓冲区的数据以临时文件的形式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的全部临时文件作合并,生成最终的正式输出文件,而后等待reduce task来拉取数据。 merge采用多轮递归合并的方式,首先根据全部文件的大小创建小根堆,而后选取前十个元素,依次迭代读取spill下来的文件中的key-value,并将生成的文件又放回到原来的文件中,以后再次创建小根堆,再执行上面的操做。因此merge的过程能够当作建堆=>选取前十个元素迭代合并=>再建堆=>再合并……
shuffle要作的事是怎么把map task的输出结果有效地传送到reduce端,或者能够这样理解,shuffle描述着数据从map task到reduce task输入的这段过程。 在map处理阶段结束后,会将多个文件合并成一个文件,这个文件中相同分区的数据放在一块儿,同一个分区中的数据按照key有序。在reduce阶段须要从多个map task中获取属于该reduce的分区的结果值,而后根据获取到的文件大小决定放在内存中仍是刷写到磁盘中,也就是说,从每个map中获取的数据是一段内存或者一个文件,而后对内存和文件进行和map阶段类似的merge操做,将结果中的每一行key/value执行reduce函数。
咱们的目的是统计两个文件中每一个单词出现的总次数。 首先建立两个文件,做为咱们的输入:
file 1:
His name is Tom
Tom comes from Yunge
file 2:
His name is Jerry
Jerry comes from Lingge
复制代码
map,映射,也就是拆解的意思。 咱们的输入是两个文件,在默认状况下,会产生两个split,也就是两个mapper:mapper1和mapper2。 接下来,这两个mapper会分别将文件内容分解为单词和1(注意,这里的1不是具体数量,只是数字1),其中单词是咱们的主键也就是key,后面的数字就是对应的value。 那么每个mapper对应的输出为: mapper1:
His 1
name 1
is 1
Tom 1
Tom 1
comes 1
from 1
Yunge 1
复制代码
mapper2:
His 1
name 1
is 1
Jerry 1
Jerry 1
comes 1
from 1
Lingge 1
复制代码
partition,分区。为何要分区?由于后面会有多个reducer,每个reducer只干本身的事,这回=会让效率提高很多。partition就是提早对输入进行处理,根据自定义的reduce进行分区,到了reduce处理的时候,只须要处理对应的分区数据就好了。 那么如何分区呢?主要依据就是按照key将数据按照reduce分红对应数量的组,就像汇总硬币同样,一元的进入1号桶,5角的进入2号桶,一角的进入三号桶。这个很重要的一点是须要保证key的惟一性,所以最多见的方法就是使用hash函数。这里咱们假设有两个reducer,咱们将首字母对应的字母顺序进行除2取模,所以每个mapper进行partition以后的结果以下:
mapper1:
partition 1:
His 1
is 1
comes 1
from 1
partition 2:
name 1
Tom 1
Tom 1
Yunge 1
复制代码
mapper2:
partition 1:
His 1
is 1
Jerry 1
Jerry 1
comes 1
from 1
Lingge 1
partition 2:
name 1
复制代码
其中partition 1是给reducer 1处理的,partition 2是给reducer 2处理的。 能够看到,partition只是按照key进行了简单的分区,并无任何别的处理,而且每个分区中的key不会出如今另外一个分区里面。
sort,排序。由于后面的reducer也会作排序,可是它只是作一个归并排序,要求每个mapper的输出结果也是基于key有序的。这里咱们根据首字母进行字典排序:
mapper1:
partition 1:
comes 1
from 1
His 1
is 1
partition 2:
name 1
Tom 1
Tom 1
Yunge 1
复制代码
mapper2:
partition 1:
comes 1
from 1
His 1
is 1
Jerry 1
Jerry 1
Lingge 1
partition 2:
name 1
复制代码
能够看到,每个partition中的数据都按照key作了排序。
combine能够理解成一个mini reducer,它发生在spill到本次磁盘过程以前,目的就是把送到reducer的数据实现进行一次计算,以减小文件大小、减小对网络带宽的消耗。可是要注意的是,combine操做是可选的,若是要加上,请务必保证通过combine以后的数据不会对最终的reduce结果产生影响。下面咱们执行combine:
mapper1:
partition 1:
comes 1
from 1
His 1
is 1
partition 2:
name 1
Tom 2
Yunge 1
复制代码
mapper2:
partition 1:
comes 1
from 1
His 1
is 1
Jerry 2
Lingge 1
partition 2:
name 1
复制代码
由于最后reducer执行的操做是add,那么提早add和后面add的效果是同样的,所以这个combiner是有效的。能够看到,结果中,对重复的单词进行了简单的汇总。
copy,也叫shuffle,就是reducer本身从mapper拉去数据。每个reducer只拉取属于本身partition的数据,结果以下:
reducer 1:
partition 1:(来自mapper1)
comes 1
from 1
His 1
is 1
partition 1:(来自mapper2)
comes 1
from 1
His 1
is 1
Jerry 2
Lingge 1
复制代码
reducer 2:
partition 2:(来自mapper1)
name 1
Tom 2
Yunge 1
partition 2:(来自mapper2)
name 1
复制代码
能够看到,经过shuffle操做,相同partition的数据落到了同一个节点(reducer)上。
merge,合并。将reducer获得的文件合并成同一个文件,须要注意的是,这个过程也包含了排序。结果以下:
reducer 1:
comes 1
comes 1
from 1
from 1
His 1
His 1
is 1
is 1
Jerry 2
Lingge 1
复制代码
reducer2:
partition 2:
name 1
name 1
Tom 2
Yunge 1
复制代码
reduce,归并。最终的一步,将相同的key的value加1。结果以下图:
reducer 1:
comes 2
from 2
His 2
is 2
Jerry 2
Lingge 1
复制代码
reducer2:
partition 2:
name 1
Tom 2
Yunge 1
复制代码
大功告成!咱们统计了两个文件中每个单词的数目,由于有两个reducer,所以输出结果会有两个文件,即part-000000和part-00001。
0. 预览被统计的文件格式
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
<outputDirectory>./lib</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
复制代码
2. 新建mapper内部类
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@java.lang.Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String words = value.toString(); // 传进来一行数据,先将其转换成string
String[] wordsArr = words.split(","); // 由于是csv文件,因此使用","做为列分隔符
context.write(new Text(wordsArr[0]), new LongWritable(1)); // 以省份做为键值,value固定为1
context.write(new Text(wordsArr[0] + "," + wordsArr[1]), new LongWritable(1)); // 以(省份,市)做为键值,value固定为1
}
}
复制代码
2. 新建reducer内部类
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
/** * 输入格式 : <word,[1,1,1,1]> */
Long sum = 0L;
for (LongWritable value : values
) {
sum += value.get(); // 实现value的累加
}
context.write(key, new LongWritable(sum)); // 将结果输出
}
}
复制代码
3. 实现主方法
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 建立一个job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word-count");
// 2. 将类名打包成Jar包
job.setJarByClass(WordCount.class);
// 3. 输入文件地址
FileInputFormat.addInputPath(job, new Path(args[0]));
// 4. mapper处理逻辑
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 5. reducer处理逻辑
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 6. shuffle过程
// 暂不处理
// 7. 定义输出地址
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 运行结果显示,若是成功就输出"成功",不然输出"失败"
boolean result = job.waitForCompletion(true);
System.out.println(result ? "成功": "失败");
}
复制代码
4. 整个WordCount.java完成代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String words = value.toString();
String[] wordArr = words.split(","); // 将每一行数据拆分红每一个单词
// context.write(new Text(wordArr[0]), one); // 统计某个省的学校数量
context.write(new Text(wordArr[0] + "," + wordArr[1]), one); // 统计某个市的学校数量
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
/** * 输入格式 : <word,[1,1,1,1]> */
Long sum = 0L;
for (LongWritable value : values
) {
sum += value.get(); // 实现value的累加
}
context.write(key, new LongWritable(sum)); // 将结果输出
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 建立一个job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word-count");
// 2. 将类名打包成Jar包
job.setJarByClass(WordCount.class);
// 3. 输入文件地址
FileInputFormat.addInputPath(job, new Path(args[0]));
// 4. mapper处理逻辑
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 5. reducer处理逻辑
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 6. shuffle过程
// 暂不处理
// 7. 定义输出地址
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 运行结果显示,若是成功就输出"成功",不然输出"失败"
boolean result = job.waitForCompletion(true);
System.out.println(result ? 1 : 0);
}
}
复制代码
5. 将代码打包成jar包
yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
# ProvinceCount.jar为jar包名称,WordCount为主类,hdfs://master:8020/test_data/senior_school_name.csv为输入文件(这里是HDFS上的文件),hdfs://master:8020/test_data/province_out/是结果输出文件夹
复制代码
6. 执行结果: 你能够经过UI界面查看执行状况:
[root@master hadoop-2.9.2]# yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
18/12/25 04:33:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/25 04:33:37 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.111.132:8032
18/12/25 04:33:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/25 04:33:43 INFO input.FileInputFormat: Total input files to process : 1
18/12/25 04:33:45 INFO mapreduce.JobSubmitter: number of splits:1
18/12/25 04:33:45 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/25 04:33:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1545718642445_0006
18/12/25 04:33:47 INFO impl.YarnClientImpl: Submitted application application_1545718642445_0006
18/12/25 04:33:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1545718642445_0006/
18/12/25 04:33:47 INFO mapreduce.Job: Running job: job_1545718642445_0006
18/12/25 04:34:42 INFO mapreduce.Job: Job job_1545718642445_0006 running in uber mode : false
18/12/25 04:34:42 INFO mapreduce.Job: map 0% reduce 0%
18/12/25 04:36:12 INFO mapreduce.Job: map 100% reduce 0%
18/12/25 04:36:21 INFO mapreduce.Job: map 100% reduce 100%
18/12/25 04:36:22 INFO mapreduce.Job: Job job_1545718642445_0006 completed successfully
18/12/25 04:36:22 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=660893
FILE: Number of bytes written=1719115
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1536880
HDFS: Number of bytes written=362
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=84094
Total time spent by all reduces in occupied slots (ms)=6551
Total time spent by all map tasks (ms)=84094
Total time spent by all reduce tasks (ms)=6551
Total vcore-milliseconds taken by all map tasks=84094
Total vcore-milliseconds taken by all reduce tasks=6551
Total megabyte-milliseconds taken by all map tasks=86112256
Total megabyte-milliseconds taken by all reduce tasks=6708224
Map-Reduce Framework
Map input records=38356
Map output records=38356
Map output bytes=584175
Map output materialized bytes=660893
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=31
Reduce shuffle bytes=660893
Reduce input records=38356
Reduce output records=31
Spilled Records=76712
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=1346
CPU time spent (ms)=12980
Physical memory (bytes) snapshot=332660736
Virtual memory (bytes) snapshot=4213764096
Total committed heap usage (bytes)=137498624
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1536764
File Output Format Counters
Bytes Written=362
成功
[root@master hadoop-2.9.2]#
复制代码
查看HDFS文件系统中是否有文件输出:
# 在集群shell中输入:
hdfs dfs -text /test_data/province_out/part-r-00000
# 结果以下:
复制代码
# 资源说明:
# 1. 三台服务器,分别为master1,master2,slave1
# 2. 使用master1和master2做为HA中的namenode,只有slave1一个datanode
复制代码
hdfs-site.xml配置
<property>
<name>dfs.nameservices</name>
<value>haojiCluster</value>
<description>集群服务ID</description>
</property>
<property>
<name>dfs.ha.namenodes.haojiCluster</name>
<value>master1,master2</value>
<description>集群服务ID内含有的namenode</description>
</property>
<property>
<name>dfs.namenode.rpc-address.haojiCluster.master1</name>
<value>master1:8020</value>
<description>datanode和namenode RPC通讯地址1</description>
</property>
<property>
<name>dfs.namenode.rpc-address.haojiCluster.master2</name>
<value>master2:8020</value>
<description>datanode和namenode RPC通讯地址1</description>
</property>
<property>
<name>dfs.namenode.http-address.haojiCluster.master1</name>
<value>master1:50070</value>
<description>访问namenode的http地址(好比ui界面)</description>
</property>
<property>
<name>dfs.namenode.http-address.haojiCluster.master2</name>
<value>master2:50070</value>
<description>访问namenode的http地址(好比ui界面)</description>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://master1:8485;master2:8485;slave1:8485/haojiCluster</value>
<description>配置journalnode集群的访问地址</description>
</property>
<property>
<name>dfs.client.failover.proxy.provider.haojiCluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
<description>配置dfs客户端,用来判断哪一个namenode处于活跃状态</description>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
<description>为了防止脑裂现象,须要配置一个解决方案,让备用的那个namenode可以经过这个方式去杀掉那个进程,好比ssh</description>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>~/.ssh/id_rsa</value>
<description>既然须要ssh,那么就须要ssh免密登陆</description>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/usr/local/hadoop-2.9.2/journal_data</value>
<description>journal节点目录</description>
</property>
<property>
<name>dfs.replicatio</name>
<value>3</value>
<description>集群副本数</description>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:////usr/local/hadoop-2.9.2/hdfs/name</value>
<description>hadoop的name目录路径</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:////usr/local/hadoop-2.9.2/hdfs/data</value>
<description>hadoop的data目录路径</description>
</property>
<property>
<name>dfs.namenode.servicerpc-address</name>
<value>master1:10000</value>
<description>hadoop的name目录路径</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
<description>指定在namenode和DataNode之间是否开启webHDFS功能</description>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
<description>经过UI操做hdfs时是否须要权限认证</description>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
<description>自动failover启动</description>
</property>
复制代码
core-site.xml
<property>
<name>ha.zookeeper.quorum</name>
<value>master1:2181,master2:2181,slave1:2181</value>
<description>zookeeper集群访问地址</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://haojiCluster</value>
<description>集群对外访问ID,客户端拿着这个ID去访问zookeeper集群查出处于活跃状态的namenode的ip和端口,再进行链接</description>
</property>
<property>
<name>ha.zookeeper.session-timeout.ms</name>
<value>30000</value>
<description>zkfc超过这个时间连不上zookeeper就会自动退出,默认5s</description>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>20</value>
<description>Indicates the number of retries a clientwill make to establisha server connection.</description>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>5000</value>
<description>Indicates the number of milliseconds aclient will wait for before retrying to establish a server connection.</description>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop-2.9.2/tmp</value>
<description></description>
</property>
复制代码
将这个文件复制到其余集群 具体操做
cd /usr/local/hadoop-2.9.2
# 0. 全部节点上启动journal node
sbin/hadoop-daemon.sh start journalnode
# 1. master1上初始化namenode
bin/hdfs namenode -format
# 2. 在master1上启动namenode
sbin/hadoop-daemon.sh start namenode
# 3. master2上初始化另外一个namenode
bin/hdfs namenode -bootstrapStandby
# 4. master2上启动namenode
sbin/hadoop-daemon.sh start namenode
# 5. 启动zk集群
$ZOOKEEPER_HOME/bin/zkServer.sh start
# 6. 在任何一个能链接上zk的机器上格式化zk(建立hadoop-ha znode节点)
bin/hdfs zkfc -formatZK
# 7. 在两个namenode上启动zkfc
sbin/hadoop-daemon.sh --script bin/hdfs start zkfc
复制代码
打开每个namenode的UI界面,能够发现至少有一个namenode处于active状态,另外的处于standby状态。须要注意的是,处于standby的节点,没有open权限,也就是说不能查看集群上的具体文件结构。 这个时候已经启动了HA集群,能够作个实验:将master1直接kill -9,而后发现master1的namenode进程退出,而master2的状态由standby变成active。