Last week I covered the overall architecture and design of Hadoop HDFS, along with the deployment process. This week covers HDFS data flow and some operation examples.
HDFS depends on the following services:
1. NameNode
2. DataNode
3. JournalNode
4. zkfc
JournalNode and zkfc provide high availability.
Data flows between the client, the NameNode, and the DataNodes.
HDFS can be accessed through the following interfaces:
1. HTTP
2. C
3. NFS
4. FUSE
5. Java API
This article focuses on the Java API, specifically the FileSystem class.
FileSystem is the Java class Hadoop provides for operating on HDFS; through it our program acts as an HDFS client. (Besides the programs we write ourselves, the MapReduce jobs running on the cluster nodes are also clients — in fact the main and most common ones.)
The following is the (read) data flow between the client, the NameNode, and the DataNodes:
1. The client calls the open() method on a FileSystem object.
2. open() uses a DistributedFileSystem object to call the NameNode via RPC, obtains the locations of the first blocks of the file and the addresses of the DataNodes holding their replicas, and returns an FSDataInputStream object.
The DataNodes are sorted by their distance to the client; if the client is itself a DataNode that holds a replica of the relevant block, it reads the data from the local DataNode.
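The "distance" here is the topology distance Hadoop derives from rack awareness: each node has a path such as /rack1/node1, and the distance between two nodes is the number of hops up to their closest common ancestor (same node = 0, same rack = 2, different racks = 4). A minimal sketch of that idea — the path strings and helper below are illustrative, not Hadoop's actual NetworkTopology class:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TopologyDistance {
    // Distance = hops from each node up to their closest common ancestor,
    // e.g. same node = 0, same rack = 2, different racks = 4.
    static int distance(String a, String b) {
        String[] pa = a.split("/");
        String[] pb = b.split("/");
        int common = 0;
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;
        }
        return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
        String client = "/rack1/node1";
        List<String> replicas = Arrays.asList("/rack2/node3", "/rack1/node1", "/rack1/node2");
        // Sort replica locations so the closest one comes first, as the NameNode does for reads
        replicas.sort(Comparator.comparingInt(r -> distance(client, r)));
        System.out.println(replicas); // [/rack1/node1, /rack1/node2, /rack2/node3]
    }
}
```

With this ordering, a client that is itself a DataNode holding a replica sorts its own address first, which is exactly the local-read case described above.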
3. When the client calls read() on the FSDataInputStream, it delegates to the read() of the DFSInputStream it wraps, which streams the data from the DataNode to the client.
4. When the end of a block is reached, DFSInputStream closes the connection to that DataNode and then finds the best DataNode for the next block. All of this is transparent to the client, which simply sees one continuous stream.
If DFSInputStream encounters an error while communicating with a DataNode, it tries the next closest DataNode holding that block. DFSInputStream remembers the failed DataNode so that it does not needlessly retry it for later blocks. DFSInputStream also verifies checksums on the data read from DataNodes; if a corrupt block is found, it attempts to read a replica from another DataNode and reports the corrupt block to the NameNode.
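The checksum verification can be sketched with the JDK's own CRC32C (the checksum type HDFS uses by default, computed per chunk of dfs.bytes-per-checksum bytes): the reader recomputes the checksum of each chunk and compares it with the value stored when the block was written. The flow below is a simplified illustration, not Hadoop's actual DFSInputStream code:

```java
import java.util.zip.CRC32C;

public class ChecksumVerify {
    // Compute the CRC32C checksum of one data chunk.
    static long checksum(byte[] chunk) {
        CRC32C crc = new CRC32C();
        crc.update(chunk, 0, chunk.length);
        return crc.getValue();
    }

    // Verify a chunk read from a DataNode against the checksum stored alongside it.
    static boolean verify(byte[] chunk, long storedChecksum) {
        return checksum(chunk) == storedChecksum;
    }

    public static void main(String[] args) {
        byte[] chunk = "some block data".getBytes();
        long stored = checksum(chunk);               // what the writer stored
        System.out.println(verify(chunk, stored));   // data intact: true
        chunk[0] ^= 1;                               // simulate bit rot on disk
        System.out.println(verify(chunk, stored));   // corrupt block: false
    }
}
```

On a mismatch, the real client does what the paragraph above describes: it retries the chunk from another replica and notifies the NameNode of the corrupt block.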
5. When the client has finished reading, it calls close() on the stream.
The following is the (write) data flow between the client, the NameNode, and the DataNodes:
1. The client calls the create() method on a FileSystem object.
2. create() uses a DistributedFileSystem object to call the NameNode via RPC, which creates a new file in the filesystem namespace, with no blocks associated with it yet, and returns an FSDataOutputStream object.
The NameNode performs various checks to make sure the file does not already exist and that the client has permission to create it. If the checks pass, the NameNode creates the new file; otherwise, creation fails and an IOException is thrown to the client.
3. When the client calls write() on the FSDataOutputStream, the data is written through the DFSOutputStream it wraps.
As the client writes data, DFSOutputStream splits it into packets (DFSPacket) and appends them to an internal queue, called the data queue (dataQueue, a LinkedList member field). The DataStreamer consumes the data queue; its job is to pick a set of DataNodes suitable for storing the replicas and to ask the NameNode to allocate new blocks accordingly. This set of DataNodes forms a pipeline: with a replication factor of 3, the pipeline contains 3 DataNodes. The DataStreamer streams each packet to the first DataNode in the pipeline, which stores it and forwards it to the second DataNode, and so on to the third.
DFSOutputStream also maintains an internal queue of packets waiting to be acknowledged by the DataNodes, the ack queue (ackQueue member field). A packet is removed from the ackQueue only when acknowledgements have been received from every DataNode in the pipeline.
If any DataNode fails while a packet is being written, the following happens: first the pipeline is closed, and all packets in the ackQueue are pushed back onto the front of the dataQueue, so that DataNodes downstream of the failed node will not miss any packet. An identifier is passed to the NameNode so that the failed DataNode can delete its partial block after it recovers. The failed DataNode is then removed from the pipeline, a new pipeline is built from the remaining healthy DataNodes, and writing continues.
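The dataQueue/ackQueue interplay described above can be sketched as two plain queues — a simplified model, not the real DFSOutputStream/DataStreamer threads: a packet moves from dataQueue to ackQueue when it is sent, leaves ackQueue once the whole pipeline has acknowledged it, and on failure everything still in ackQueue is pushed back to the front of dataQueue so no packet is lost:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PacketQueues {
    final Deque<String> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
    final Deque<String> ackQueue = new ArrayDeque<>();  // sent, awaiting pipeline acks

    void enqueue(String packet) { dataQueue.addLast(packet); }

    // DataStreamer: take the next packet and send it down the pipeline.
    String send() {
        String p = dataQueue.removeFirst();
        ackQueue.addLast(p);
        return p;
    }

    // All DataNodes in the pipeline acked the oldest outstanding packet.
    void ackReceived() { ackQueue.removeFirst(); }

    // A DataNode in the pipeline failed: push every unacked packet back to
    // the FRONT of dataQueue, preserving order, so nothing is lost.
    void pipelineFailed() {
        while (!ackQueue.isEmpty()) {
            dataQueue.addFirst(ackQueue.removeLast());
        }
    }

    public static void main(String[] args) {
        PacketQueues q = new PacketQueues();
        q.enqueue("p1"); q.enqueue("p2"); q.enqueue("p3");
        q.send(); q.send();        // p1 and p2 are in flight
        q.ackReceived();           // p1 fully acked by the pipeline
        q.pipelineFailed();        // p2 goes back in front of p3
        System.out.println(q.dataQueue); // [p2, p3]
    }
}
```

After the failure, the new pipeline resends p2 first, which is exactly why downstream DataNodes cannot miss a packet.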
4. When the client has finished writing data, it calls close() on the stream; this operation waits for the NameNode to confirm that the write is complete.
Admin command reference: http://hadoop.apache.org/docs...
File operation command reference: http://hadoop.apache.org/docs... (note: the `hadoop fs` in that document should be replaced with `hdfs dfs`)
```
[jevoncode@s1 ~]$ hdfs version
Hadoop 2.7.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r baa91f7c6bc9cb92be5982de4719c1c8af91ccff
Compiled by root on 2016-08-18T01:41Z
Compiled with protoc 2.5.0
From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4
This command was run using /mydata1/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar
```
```
[jevoncode@s1 ~]$ hdfs dfsadmin -report
Configured Capacity: 158127783936 (147.27 GB)
Present Capacity: 148158701568 (137.98 GB)
DFS Remaining: 148158615552 (137.98 GB)
DFS Used: 86016 (84 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (3):

Name: 192.168.31.181:50010 (s6.jevoncode.com)
Hostname: s6.jevoncode.com
Decommission Status : Normal
Configured Capacity: 52709261312 (49.09 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3323027456 (3.09 GB)
DFS Remaining: 49386205184 (45.99 GB)
DFS Used%: 0.00%
DFS Remaining%: 93.70%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Jun 10 14:00:14 CST 2018
...
```
```
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/command/
[jevoncode@s1 ~]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - jevoncode supergroup          0 2018-06-10 14:05 /opt
```
```
hdfs dfs -put sougouword.txt /opt/command/word.txt
hdfs dfs -get /opt/command/word.txt sougouword2.txt
hdfs dfs -rm /opt/command/word.txt
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URL;

/**
 * Preparation:
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/command/
 * [jevoncode@s1 ~]# hdfs dfs -put sougouword.txt /opt/command/word.txt
 *
 * Option 1: pass the path as an argument and run on the Hadoop cluster:
 * [jevoncode@s1 ~]# export HADOOP_CLASSPATH=jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar
 * [jevoncode@s1 ~]# hadoop com.jc.demo.hadoop.hdfs.URLCat hdfs://ns/opt/command/word.txt
 * where ns is the nameservice configured in hdfs-site.xml for high availability.
 *
 * Option 2: remote access - run main() directly, using hdfsHost as the URL.
 */
public class URLCat {

    static {
        // Teach the JVM to handle hdfs:// URLs; this can be set at most once per JVM
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        InputStream in = null;
        try {
//            in = new URL(args[0]).openStream();   // Option 1: path from command-line argument
            in = new URL(hdfsHost).openStream();    // Option 2: remote access
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URI;

/**
 * Read a file's contents via the FileSystem class.
 * Configuration here only supplies default settings; the URI itself
 * still has to be given in the code.
 */
public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

/**
 * Random reads with FSDataInputStream: seek() lets us re-read from any offset.
 */
public class FileSystemDoubleCat {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // go back to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

/**
 * Copy a local file into HDFS (the write path); parent directories are created automatically.
 * FileSystem can also operate on the local filesystem: if the destination has no
 * hdfs:// scheme, it refers to the local directory /opt/java instead.
 */
public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = "/home/cherry/Downloads/斗破苍穹.txt";
        String dst = "hdfs://s1.jevoncode.com:9000/opt/java/斗破苍穹.txt";
//        String dst = "/opt/java/"; // no scheme: FileSystem would write to the local directory /opt/java

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print("."); // called as data is written, showing progress
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * List the files and directories directly under the given paths (one level only).
 */
public class ListStatus {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/";
        args = new String[]{"/opt/", "/dir"};
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status); // convert the FileStatus[] to Path[]
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
/**
 * output:
 * 06-10 14:29:16 [main] DEBUG o.a.h.s.a.util.KerberosName - Kerberos krb5 configuration not found, setting default realm to empty
 * 06-10 14:29:16 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Falling back to shell based
 * 06-10 14:29:18 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Both short-circuit local reads and UNIX domain socket are disabled.
 * 06-10 14:29:18 [main] DEBUG o.a.h.h.p.d.s.DataTransferSaslUtil - DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection
 * hdfs://s1.jevoncode.com:9000/opt/command
 * hdfs://s1.jevoncode.com:9000/opt/java
 */
```
```java
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
 * Inspect file and directory metadata via FileStatus.
 */
public class ShowFileStatusTest {
    private FileSystem fs;

    @Before
    public void setUp() throws IOException {
        String uri = "hdfs://s1.jevoncode.com:9000/";
        Configuration conf = new Configuration();
        fs = FileSystem.get(URI.create(uri), conf);
        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }

    @After
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
    }

    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
    }

    @Test
    public void fileStatusForFile() throws IOException, InterruptedException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));     // path should be /dir/file
        assertThat(stat.isDirectory(), is(false));                         // not a directory
        assertThat(stat.getLen(), is(7L));                                 // file length
        Thread.sleep(3000); // ensure the modification time is in the past
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis()))); // modified before now
        assertThat(stat.getReplication(), is((short) 3));                  // replication factor
        assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));           // block size
        assertThat(stat.getOwner(), is(System.getProperty("user.name"))); // owned by the current user
        assertThat(stat.getGroup(), is("supergroup"));                     // group check
        assertThat(stat.getPermission().toString(), is("rw-r--r--"));      // permission check
    }

    @Test
    public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDirectory(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is(System.getProperty("user.name")));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }
}
```