hadoop之HDFS学习笔记(一)

主要内容:hdfs的总体运行机制,DATANODE存储文件块的观察,hdfs集群的搭建与配置,hdfs命令行客户端常见命令;业务系统中日志生成机制,HDFS的java客户端api基本使用。java

一、什么是大数据

基本概念

《数据处理》node

在互联网技术发展到现今阶段,大量平常、工做等事务产生的数据都已经信息化,人类产生的数据量相比之前有了爆炸式的增加,之前的传统的数据处理技术已经没法胜任,需求催生技术,一套用来处理海量数据的软件工具应运而生,这就是大数据!linux

 

处理海量数据的核心技术:web

海量数据存储:分布式算法

海量数据运算:分布式sql

大数据的海量数据的存储和运算,核心技术就是分布式。数据库

 

这些核心技术的实现是不须要用户从零开始造轮子的express

存储和运算,都已经有大量的成熟的框架来用apache

 

存储框架:编程

HDFS——分布式文件存储系统(HADOOP中的存储框架)

HBASE——分布式数据库系统

KAFKA——分布式消息缓存系统(实时流式数据处理场景中应用普遍)

 

文件系统中的数据以非结构化居多,没有直观的结构,数据库中的信息多以表的形式存在,具备结构化,存在规律;

查询的时候文本文件只能一行一行扫描,而数据库效率高不少,能够利用sql查询语法,数据库在存和取方便的多。

数据库和文件系统相比,数据库至关于在特定的文件系统上的软件封装。其实HBASE就是对HDFS的进一层封装,它的底层文件系统就是HDFS。

分布式消息缓存系统,既然是分布式,那就意味着横跨不少机器,意味着容量能够很大。和前二者相比它的数据存储形式是消息(不是表,也不是文件),消息能够看作有固定格式的一条数据,好比消息头,消息体等,消息体能够是json,数据库的一条记录,一个序列化对象等。消息最终存放在kafaka内部的特定的文件系统里。

 

