随着数据量愈来愈大,在一个操做系统中存不下全部的数据。须要将这些数据分配到更多的操做系统中,带来的问题是多操做系统不方便管理和维护。须要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS是分布式文件管理系统中的一种html
HDFS(Hadoop Distributed File System)它是一个文件系统,用于存储文件,经过目录树来定位文件。其次,他是分布式的,由不少服务器联合起来实现其功能,集群中的服务器有各自的角色java
HDFS 的使用场景:适合一次写,屡次读的场景,且不支持文件的修改。适合用来作数据分析node
HDFS 中的文件在物理上是分块存储(Block),块的大小能够手动配置参数 dfs.blocksize
来修改(Hadoop 2.x 是 128m,以前是 64m)linux
广泛认为,寻址时间(即查找目标 block 的时间)为 10ms,而寻址时间为传输时间的 1% 时,为 HDFS 运行的理想最佳状态。此时传输时间为 10ms / 1% = 1000ms = 1s,而目前硬盘的传输速度广泛为 100m/s ,所以 block 的大小取 1s*100m/s = 100m。离它最近的 2 的次幂就是 128 了。这块能够看出,影响 block 大小的主要因素就是硬盘的读取速度。所以当采用固态硬盘的时候彻底能够把数值调整到 256 m 甚至更多。git
块过小的时候,会增长寻址时间github
但当块变得很大时,就要想办法避免热点数据的频繁读取了。这一点在 Google 的论文中有提到,论文中给到的解决思路是客户端缓存,可是并无提具体实现 https://www.cnblogs.com/zzjhn/p/3834729.htmlshell
bin/hadoop fs 具体命令 OR bin/hdfs dfs 具体命令apache
dfs是fs的实现类缓存
命令 | 解释 | 示例 | 备注 |
---|---|---|---|
-ls | 显示目录信息 | ||
-mkdir | 在HDFS上建立目录 | hadoop fs -mkdir -p /user/keats/love | -p 建立多级目录 |
-moveFromLocal | 从本地剪切粘贴到HDFS | hadoop fs -moveFromLocal ./yaya.txt /user/keats/love/ | 前面是来源路径 后面是目标路径,下同 |
-appendToFile | 追加一个文件到已经存在的文件末尾 | ||
-cat | 显示文件内容 | ||
-chgrp 、-chmod、-chown | Linux文件系统中的用法同样,修改文件所属权限 | ||
-copyFromLocal | 从本地文件系统中拷贝文件到HDFS路径去 | 同 -put | |
-copyToLocal | 从HDFS拷贝到本地 | 同 -get | |
-getmerge | 合并下载多个文件 | hadoop fs -getmerge /user/keats/love/* ./yaya.txt | |
-tail | 显示一个文件的末尾 | ||
-rm | 删除文件或文件夹 | ||
-rmdir | 删除空目录 | ||
-du | 统计文件夹的大小信息 | 能够理解为 disk use | |
-setrep | 设置HDFS中文件的副本数量 | 里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。由于目前只有3台设备,最多也就3个副本,只有节点数的增长到10台时,副本数才能达到10 |
项目地址 https://github.com/keatsCoder/HdfsClientDemo安全
建立 maven 项目,引入依赖
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> </dependencies>
建立 Java 类,HdfsClient 主要进行了三步操做
public class HdfsClient { static FileSystem fs; static { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://linux102:9000"); try { fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { // 执行操做 mkDir(); // 释放资源 fs.close(); } private static void mkDir() throws IOException { fs.mkdirs(new Path("/john/keats")); } }
尝试运行,会获得第一个错误,大意是权限被拒绝,这个时候就须要配置 JVM 参数 -DHADOOP_USER_NAME=root
来告诉集群,使用 root 用户进行操做
配置好以后再运行,会遇到第二个错误
Could not locate Hadoop executable: D:\develop\hadoop\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
这是由于咱们以前配置环境的时候,解压的 hadoop 文件 bin 目录下没有 winutils.exe 这个文件,根据后面地址 wiki 百科的指示,能够下载该文件放在 bin 目录下。可是目前那个文件的最新版本是 2.8.1,也许会存在某些方面不兼容的问题,目前还暂时没有发现。所以能够直接下载该版本使用 https://github.com/steveloughran/winutils/releases
FileSystem.get() 有一个重载方法,三个参数,第一个是 hadoop namenode 地址,第二个是 conf 对象,第三个是用户名。能够一次配好
测试方法详见示例代码 HdfsClient2.java 类
fs = FileSystem.get(new URI("hdfs://linux102:9000"), conf, "root");
在项目 resource 目录下建立文件 zhangsan.txt
调用 copyFromLocalFile 方法上传文件
private static void uploadFile() throws IOException { URL systemResource = ClassLoader.getSystemResource("zhangsan.txt"); String path = systemResource.getPath(); fs.copyFromLocalFile(new Path(path), new Path("/john/keats/love")); }
copyFromLocalFile 还有三个重载方法,分别提供如下功能
以前咱们在 hadoop 集群配置的副本数量是 3 ,而 hadoop client 也支持两种方式配置参数
加上默认的 default-xxxx.xml 一共四种配置的方式。他们的优先级是
conf > resources 下的配置文件 > hadoop 集群配置文件 > default
ConfigFileTest.java 类对此处的配置进行的说明与测试,读者能够运行体验
fs.copyToLocalFile(new Path("/three.txt"), new Path("D://zhangsan.txt"));
copyToLocalFile 还有两个重载方法,分别添加了。具体代码可参考 DownLoadFileTest.java
// 是否删除源文件 boolean delSrc
// 是否使用RawLocalFileSystem做为本地文件系统 // 默认是 false,目前比较直观的就是 false 状态下下载文件会同时生成 .crc 格式的校验文件,设置为 true 时不会生成 boolean useRawLocalFileSystem
删除文件的API,第二个参数表示是否递归删除。若是要删除的 Path 对应的是文件夹,recursive 须要设置为 true ,不然会抛异常。其实就至关于 rm -r
中的参数 -r
public abstract boolean delete(Path f, boolean recursive) throws IOException;
private static void deleteFile() throws IOException { // /john/keats 是文件夹目录,递归设置为 false 会报错 PathIsNotEmptyDirectoryException: ``/john/keats is non empty': Directory is not empty // fs.delete(new Path("/john/keats"), false); // 先上传,再删除 HdfsClient2.uploadFile(true); fs.delete(new Path("/john/keats/love/zhangsan.txt"), true); }
这块我在删除以前添加了上传操做,目的是为了防止文件不存在。而上传又存在一种可能就是目标文件已存在。这是个薛定谔的文件- -,所以我查看了 FileSystem 的上传 API,他是提供 overwrite 开关的,默认是 true 即覆盖目标文件
private static void renameFile() throws IOException { String dstFileName = "wangwu.txt"; HdfsClient2.uploadFile(); deleteFile(dstFileName); // 目标文件不存在,则改名成功 boolean rename = fs.rename(new Path("/john/keats/love/zhangsan.txt"), new Path("/john/keats/love/", dstFileName)); Assert.assertTrue(rename); // 目标文件存在,则改名失败 boolean renameButDstIsExist = fs.rename(new Path("/john/keats/love/zhangsan.txt"), new Path("/john/keats/love/", dstFileName)); Assert.assertFalse(renameButDstIsExist); }
public static void listFiles() throws IOException { // 获取文件详情 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while (listFiles.hasNext()) { LocatedFileStatus status = listFiles.next(); // 输出详情 // 文件名称 System.out.println(status.getPath().getName()); // 长度 System.out.println(status.getLen()); // 权限 System.out.println(status.getPermission()); // 分组 System.out.println(status.getGroup()); // 获取存储的块信息 BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { // 获取块存储的主机节点 String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println("-----------分割线----------"); } }
判断某路径下的内容是文件仍是文件夹
public static void isFile() throws IOException { // FileStatus[] listStatus = fs.listStatus(new Path("/")); FileStatus[] listStatus = fs.listStatus(new Path("/three.txt")); for (FileStatus fileStatus : listStatus) { // 若是是文件 if (fileStatus.isFile()) { System.out.println("f:"+fileStatus.getPath().getName()); }else { System.out.println("d:"+fileStatus.getPath().getName()); } } }
// 从本地上传到HDFS public static void copyFileFromDiskByIO() throws IOException { // 2 建立输入流 FileInputStream fis = new FileInputStream(new File("D:/zhangsan.txt")); // 3 获取输出流 FSDataOutputStream fos = fs.create(new Path("/zhangsan.txt")); // 4 流对拷 IOUtils.copyBytes(fis, fos, conf); } // 从HDFS拷贝到本地 public static void copyFileFromHDFSByIO() throws IOException { FSDataInputStream fis = fs.open(new Path("/zhangsan.txt")); // 3 获取输出流 FileOutputStream fos = new FileOutputStream(new File("D:/zhangsan1.txt")); // 4 流的对拷 IOUtils.copyBytes(fis, fos, conf); }
/** * 从某个位置开始拷贝文件,用于读取某个完整文件的部份内容 */ public static void copyFileSeek() throws Exception{ // 2 打开输入流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.10.1.tar.gz")); // 3 定位输入数据位置 fis.seek(1024*1024*128); // 4 建立输出流 FileOutputStream fos = new FileOutputStream(new File("D:/hadoop-2.7.2.tar.gz.part2")); // 5 流的对拷 IOUtils.copyBytes(fis, fos, conf); }
1)客户端经过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在
2)NameNode返回是否能够上传
3)客户端请求第一个 Block上传到哪几个DataNode服务器上(根据服务器距离以及负载排序,取前副本数个服务器返回)
4)NameNode返回3个DataNode节点,分别为dn一、dn二、dn3
5)客户端经过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,而后dn2调用dn3,将这个通讯管道创建完成
6)dn一、dn二、dn3逐级应答客户端
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答
8)当一个Block传输完成以后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)
1)客户端经过Distributed FileSystem向NameNode请求下载文件,NameNode经过查询元数据,找到文件块所在的DataNode地址
2)挑选一台DataNode(就近原则,而后随机)服务器,请求读取数据
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来作校验)
4)客户端以Packet为单位接收,先在本地缓存,而后写入目标文件
在HDFS写数据的过程当中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和
机架感知说明
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack.
这样布置,第一考虑的是速度,也兼顾了容灾的要求
首先,咱们作个假设,若是存储在NameNode节点的磁盘中,由于常常须要进行随机访问,还有响应客户请求,必然是效率太低。所以,元数据须要存放在内存中。但若是只存在内存中,一旦断电,元数据丢失,整个集群就没法工做了。所以产生在磁盘中备份元数据的FsImage
这样又会带来新的问题,当在内存中的元数据更新时,若是同时更新FsImage,就会致使效率太低,但若是不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。所以,引入Edits文件(只进行追加操做,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,能够经过FsImage和Edits的合并,合成元数据。
可是,若是长时间添加数据到Edits中,会致使该文件数据过大,效率下降,并且一旦断电,恢复元数据须要的时间过长。所以,须要按期进行FsImage和Edits的合并,若是这个操做由NameNode节点完成,又会效率太低。所以,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并
总体的操做机制和 Redis 差很少,FsImage 至关于 Redis 中的 RDB 快照,Edits 至关于 Redis 中的 AOF 日志,二者结合。而 Redis 合并两个文件是采用的 Fork 进程的方式
第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,建立Fsimage和Edits文件。若是不是第一次启动,直接加载编辑日志和镜像文件到内存
(2)客户端对元数据进行增删改的请求
(3)NameNode记录操做日志,更新滚动日志
(4)NameNode在内存中对数据进行增删改
第二阶段:Secondary NameNode工做
(1)Secondary NameNode询问NameNode是否须要CheckPoint。直接带回NameNode是否检查结果
(2)Secondary NameNode请求执行CheckPoint
(3)NameNode滚动正在写的Edits日志
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并
(6)生成新的镜像文件fsimage.chkpoint
(7)拷贝fsimage.chkpoint到NameNode
(8)NameNode将fsimage.chkpoint从新命名成fsimage
1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据自己,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳
2)DataNode启动后向NameNode注册,经过后,周期性(1小时)的向NameNode上报全部的块信息
3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另外一台机器,或删除某个数据块。若是超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用
4)集群运行中能够安全加入和退出一些机器
DataNode 也能够配置成多个目录,每一个目录存储的数据不同。不一样于 NameNode 多目录配置,NameNode 多个目录直接的数据是同样的,仅作备份和容灾用。我想是由于 DataNode 已经使用副原本作备份了,若是还继续在本机复制多份,不是颇有必要。而 NameNode 在未作高可用以前并无足够的备份,所以产生了差别
hdfs-site.xml
<property> <name>dfs.datanode.data.dir</name> <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value> </property>
(1)在hadoop104主机上再克隆一台hadoop105主机
(2)修改IP地址和主机名称
(3)删除原来HDFS文件系统留存的文件(/opt/module/hadoop/data和log)
(4)source一下配置文件
(5)直接启动DataNode,便可关联到集群
若是数据不均衡,可使用 ./start-balancer.sh
命令实现集群的再均衡
可是这样存在一个问题:若是某些恶意分子知道了 NameNode 的地址,即可以链接集群并克隆出集群的数据,这样是极不安全的
只容许白名单内的地址链接 NameNode
在 NameNode 的 /opt/module/hadoop/etc/hadoop目录下建立 dfs.hosts 文件,并添加以下主机名称
linux102 linux103 linux104
在 NameNode 的 hdfs-site.xml 配置文件中增长 dfs.hosts 属性
<property> <name>dfs.hosts</name> <value>/opt/module/hadoop/etc/hadoop/dfs.hosts</value> </property>
文件分发
xsync hdfs-site.xml
刷新NameNode
hdfs dfsadmin -refreshNodes
打开 Web 页面,能够看到不在白名单的 DataNode 会被下线
在黑名单上的节点会被强制退出
黑名单的配置 key 以下
<property> <name>dfs.hosts.exclude</name> <value>/opt/module/hadoop/etc/hadoop/dfs.hosts.exclude</value> </property>
须要注意
bin/hadoop distcp hdfs://linux102:9000/user/keats/hello.txt hdfs://linux103:9000/user/keats/hello.txt
开启回收站功能,能够将删除的文件在不超时的状况下,恢复原数据,起到防止误删除、备份等做用
快照至关于对目录作一个备份,并不会马上复制全部文件。而是记录文件变化