若是说上一篇是在阐述HDFS最基础的理论知识,这一篇就是HDFS的主要工做流程,和一些较为有用的策略node
补充一个问题,就是当咱们 NameNode 挂掉,SecondaryNameNode做为新的NameNode上位时,它确实能够根据fsimage.ckpt把一部分元数据加载到内存,但是若是这时还有一部分操做日志在edits new中没有执行怎么办?linux
这时候有一个解决方案就是利用一个network fileSystem来解决,好比说集群中有一个服务器安装了一个nfs server,而在NameNode上再安装一个nfs client,此时客户端向HDFS写数据时,同时把向edits new中写的数据写一份到nfs server中,SecondaryNamenode就能够经过这个nfs server来获取此时断层的数据了算法
其余彷佛也没啥可多说的,让咱们直奔主题吧服务器
从零开始的大数据(一) --- HDFS的知识概述(上)网络
以后的内容会围绕下图开始多线程
简单过一下图里面的角色,最大块的是一个client node,也就是说,这个节点上运行着客户端,若是实在是没搞清楚哪一个是客户端,那也很简单,平时没事就执行tcp
hadoop fs -ls /
复制代码
这个命令的机器,那就是客户端了,其余就是NameNode和DataNode,在client node上运行着一个JVM虚拟机,让HDFS client跑起来分布式
Distributed FileSystem顾名思义是一个分布式文件系统,它会经过RPC的方式远程过程调用NameNode里的open方法,这个open方法有什么做用呢,就是获取要读的文件的file block locations,也就是文件的block的位置,在上一讲咱们也已经提到了,一个文件是会分割成128M一块的大小分别存储在各个数据节点的。oop
同时在执行open方法时,客户端会产生一个FSData InputStream的一个输入流对象(客户端读数据是从外部读回来的)post
HDFS client调用FSData InputStream的read方法,同上也是远程过程调用DataNode的read方法,此时的读取顺序是由远到近,就是DataNode和client node的距离,这里所指的距离是一种物理距离,断定能够参考上一篇文章中机架的概念。
在联系上DataNode并成功读取后,关闭流就走完了一个正常的流程。
并且补充一下就是,上面Distributed FileSystem所调用的get block locations的方法只会返回部分数据块,get block locations会分批次地返回block块的位置信息。读block块理论上来讲是依次读,固然也能够经过多线程的方式实现同步读。
此时咱们会找到block另外的副本(一个block块有3个副本,上一篇已经说过了),而且经过FSData InputStream进行记录,之后就再也不从中断的副本上读了。
在上一篇中咱们提到了一个HDFS的心跳机制,DataNode会隔一小时向NameNode汇报blockReport,好比如今的状况是,block1的三个副本分别存储在DataNode1,2,3上,此时DataNode1挂掉了。NameNode得知某个block还剩2个副本,此时携带这block的其他两个副本的DataNode2,3在向NameNode报告时,NameNode就会对它们中的某一个返回一个指令,把block1复制一份给其余正常的节点。让block1恢复成本来的3个副本。
由于从DataNode上读数据是经过网络来读取的,这说明会存在读取过来的数据是不完整的或者是错误的状况。
DataNode上存储的不只仅是数据,数据还附带着一个叫作checkSum检验和(CRC32算法)的概念,针对于任何大小的数据块计算CRC32的值都是32位4个字节大小。此时咱们的FSData InputStream向DataNode读数据时,会将与这份数据对应的checkSum也一并读取过来,此时FSData InputStream再对它读过来的数据作一个checkSum,把它与读过来的checkSum作一个对比,若是不一致,就从新从另外的DataNode上再次读取。
FSData InputStream会告诉NameNode,这个DataNode上的这个block有问题了,NameNode收到消息后就会再经过心跳机制通知这个DataNode删除它的block块,而后再用相似2的作法,让正常的DataNode去copy一份正常的block数据给其它节点,保证副本数为3
try {
//
String srcFile = "hdfs://node-01:9000/data/hdfs01.mp4";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(srcFile),conf);
FSDataInputStream hdfsInStream = fs.open(new Path(srcFile));
BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("/home/node1/hdfs02.mp4"));
IOUtils.copyBytes(hdfsInStream, outputStream, 4096, true);
} catch (IOException e) {
e.printStackTrace();
}
复制代码
写流程咱们会按照下图来进行讲解,比读数据更加复杂一丢丢,角色基本没有改变因此就不详细介绍了
客户端向HDFS写数据的时候是把文件分块存储在HDFS的各个节点上,而规定了存储位置的是NameNode,因此Client node在存储文件时须要先和NameNode进行联系让它进行分配。
和上面读的方法相似,不过此次调用的是Distributed FileSystem的create方法,此时也是经过远程调用NameNode的create方法
此时NameNode会进行的举措
1.检测本身是否正常运行
2.判断要建立的文件是否存在
3.client是否有建立文件的权限
4.对HDFS作状态的更改须要在edits log写日志记录
create方法的返回值是一个OutputStream对象,为何是output,由于是由HDFS去往DataNode去写数据,此时HDFS会调用这个OutputStream的write方法
可是有个问题,此时咱们还不知道咱们的这些block块要分别存放于哪些节点上,因此此时FSData OutputStream就要再和NameNode交互一下,远程过程调用NameNode的addBlock方法,这个方法返回的是各个block块分别须要写在哪3个DataNode上面。
此时OutputStream就完整得知了数据和数据该往哪里去写了
请看流程4.1,chunk是一个512字节大小的数据块,写数据的过程当中数据是一字节一字节往chunk那里写的,当写满一个chunk后,会计算一个checkSum,这个checkSum是4个字节大小,计算完成后一并放入chunk,因此整一个chunk大小实际上是512字节+4字节=516字节。
上述步骤结束后,一个chunk就会往package里面放,package是一个64kb大小的数据包,咱们知道64kb = 64 * 1024字节,因此这个package能够放很是多的chunk。
此时一个package满了以后,会把这个packjage放到一个data queue队列里面,以后会陆续有源源不断的package传输过来,图中用p1,p2···等表示
这时候开始真正的写数据过程
(ps:传输的类为一个叫作dataStreamer的类,并且其实addBlock方法返回的列表基本是按照离客户端物理距离由近到远的顺序的)
往DataNode上传输的同时也往确认队列ack queue上传输
针对DataNode中传输完成的数据作一个checkSum,并与本来打包前的checkSum作一个比较
校验成功,就从确认队列ack queue中删除该package,不然该package从新置入data queue重传
补充:1.以上逻辑归属于FSData OutputStream的逻辑
2.虽然自己一个block为128M,而package为64Kb,128M对于网络传输过程来讲算是比较大,拆分为小包是为了可靠传输
3.网络中断时的举措:HDFS会先把整个pineline关闭,而后获取一个已存在的完整的文件的version,发送给NameNode后,由NameNode经过心跳机制对未正确传输的数据下达删除命令
4.若是是某个DataNode不可用,在1中咱们也提到过了,经过心跳机制会通知其他的可用DataNode的其中一个进行copy到一个可用节点上
完成后经过心跳机制NameNode就能够得知副本已经建立完成,再调用addBlock()方法写以后的文件。
1.client端调用Distributed FileSystem的create,此时是远程调用了NameNode的create,此时NameNode进行4个操做,检测本身是否正常,文件是否存在,客户端的权限和写日志
2.create的返回值为一个FSData OutputStream对象,此时client调用流的write方法,和NameNode进行链接,NameNode的addBlock方法返回块分配的DataNode列表
3.开始写数据,先写在chunk,后package,置入data queue,此时两个操做,向DataNode传输,和放入ack queue,DataNode传输结束会检测checkSum,成功就删除ack queue的package,不然放回data queue重传
4.结束后关闭流,告诉NameNode,调用complete方法结束
String source="/home/node1/hdfs01.mp4"; //linux中的文件路徑,demo存在必定数据
//先确保/data目录存在
String destination="hdfs://node-01:9000/data/hdfs01.mp4";//HDFS的路徑
InputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(source));
//HDFS读写的配置文件
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(destination),conf);
//调用Filesystem的create方法返回的是FSDataOutputStream对象
//该对象不容许在文件中定位,由于HDFS只容许一个已打开的文件顺序写入或追加
OutputStream out = fs.create(new Path(destination));
IOUtils.copyBytes(in, out, 4096, true);
} catch (FileNotFoundException e) {
System.out.println("exception");
e.printStackTrace();
} catch (IOException e) {
System.out.println("exception1");
e.printStackTrace();
}
复制代码
以前已经提到过,元数据是放在NameNode的内存中的,当元数据丢失,形成服务不可用,这时候就须要时间来恢复。HA就可让用户感知不到这种问题。这须要yarn,MapReduce,zookeeper的支持
仅有一个NameNode时,当NameNode挂掉就须要把fsimage读到内存,而后把edits log所记录的日志从新执行一遍,元数据才能恢复,而这种作法须要大量的时间
因此解决方案就在于咱们要花大量时间来恢复元数据metaData,因此解决的方案就是让集群瞬间变回可用状态便可。
经过设置一个stand by的NameNode,并和主NameNode的元数据保持一致,图中绿色的区域表示一个共享存储,主NameNode的元数据会传输至共享存储里面,让stand by的NameNode进行同步。
下面的DataNode会同时往两个NameNode发送blockReport,由于读取DataNode的块信息并不会很快,因此为了保证在active挂掉的时候,standby能马上顶上位置,因此要事先读取块信息,同时这也是方便standby来构建它的元数据信息的途径。
active挂掉后让stand by马上生效的机制是上面的FailoverControllerActive实现的,简称zkfc,它会定时ping主NameNode,若是发现NameNode挂掉,就会通知咱们的zookeeper集群,而后集群的另外一个FailoverControllerActive就会通知stand by。
集群中的元数据会保存在NameNode的内存中,而这些元数据每份占用约150字节,对于一个拥有大量文件的集群来讲,由于NameNode的metaData被占满,DataNode就没法写入了,联邦就能够帮助系统突破文件数上限
其实就是布置了多个NameNode来共同维护集群,来增长namespace,并且分散了NameNode的访问压力,并且客户端的读写互不影响。就是扩展性,高吞吐和隔离性。
和刚刚的联邦的介绍时的状况同样,文件数量(每一个文件元数据150byte)会影响到NameNode的内存
其实就是经过一个MR程序把许多小文件合并成一个大文件,须要启动Yarn
# 建立archive文件
hadoop archive -archiveName test.har -p /testhar -r 3 th1 th2 /outhar # 原文件还存在,需手动删除
# 查看archive文件
hdfs dfs -ls -R har:///outhar/test.har
# 解压archive文件
hdfs dfs -cp har:///outhar/test.har/th1 hdfs:/unarchivef
hadoop fs -ls /unarchivef # 顺序
hadoop distcp har:///outhar/test.har/th1 hdfs:/unarchivef2 # 并行,启动MR
复制代码
其核心是以文件名为key,文件内容为value组织小文件。10000个100KB的小文件,能够编写程序将这些文件放到一个SequenceFile文件,而后就以数据流的方式处理这些文件,也可使用MapReduce进行处理。一个SequenceFile是可分割的,因此MapReduce可将文件切分红块,每一块独立操做。不像HAR,SequenceFile支持压缩。在大多数状况下,以block为单位进行压缩是最好的选择,由于一个block包含多条记录,压缩做用在block之上,比reduce压缩方式(一条一条记录进行压缩)的压缩比高.把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件做为中间媒介.
此方案的代码不是很重要,因此就直接省略了,实在是想看看长啥样的能够艾特我
到此HDFS的内容就差很少了,但愿对你会有所帮助。以后会继续往下分享MapReduce,带你走完整个大数据的流程,感兴趣的朋友能够持续关注下,谢谢。