运算框架:(要解决的核心问题就是帮用户将处理逻辑在不少机器上并行

MAPREDUCE—— 离线批处理/HADOOP中的运算框架

SPARK —— 离线批处理/实时流式计算

STORM —— 实时流式计算

 

离线批处理:数据是静态的,一次处理一大批数据。

实时流式:数据在源源不断的生成,边生成,边计算

这些运算框架的思想都差很少,特别是mapreduce和spark,简单来看spark是对mapreduce的进一步封装;

运算框架和存储框架之间没有强耦合关系,spark能够读HDFS,HBASE,KAFKA里的数据,固然须要存储框架提供访问接口。

 

辅助类的工具(解放大数据工程师的一些繁琐工做):

HIVE —— 数据仓库工具:能够接收sql,翻译成mapreduce或者spark程序运行

FLUME——数据采集

SQOOP——数据迁移

ELASTIC SEARCH —— 分布式的搜索引擎

flume用于自动采集数据源机器上的数据到大数据集群中。

HIVE看起来像一个数据库,但其实不是,Hive中存了一些须要分析的数据,而后在直接写sql进行分析,hive接收sql,翻译成mapreduce或者spark程序运行;

hive本质是mapreduce或spark,咱们只须要写sql逻辑而不是mapreduce逻辑,Hive自动完成对sql的翻译,并且仍是在海量数据集上。

.......

 

换个角度说,大数据是:

一、有海量的数据

二、有对海量数据进行挖掘的需求

三、有对海量数据进行挖掘的软件工具(hadoop、spark、storm、flink、tez、impala......)

大数据在现实生活中的具体应用

数据处理的最典型应用:公司的产品运营状况分析

 

电商推荐系统:基于海量的浏览行为、购物行为数据,进行大量的算法模型的运算,得出各种推荐结论,以供电商网站页面来为用户进行商品推荐

 

精准广告推送系统:基于海量的互联网用户的各种数据,统计分析,进行用户画像(获得用户的各类属性标签),而后能够为广告主进行有针对性的精准的广告投放

二、什么是hadoop

hadoop中有3个核心组件:

分布式文件系统:HDFS —— 实现将文件分布式存储在不少的服务器上

分布式运算编程框架:MAPREDUCE —— 实如今不少机器上分布式并行运算

分布式资源调度平台:YARN —— 帮用户调度大量的mapreduce程序,并合理分配运算资源

三、hdfs总体运行机制

hdfs:分布式文件系统

hdfs有着文件系统共同的特征:

一、有目录结构,顶层目录是:  /

二、系统中存放的就是文件

三、系统能够提供对文件的:建立、删除、修改、查看、移动等功能

 

hdfs跟普通的单机文件系统有区别:

一、单机文件系统中存放的文件,是在一台机器的操做系统中

二、hdfs的文件系统会横跨N多的机器

三、单机文件系统中存放的文件,是在一台机器的磁盘上

四、hdfs文件系统中存放的文件,是落在n多机器的本地单机文件系统中(hdfs是一个基于linux本地文件系统之上的文件系统)

 

hdfs的工做机制:

一、客户把一个文件存入hdfs,其实hdfs会把这个文件切块后,分散存储在N台linux机器系统中(负责存储文件块的角色:data node)<准确来讲:切块的行为是由客户端决定的>

二、一旦文件被切块存储,那么,hdfs中就必须有一个机制,来记录用户的每个文件的切块信息,及每一块的具体存储机器(负责记录块信息的角色是:name node)

三、为了保证数据的安全性,hdfs能够将每个文件块在集群中存放多个副本(到底存几个副本,是由当时存入该文件的客户端指定的)

 

综述:一个hdfs系统,由一台运行了namenode的服务器,和N台运行了datanode的服务器组成!

四、搭建hdfs分布式集群

4.1 hdfs集群组成结构:

4.2 安装hdfs集群的具体步骤:

4.2.一、首先须要准备N台linux服务器

学习阶段,用虚拟机便可!

先准备4台虚拟机:1个namenode节点  + 3 个datanode 节点

 

4.2.二、修改各台机器的主机名和ip地址

主机名:hdp-01  对应的ip地址:192.168.33.11

主机名:hdp-02  对应的ip地址:192.168.33.12

主机名:hdp-03  对应的ip地址:192.168.33.13

主机名:hdp-04  对应的ip地址:192.168.33.14

4.2.三、从windows中用CRT软件进行远程链接

在windows中将各台linux机器的主机名配置到的windows的本地域名映射文件中:

c:/windows/system32/drivers/etc/hosts

192.168.33.11 hdp-01

192.168.33.12 hdp-02

192.168.33.13 hdp-03

192.168.33.14 hdp-04

用crt链接上后,修改一下crt的显示配置(字号,编码集改成UTF-8):

 

 

 

4.2.三、配置linux服务器的基础软件环境

 

  •  防火墙

关闭防火墙:service iptables stop 

关闭防火墙自启: chkconfig iptables off

 

  •  安装jdk:(hadoop体系中的各软件都是java开发的)

  1)        利用alt+p 打开sftp窗口,而后将jdk压缩包拖入sftp窗口

  2)         而后在linux中将jdk压缩包解压到/root/apps 下

  3)         配置环境变量:JAVA_HOME   PATH

  vi /etc/profile   在文件的最后,加入:

export JAVA_HOME=/root/apps/jdk1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin

  4)         修改完成后,记得 source /etc/profile使配置生效

  5)         检验:在任意目录下输入命令: java -version 看是否成功执行

  6)         将安装好的jdk目录用scp命令拷贝到其余机器

  7)         将/etc/profile配置文件也用scp命令拷贝到其余机器并分别执行source命令

  •   集群内主机的域名映射配置

在hdp-01上,vi /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.33.11   hdp-01

192.168.33.12   hdp-02

192.168.33.13   hdp-03

192.168.33.14   hdp-04

而后,将hosts文件拷贝到集群中的全部其余机器上

scp /etc/hosts hdp-02:/etc/

scp /etc/hosts hdp-03:/etc/

scp /etc/hosts hdp-04:/etc/

 

提示:

若是在执行scp命令的时候,提示没有scp命令,则能够配置一个本地yum源来安装

一、先在虚拟机中配置cdrom为一个centos的安装镜像iso文件

二、在linux系统中将光驱挂在到文件系统中(某个目录)

三、mkdir /mnt/cdrom

四、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom

五、检验挂载是否成功: ls /mnt/cdrom

六、三、配置yum的仓库地址配置文件

七、yum的仓库地址配置文件目录: /etc/yum.repos.d

八、先将自带的仓库地址配置文件批量改名:

 

九、而后,拷贝一个出来进行修改

 

 

十、修改完配置文件后,再安装scp命令:

十一、yum install openssh-clients -y

4.2.四、安装hdfs集群

 

一、上传hadoop安装包到hdp-01

bin文件为hadoop功能命令,sbin中为集群管理命令。

二、修改配置文件

 

核心配置参数:

1)         指定hadoop的默认文件系统为:hdfs

2)         指定hdfs的namenode节点为哪台机器

3)         指定namenode软件存储元数据的本地目录

4)         指定datanode软件存放文件块的本地目录

hadoop的配置文件在:/root/apps/hadoop安装目录/etc/hadoop/

hadoop中的其余组件如mapreduce,yarn等,这些组将会去读数据,指定hadoop的默认文件系统为:hdfs,就是告诉这些组件去hdfs中读数据;该项配置意味dadoop中的组件能够访问各类文件系统。

若不指定数据的存放目录,hadoop默认将数据存放在/temp下。

能够参考官网的默认配置信息。

1) 修改hadoop-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_60
2) 修改core-site.xml
<configuration>

<property>

<name>fs.defaultFS</name>

<value>hdfs://hdp-01:9000/</value>

</property>

</configuration>

 

<value>hdfs://hdp-01:9000</value>包含两层意思:

  一、指定默认的文件系统。

  二、指明了namenode是谁。

value中的值是URI风格

3) 修改hdfs-site.xml

配置namenode和datanode的工做目录,添加secondary name node。

<configuration>

<property>

<name>dfs.namenode.name.dir</name>

<value>/root/hdpdata/name/</value>

</property>

 

<property>

<name>dfs.datanode.data.dir</name>

<value>/root/hdpdata/data</value>

</property>

 

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>hdp-02:50090</value>

</property>

 

</configuration>

 

4) 拷贝整个hadoop安装目录到其余机器 
  scp -r /root/apps/hadoop-2.8.1  hdp-02:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-03:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-04:/root/apps/

 

5) 启动HDFS

 

所谓的启动HDFS,就是在对的机器上启动对的软件

提示:

要运行hadoop的命令,须要在linux环境中配置HADOOP_HOME和PATH环境变量

vi /etc/profile

 


export JAVA_HOME=/root/apps/jdk1.8.0_60 export HADOOP_HOME=/root/apps/hadoop-2.8.1 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

 

 

首先,初始化namenode的元数据目录

要在hdp-01上执行hadoop的一个命令来初始化namenode的元数据存储目录

hadoop namenode -format

  建立一个全新的元数据存储目录

  生成记录元数据的文件fsimage

  生成集群的相关标识:如:集群id——clusterID

该步骤叫作namenode的初始化也叫格式化,本质是创建namenode运行所须要的目录以及一些必要的文件,因此该操做通常只在集群第一次启动以前执行。

 

 

而后,启动namenode进程(在hdp-01上)

hadoop-daemon.sh start namenode

启动完后,首先用jps查看一下namenode的进程是否存在

namenode就是一个java软件,咱们知道启动一个java软件须要主类的main方法 java xxx.java - 若干参数,处于方便的考虑,hadoop中提供了一个通用的软件启动脚本hadoop-daemon.sh,脚本能够接受参数,专门用来启动hadoop中的软件。

 

能够看到namenode在监听两个端口,9000用来和客户端通讯(9000为RPC端口号,内部进程之间互相通讯的端口,datanode和namenode的通讯),接受hdfs客户端的请求,50070是web服务端口,也就是说namenode内置一个web服务器,http客户端能够经过次端口发送请求。

而后,在windows中用浏览器访问namenode提供的web端口:50070

http://hdp-01:50070

而后,启动众datanode们(在任意地方)

hadoop-daemon.sh start datanode

 

下图是datanode的一下信息展现,能够看到datanode内部通讯的端口号是50010,并且datanode也提供了问访问端口50075.

6) 用自动批量启动脚原本启动HDFS

hdfs其实就是一堆java软件,咱们能够本身手动hadoop-daemon.sh逐个启动,也可使用hadoop提供的批量启动脚本。

1)         先配置hdp-01到集群中全部机器(包含本身)的免密登录

 

2)         配完免密后,能够执行一次  ssh 0.0.0.0

3)         修改hadoop安装目录中/etc/hadoop/slaves(把须要启动datanode进程的节点列入)

hdp-01

hdp-02

hdp-03

hdp-04

core-site.xml中配置过namenode,可是须要批量启动那些datanode呢,该文件/etc/hadoop/slaves的配置就是解决这个问题的,该文件就是给启动脚本看的。

4)         在hdp-01上用脚本:start-dfs.sh 来自动启动整个集群

5)         若是要中止,则用脚本:stop-dfs.sh

start-dfs.sh、stop-dfs.sh会启动、关闭namenode,datanode和secondnamenode

固然你也能够本身写脚原本作上述的事情 ,以下所示。

 

 

五、hdfs的客户端操做

hdfs装好以后,接下来的工做就是hdfs里传东西,去东西,由客户端来完成。

5.一、客户端的理解

hdfs的客户端有多种形式:

一、网页形式

二、命令行形式

三、客户端在哪里运行,没有约束,只要运行客户端的机器可以跟hdfs集群联网们

   对于客户端来说,hdfs是一个总体,网页版的客户端主要是用来查看hdfs信息的,能够建立目录,可是须要权限

命令行客户端 

bin命令中的 hadoophdfs  均可以启动 hdfs 客户端,hadoop和hdfs都是脚本,都会去启动一个hdfs的java客户端。java客户端在安装包的jar包中

./hadoop fs -ls /

 

表示hadoop要访问hdfs,该脚本就会去启动hdfs客户端,客户端能够接收参数,好比查看hdfs根目录。

 

 

文件的切块大小和存储的副本数量,都是由客户端决定!

所谓的由客户端决定,是经过配置参数来定的

hdfs的客户端会读如下两个参数,来决定切块大小(默认128M)、副本数量(默认3):

切块大小的参数: dfs.blocksize

副本数量的参数: dfs.replication

若是使用命令行客户端时,上面两个参数应该配置在客户端机器的hadoop目录中的hdfs-site.xml中配置,(命令行客户端本质就是启动了一个java客户端,这个客户端在启动的时候会将它依赖的全部jar包加入classpath中,客户端会从jar包中,加载xx-default.xml来得到默认的配置文件,也能够在hadoop/etc/xxx-site.xml中配置具体的参数来覆盖默认值。此时的/etc下的配置文件就是客户自定义的配置文件,也会被java客户端加载【客户端能够运行在任何地方】);

固然也能够在具体代码中指定,见6节中的核心代码

<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>

<property>
<name>dfs.replication</name>
<value>2</value>
</property>

 5.1.一、上传过程

下图为datanode中的数据,.meta是该block的校验和信息。咱们能够经过linux cat命令将两个块合并,会发现与原来的文件是同样的。

 

 5.1.二、下载过程

客户端首先回去namenode上去查找,有没有请求的hdfs路径下的文件,有的话都有该文件被切割成几块,每块有几个副本,这些副本都存放在集群中的哪些机器上,而后去存放了第一块数据的某一台机器上去下载第一块数据,将数据追加到本地,而后去下载下一块数据,继续追加到本地文件,知道下载完全部的块。

 

 

5.二、hdfs客户端的经常使用操做命令

一、上传文件到hdfs中

hadoop fs -put /本地文件  /aaa

 

二、下载文件到客户端本地磁盘

hadoop fs -get /hdfs中的路径   /本地磁盘目录

 

三、在hdfs中建立文件夹

hadoop fs -mkdir  -p /aaa/xxx

 

四、移动hdfs中的文件(改名)

hadoop fs -mv /hdfs的路径1  /hdfs的另外一个路径2

 

复制hdfs中的文件到hdfs的另外一个目录

hadoop fs -cp /hdfs路径_1  /hdfs路径_2

 

五、删除hdfs中的文件或文件夹

hadoop fs -rm -r /aaa

 

六、查看hdfs中的文本文件内容

hadoop fs -cat /demo.txt

hadoop fs -tail /demo.txt

hadoop fs -tail -f /demo.txt

hadoop fs -text /demo.txt

 

七、查看hdfs目录下有哪些文件

hadoop fs –ls /

 

八、追加本地文件到hdfs中的文件

hadoop fs -appendToFile 本地路径 /hdfs路径

 

九、权限修改

 hadoop fs -chmod username1:usergroup1 /hdfs路径

要说明的是,hdfs中的用户和用户组这是一个名字称呼,与linux不同,linux中不能将选线分配给一个不存在的用户。

 

 能够查看hadoop fs 不带任何参数,来查看hdfs所支持的命令

Usage: hadoop fs [generic options] [-appendToFile <localsrc> ... <dst>] [-cat [-ignoreCrc] <src> ...] [-checksum <src> ...] [-chgrp [-R] GROUP PATH...] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...] [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>] [-createSnapshot <snapshotDir> [<snapshotName>]] [-deleteSnapshot <snapshotDir> <snapshotName>] [-df [-h] [<path> ...]] [-du [-s] [-h] [-x] <path> ...] [-expunge] [-find <path> ... <expression> ...] [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-getfacl [-R] <path>] [-getfattr [-R] {-n name | -d} [-e en] <path>] [-getmerge [-nl] [-skip-empty-file] <src> <localdst>] [-help [cmd ...]] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]] [-mkdir [-p] <path> ...] [-moveFromLocal <localsrc> ... <dst>] [-moveToLocal <src> <localdst>] [-mv <src> ... <dst>] [-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-renameSnapshot <snapshotDir> <oldName> <newName>] [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...] [-rmdir [--ignore-fail-on-non-empty] <dir> ...] [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]] [-setfattr {-n name [-v value] | -x name} <path>] [-setrep [-R] [-w] <rep> <path> ...] [-stat [format] <path> ...] [-tail [-f] <file>] [-test -[defsz] <path>] [-text [-ignoreCrc] <src> ...] [-touchz <path> ...] [-truncate [-w] <length> <path> ...] [-usage [cmd ...]]

 

 

六、hdfs的java客户端编程

HDFS客户端编程应用场景:数据采集

业务系统中日志生成机制

数据采集程序其实就是经过对java客户端编程,将数据不断的上传到hdfs。

在windows开发环境中作一些准备工做:

一、在windows的某个路径中解压一份windows版本的hadoop安装包

二、将解压出的hadoop目录配置到windows的环境变量中:HADOOP_HOME

缘由:若不配置环境变量,会在下载hdfs文件是出错,是因为使用hadoop的FileSystem保存文件到本地的时候出于效率的考虑,会使用hadoop安装包中的c语言库,显然没有配置hadoop环境变量时是找不到该c语言类库中的文件的;然而上传文件到hdfs没有相似问题;

 

 6.一、核心代码

一、将hdfs客户端开发所需的jar导入工程(jar包可在hadoop安装包中找到common和hdfs)

二、写代码

6.1.一、获取hdfs客户端

要点:要对hdfs中的文件进行操做,代码中首先须要得到一个hdfs的客户端对象

Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");

 

 完整代码以下:

/** * Configuration参数对象的机制: * 构造时,会加载jar包中的默认配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加载 用户配置xx-site.xml ,覆盖掉默认参数 * 构造完成以后,还能够conf.set("p","v"),会再次覆盖用户配置文件中的参数值 */
        // new Configuration()会从项目的classpath中加载core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
        Configuration conf = new Configuration(); // 指定本客户端上传文件到hdfs时须要保存的副本数为:2
        conf.set("dfs.replication", "2"); // 指定本客户端上传文件到hdfs时切块的规格大小:64M
        conf.set("dfs.blocksize", "64m"); // 构造一个访问指定HDFS系统的客户端对象: 参数1:——HDFS系统的URI,参数2:——客户端要特别指定的参数,参数3:客户端的身份(用户名)
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); // 上传一个文件到HDFS中
        fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/")); fs.close();
View Code

 6.1.二、对文件进行操做

上传、下载文件;文件夹的建立和删除、文件的移动和复制、查看文件夹和文件等。

三、利用fs对象的方法进行文件操做

方法均与命令行方法对应,好比:

上传文件

fs.copyFromLocalFile(new Path("本地路径"),new Path("hdfs的路径"));

 

下载文件

fs.copyToLocalFile(new Path("hdfs的路径"),new Path("本地路径"))

 对文件的增删改查以下,对文件数据的操做后续介绍。

FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set("dfs.replication", "2"); conf.set("dfs.blocksize", "64m"); fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); } /** * 从HDFS中下载文件到客户端本地磁盘 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path("/hdp20-05.txt"), new Path("f:/")); fs.close(); } /** * 在hdfs内部移动文件\修更名称 */ @Test public void testRename() throws Exception{ fs.rename(new Path("/install.log"), new Path("/aaa/in.log")); fs.close(); } /** * 在hdfs中建立文件夹 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path("/xx/yy/zz")); fs.close(); } /** * 在hdfs中删除文件或文件夹 */ @Test public void testRm() throws Exception{ fs.delete(new Path("/aaa"), true); fs.close(); } /** * 查询hdfs指定目录下的文件信息 */ @Test public void testLs() throws Exception{ // 只查询文件的信息,不返回文件夹的信息
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true); while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println("文件全路径:"+status.getPath()); System.out.println("块大小:"+status.getBlockSize()); System.out.println("文件长度:"+status.getLen()); System.out.println("副本数量:"+status.getReplication()); System.out.println("块信息:"+Arrays.toString(status.getBlockLocations())); System.out.println("--------------------------------"); } fs.close(); } /** * 查询hdfs指定目录下的文件和文件夹信息 */ @Test public void testLs2() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status:listStatus){ System.out.println("文件全路径:"+status.getPath()); System.out.println(status.isDirectory()?"这是文件夹":"这是文件"); System.out.println("块大小:"+status.getBlockSize()); System.out.println("文件长度:"+status.getLen()); System.out.println("副本数量:"+status.getReplication()); System.out.println("--------------------------------"); } fs.close(); }
View Code

 6.1.三、对文件数据进行操做

 同过客户端使用open打开流对象来读取hdfs中文件的具体数据,包括指定偏移量来读取特定范围的数据;经过客户端向hdfs文件追加数据。

/** * 读取hdfs中的文件的内容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 读取hdfs中文件的指定偏移量范围的内容 * * * 做业题:用本例中的知识,实现读取一个文本文件中的指定BLOCK块中的全部数据 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 将读取的起始位置进行指定
        in.seek(12); // 读16个字节
        byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); }
View Code

 

 写数据,create提供了丰富的重载函数,轻松实现覆盖,追加,以及指定缓存大小,副本数量等等信息。

/** * 往hdfs中的文件写内容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
 FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }
View Code

 

七、HDFS实例

hdfs版本wordcount程序。

任务描述:

一、从hdfs文件中读取数据,每次读取一行数据;

二、将数据交给具体的单词统计业务去做业(使用面向接口编程,当业务逻辑改变时,无需修改主程序代码);

三、并将该行数据产生的结果存入缓存中(能够用hashmap模拟)

数据采集设计:

一、流程
启动一个定时任务:
——定时探测日志源目录
——获取须要采集的文件
——移动这些文件到一个待上传临时目录
——遍历待上传目录中各文件,逐一传输到HDFS的目标路径,同时将传输完成的文件移动到备份目录

启动一个定时任务:
——探测备份目录中的备份数据,检查是否已超出最长备份时长,若是超出,则删除


二、规划各类路径
日志源路径: d:/logs/accesslog/
待上传临时目录: d:/logs/toupload/
备份目录: d:/logs/backup/日期/

HDFS存储路径: /logs/日期
HDFS中的文件的前缀:access_log_
HDFS中的文件的后缀:.log

将路径配置写入属性文件

MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapper INPUT_PATH=/wordcount/input OUTPUT_PATH=/wordcount/output2
View Code

 

主程序代码示例:

import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HdfsWordcount { public static void main(String[] args) throws Exception{ /** * 初始化工做 */ Properties props = new Properties(); props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties")); Path input = new Path(props.getProperty("INPUT_PATH")); Path output = new Path(props.getProperty("OUTPUT_PATH")); Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS")); Mapper mapper = (Mapper) mapper_class.newInstance(); Context context =  new Context(); /** * 处理数据 */ FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root"); RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false); while(iter.hasNext()){ LocatedFileStatus file = iter.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐行读取
            while ((line = br.readLine()) != null) { // 调用一个方法对每一行进行业务处理
 mapper.map(line, context); } br.close(); in.close(); } /** * 输出结果 */ HashMap<Object, Object> contextMap = context.getContextMap(); if(fs.exists(output)){ throw new RuntimeException("指定的输出目录已存在,请更换......!"); } FSDataOutputStream out = fs.create(new Path(output,new Path("res.dat"))); Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { out.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes()); } out.close(); fs.close(); System.out.println("恭喜!数据统计完成....."); } }
View Code

 

自定义的业务接口

public interface Mapper { public void map(String line,Context context); }
View Code

 

业务实现类1

public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { String[] words = line.split(" "); for (String word : words) { Object value = context.get(word); if(null==value){ context.write(word, 1); }else{ int v = (int)value; context.write(word, v+1); } } } }
View Code

 

业务实现类2

public class CaseIgnorWcountMapper implements Mapper { @Override public void map(String line, Context context) { String[] words = line.toUpperCase().split(" "); for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); } else { int v = (int) value; context.write(word, v + 1); } } } }
View Code

 

缓存模拟

import java.util.HashMap; public class Context { private HashMap<Object,Object> contextMap = new HashMap<>(); public void write(Object key,Object value){ contextMap.put(key, value); } public Object get(Object key){ return contextMap.get(key); } public HashMap<Object,Object> getContextMap(){ return contextMap; } }
View Code

 

八、实战描述

 需求描述:

在业务系统的服务器上,业务程序会不断生成业务日志(好比网站的页面访问日志)

业务日志是用log4j生成的,会不断地切出日志文件

须要按期(好比每小时)从业务服务器上的日志目录中,探测须要采集的日志文件(access.log,不是直接采集数据),发往HDFS

 

注意点:业务服务器可能有多台(hdfs上的文件名不能直接用日志服务器上的文件名)

当天采集到的日志要放在hdfs的当天目录中

采集完成的日志文件,须要移动到到日志服务器的一个备份目录中

按期检查(一小时检查一次)备份目录,将备份时长超出24小时的日志文件清除

Timer timer = new Timer() timer.schedual()

 简易版日志采集主程序

import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
View Code

 

日志收集定时任务类

import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { /** * ——定时探测日志源目录 ——获取须要采集的文件 ——移动这些文件到一个待上传临时目录 * ——遍历待上传目录中各文件,逐一传输到HDFS的目标路径,同时将传输完成的文件移动到备份目录 * */
        try { // 获取配置参数
            Properties props = PropertyHolderLazy.getProps(); // 构造一个log4j日志对象
            Logger logger = Logger.getLogger("logRollingFile"); // 获取本次采集时的日期
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出日志源目录中须要采集的文件
            File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; } return false; } }); // 记录日志
            logger.info("探测到以下文件须要采集:" + Arrays.toString(listFiles)); // 将要采集的文件移动到待上传临时目录
            File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) { FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 记录日志
            logger.info("上述文件移动到了待上传目录" + toUploadDir.getAbsolutePath()); // 构造一个HDFS的客户端对象
 FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 检查HDFS中的日期目录是否存在,若是不存在,则建立
            Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 检查本地的备份目录是否存在,若是不存在,则建立
            File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 传输文件到HDFS并更名access_log_
                Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath); // 记录日志
                logger.info("文件传输到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath); // 将传输完成的文件移动到备份目录
                FileUtils.moveFileToDirectory(file, backupDir, true); // 记录日志
                logger.info("文件备份完成:" + file.getAbsolutePath() + "-->" + backupDir); } } catch (Exception e) { e.printStackTrace(); } } }
View Code

 

按期清理过期备份日志

import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探测本地备份目录
            File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判断备份日期子目录是否已超24小时
            for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
View Code

 

配置信息提取到属性配置文件中,并写成常量,以单例设计模式去加载配置信息。

LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log. HDFS_URI=hdfs://hdp-01:9000/
HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log
View Code
public class Constants { /** * 日志源目录参数key */
    public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR"; /** * 日志待上传目录参数key */
    public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR"; public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR"; public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT"; public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX"; public static final String HDFS_URI = "HDFS_URI"; public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR"; public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX"; public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX"; }
View Code
import java.util.Properties; /** * 单例模式:懒汉式——考虑了线程安全 * @author ThinkPad * */

public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }
View Code

 

import java.util.Properties; /** * 单例设计模式,方式一: 饿汉式单例 * @author ThinkPad * */
public class PropertyHolderHungery { private static Properties prop = new Properties(); static { try { prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties")); } catch (Exception e) { } } public static Properties getProps() throws Exception { return prop; } }
View Code

九、总结

hdfs有服务端和客户端;

服务端:

  成员:namenode 管理元数据,datanode存储数据

  配置:须要指定使用的文件系统(默认的配置在core-default.xml,为本地文件系统,须要修改服务器core-site.xml修改成hdfs文件系统,并指定namenode),namenode和datanode的工做目录(服务器的默认配置在hdfs-default.xml中,默认是在/temp下,须要就该hdfs-site.xml来覆盖默认值。);

  细节:第一次启动集群时须要格式化namenode

客户端:

  形式:网页端,命令行,java客户端api;客户端能够运行在任何地方。

  功能:指定上传文件的信息报括切块大小(hdfs-default.xml中默认值128m,能够在hdfs-site.xml中修改,也能够咋java api 中建立客户端对象的时候指定,总之由客户端来指定),副本数量(hdfs-default.xml中默认值3,一样能够修改覆盖);完成hdfs中文件的系列操做,如上传,下载

虽然服务端和客户端的共用配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,可是不一样的程序所须要的参数不一样,只不过为了方便,全部参数都写在一个文件中了。便是在服务器的hdfs-site.xml中配置了切块大小和副本数量,服务器的namenode和datanode根本不关心也不使用这些参数,只有启动服务器上的命令行客户端时,该参数才可能起做用。

相关文章
相关标签/搜索