hadoop--大数据生态圈中最基础、最重要的组件

hadoop是什么?

hadoop是一个由Apache基金会所开发的分布式系统基础架构,hdfs分布式文件存储、MapReduce并行计算。主要是用来解决海量数据的存储和海量数据的分析计算问题,这是狭义上的hadoop。广义上来说,hadoop一般指的是一个更普遍的概念--hadoop生态圈html

hadoop三大发行版本

hadoop三大发型版本:Apache、Cloudera、Hortonworksjava

  • Apache版本,也成为社区版,是最原始的版本,对入门学习较好
  • Cloudera版本在大型互联网企业中用的较多,是对社区版进行了封装,主要是解决了和其它大数据组件(好比:hive)的兼容性问题。可是出了问题帮你解决要收费
  • Hortonworks版本文档较好

公司通常更经常使用的是Cloudera版本的hadoop,另外Cloudera版本的hadoop也简称为CDH。node

hadoop优点

1)高可靠性:hadoop底层维护多个数据副本,因此即便某个机器出现故障,也不会致使数据的丢失。python

好比我如今有1G的数据,要存在5台机器上,hadoop默认会先将整个数据进行切割,默认是128M/块,固然这个数值也能够本身改。而后是以三副本进行存储,也就是说每一个128M的块,都会被存储三份在不一样的机器上。这样即便一台服务器宕机了,数据也不会丢失。mysql

2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。很好理解,若是容量不够了,直接横向扩展,加机器就行。linux

3)高效性:在MapReduce的思想下,hadoop是并行工做的,以加快任务处理速度。实际上若是学了spark,会发现hadoop本身所描述的易用性、高效性实在是不敢恭维哈。可是hadoop做为大数据生态圈中很是重要的组件,咱们是有必要学好的,并且学了hadoop以后,再学spark会轻松不少。并且学习了hadoop,再学spark也会明白为何spark会比hadoop在效率上高出几十倍、甚至上百倍。git

4)高容错性:可以自动将失败的任务从新分配,若是某台机器挂掉了,那么会自动将任务分配到其余的机器上执行github

hadoop 1.x和hadoop 2.x的区别

hadoop组成

hadoop的组成上面已经说了,主要由四部分组成,可是哪一个common,咱们通常不用管。所以从下往上只须要关注,hdfsyarnMapReduce便可。golang

hdfsweb

hdfs:hadoop distributed file system,hadoop分布式文件系统,它由哪几部分组成呢?

1.NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每一个文件的块列表和块所在的DataNode等等。

2.DataNode(dn):真正用来存储文件块数据以及块数据的校验和,以前说了大文件是要分红多块的,每一块存在不一样的DataNode节点(说白了就是服务器、或者电脑)上,每个节点存储了哪些文件、以及文件被切分了几份都存在哪些DataNode上、文件名、属性等等,这些都叫作元数据,统一交给NameNode管理,而DataNode只负责真正的存储数据。

3. Secondary NameNode(2nn):用来监控hdfs状态的后台辅助程序,每一个一段时间获取hdfs元数据的快照。

yarn

图中有一个Resource Manager和三个Node Manager,至关于有四个节点。

Resource Manager

1.处理客户端请求。客户端想访问集群,好比提交一个做业,要通过Resource Manager,它是整个资源的管理者,管理整个集群的CPU、内存、磁盘

2.监控Node Manager

3.启动或监控Application Master

4.资源的分配和调度

Node Manager

1.管理单个节点上的资源,Node Manager是当前节点资源的管理者,固然须要跟Resource Manager汇报

2.处理来自Resource Manager的命令

3.处理来自Application Master的命令

Application Master

1.某个任务的管理者。当任务在Node Manager上运行的时候,就是由Application Master负责管理

2.负责数据的切分

3.为应用程序申请资源并分配给内部的任务

4.任务的监控与容错

Container

Container是yarn中资源的抽象,它封装了节点上的多维度资源,如内存、CPU、磁盘、网络等等。

其实Container是为Application Master服务的,由于任务在运行的时候,是否是须要内存、cpu,这些资源都被虚拟化到Container里面了。

MapReduce

1. map阶段并行处理输入数据

2. reduce阶段对map结果进行汇总

几张图让你理解hdfs工做原理(细节内容后面介绍,先看几张漫画感觉一下)

大数据技术生态体系

安装

下面咱们来配置一下环境,个人全部大数据组件都安装在/opt目录下面

hadoop是java语言编写的,所以须要安装jdk,我这里已经安装了,安装jdk很是简单,能够自行查找方法,若是还不会的话,那么hadoop也能够不用学了。

下面安装hadoop,这个安装java同样简单。这里咱们使用社区版,直接去hadoop.apache.org网站下载便可,而后拷贝到个人阿里云、解压、配置环境变量便可。

而后在家目录中输入hdfs dfs,若是出现以下内容,证实安装而且环境变量配置都没问题

hadoop目录结构

hadoop目录结构以下,咱们依次来看。

bin目录

bin目录主要放一些有关服务的文件,好比hadoop、hdfs、yarn等等

etc目录

里面是有一个hadoop目录,可是hadoop目录里面有不少配置文件,将来咱们会改大概七八个左右,固然不用怕

include目录

这是与C语言有关的一些头文件,咱们不用管

lib目录

一些本地库,.so文件,至关于windows的.dll文件,这个不须要关注

libexec目录

和lib目录相似

sbin目录

很是重要的一个目录,存放了大量的启动文件,好比启动、关闭集群,启动、关闭yarn等等

share目录

存放了一些手册、案例等等

伪分布式环境搭建以及配置、启动hdfs

hadoop的运行模式有三种,单机模式、伪分布式、彻底分布式。

  • 单机模式:基本不用,不用管
  • 伪分布式:按照彻底分布式来进行搭建、配置,可是机器只有一台
  • 彻底分布式:真正意义上的多台机器

下面就来搭建伪分布式环境,首先要修改配置文件,以前咱们说要修改七八个,可是目前先配三个就行,慢慢来,全部的配置文件都在hadoop安装目录/etc/hadoop下面

hadoop-env.xml

export JAVA_HOME=/opt/java/jdk1.8.0_221(默认是${JAVA_HOME},须要手动改为java的安装路径)

core-site.xml

<!--指定hdfs中NameNode的地址-->
<!--这里之因此是localhost,是由于咱们只有一台机器,多台机器就要换成相应的主机名-->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
</property>

<!--指定hadoop运行时产生文件的存储目录,若是不指定,那么重启以后就丢失了-->
<!--data目录会自动建立-->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/hadoop-2.7.7/data</value>
</property>

全部更改完的配置,都放在configuration标签里面

hdfs-site.xml

<!--指定hdfs副本的数量,默认是3,咱们是伪分布式,因此改为1-->
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>

配置完毕,下面启动集群(伪)

格式化namenode(第一次启动时格式化,之后不须要总格式化)

hdfs namenode -format

查看以前的data目录,自动帮咱们建立了,并且里面也有东西了

启动namenode

若是把sbin目录也配置了环境变量,那么sbin/也不须要加
sbin/hadoop-daemon.sh start namenode

启动datanode

sbin/hadoop-daemon.sh start datanode

查看集群是否启动成功

输入jps

注意jps是java的一个命令,必须安装jdk以后才可以使用,不然会提示命令未找到,出现以下内容表示安装成功

输入ip:50070

若是能访问,也表示安装成功

NameNode格式化注意事项

思考:为何不能一直格式化NameNode,格式化NameNode须要注意什么?

格式化NameNode会产生新的集群id,致使NameNode和DataNode的集群id不一致,集群找不到以往的数据。因此格式化NameNode的时候,务必要先删除data数据和log日志,而后再格式化NameNode。由于二者须要有一个共同的id,这样才能交互。

配置、启动yarn

老规矩,先修改配置文件

yarn-env.sh

和hadoop-env.sh同样,配置java的路径
加上export JAVA_HOME=/opt/java/jdk1.8.0_221

yarn-site.xml

<!--reducer获取数据的方式-->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>

<!--指定yarn的ResourceManager的地址-->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>主机名</value>
</property>

mapred-env.sh

老规矩,遇到env.sh都是配JAVA_HOME

mapred-site.xml

可是会发现没有这个文件,不过有一个mapred-site.xml.template
能够cp mapred-site.xml.template mapred-site.xml

<!--指定MR运行在yarn上-->
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

修改完毕,下面启动集群

yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager

经过网页查看一下,输入ip:8088,咱们以前好像输入过50070,那个是查看hdfs的,8088是查看运行MapReduce程序的进程的。

配置运行历史服务

有时候咱们想看一下程序的历史运行状况,那么咱们能够配置一下。

插一句,目前一直在搭建、配置环境,固然后面会详细介绍hdfs、MapReduce的相关操做,目前先把环境打好。

固然若是对运行历史不感兴趣的话就跳过,固然最好仍是关注一下。

mapred-site.xml

<!--仍是这个配置文件,指定历史服务端地址-->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>主机名:10020</value>
</property>

<!--历史web端地址-->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>主机名:19888</value>
</property>

启动历史服务器:sbin/mr-jobhistory-daemon.sh start historyserver

jps查看历史服务器,或者http://ip:19888/jobhistory

hdfs介绍

下面就开始着重讲解hadoop组件之一的hdfs,以前的几张漫画只是提早感觉一下。还有目前咱们搭建的都是伪分布式,至于彻底分布式,因为我阿里云机器只有一台因此就不演示怎么搭建了。最主要的是笔者是python和golang系的,集群的搭建什么的应该用不到,环境什么的会有其余人负责,我只负责链接、计算什么的。后面也会介绍如何使用python和golang链接hdfs,因此彻底分布式如何搭建,这里就不介绍了。

咱们下面的演示都是基于伪分布式的

hdfs产生背景

随着数据量愈来愈大,在一台机器上没法存下全部的数据,那么就分配到更多的机器上,可是不方便管理和维护,所以迫切须要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。hdfs只是分布式文件管理系统中的一种

hdfs定义

hdfs(hadoop distributed file system),它是一个文件系统,用于存储文件,经过目录树来定位文件;其次它是分布式的,由不少服务器联合起来实现其功能,集群中的服务器有各自的角色

hdfs的使用场景

适合一次写入,屡次读出的场景,且不支持文件的修改。适合用来作数据分析,并不适合作网盘应用。

hdfs的优缺点

优势:

  • 高容错性
    • 数据自动保存多个副本,它经过增长副本的形式,提升容错性
    • 某一个副本丢失后,它能够自动恢复
  • 适合处理大数据
    • 数据规模:可以处理TB、甚至PB级别的数据
    • 文件规模:可以处理百万规模以上的文件数量,数量至关之大
  • 可构建在廉价的机器之上,经过多副本机制,提升可靠性

缺点:

  • 不适合低延时数据访问,若是你想作到毫秒级存储,别想了,作不到的

  • 没法高效地对大量小文件进行存储,存一个1G的数据比存10个100MB加上一个24MB的数据要高效不少

    至于为何,由于NameNode是否是惟一的啊,这就意味着空间是有限的,不可能像DataNode同样,容量不够了就加机器。而NameNode要记录文件的元数据,无论你是1KB,仍是1GB,都须要150字节的空间进行记录,若是全是小文件的话,是否是很耗费NameNode所在机器的空间呢?

    并且小文件存储的寻址时间会超过读取时间,它违反了hdfs的设计目标。

  • 不支持并发写入、文件随机修改

    一个文件只能有一个写,不容许多个线程同时写

    仅支持数据的append,不支持文件的随机修改

hdfs的架构

先看官网给的一张图

NameNode(nn):就是master,它是一个主管、管理者

  • 管理hdfs的名称空间
  • 配置副本策略
  • 管理数据块(block)映射信息
  • 处理客户端读写请求

DataNode(nn):就是slave,NameNode下达命令,DataNode执行实际的操做

  • 存储实际的数据块
  • 执行数据块的读/写操做

client:就是客户端

  • 文件切分,文件上传到hdfs的时候,client将文件切分红一个个的block,而后上传
  • 与NameNode交互,获取文件的位置信息
  • 与DataNode交互,读取或者写入数据
  • 客户端提供一些命令来管理hdfs,好比NameNode的格式化
  • 客户端能够经过一些命令来访问hdfs,好比对hdfs的增删改查操做

secondary NameNode:它不是NameNode的替补,当NameNode挂掉时,并不能立刻替换NameNode并提供服务。

  • 辅助NameNode,分担其工做量,好比按期合并Fsimage和Edits,并推送给NameNode
  • 紧急状况下,可辅助恢复NameNode(能够恢复一部分)

强烈建议结合以前的漫画来理解,会有更好的体验

hdfs块的大小设置

hdfs中的文件在物理上是分块存储(block),块的大小能够经过配置参数(df.blocksize)指定,默认大小在hadoop 2.x版本中是128M,老版本中是64M

思考:为何块不能设置过小,也不能设置太大?

1.hdfs块设置过小,会增长寻址时间,程序一直在找块的开始位置

2.hdfs块设置太大,从磁盘传输的时间会明显大于定位这个块的开始位置所须要的时间。致使程序在处理这块数据时,会很是慢。

总结:hdfs块的大小设置主要取决于磁盘的传输速率

hdfs shell命令(重点)

基本语法:hdfs dfs 命令 参数

这个hdfs的shell命令和linux是很是相似的,好比查看某个目录下的文件,linux中是ls,那么hdfs shell中就是hdfs dfs -ls,查看文件内容,hdfs dfs -cat filename,能够看到是很是类似的,只不过在hdfs shell中须要加上一个横杠。另外hdfs dfs还能够写成hadoop fs,对于shell操做来讲二者区别不大

启动集群

sbin/start-dfs.sh sbin/start-yarn.sh

以前咱们说过hadoop-daemon.sh和yarn-daemon.sh,那是对于伪分布式也就是单节点来讲的,若是是启动集群须要使用sbin/start-dfs.sh sbin/start-yarn.sh

hdfs dfs -help 命令

从linux命令也能看出来,这是一个查看命令使用方法的命令

hdfs dfs -ls 目录路径

查看某个目录有哪些文件,加上-R表示递归显示

hdfs dfs -mkdir 目录

在hdfs上面建立目录,加上-p表示递归建立,和linux是同样的

hdfs dfs -moveFromLocal 本地路径 hdfs路径

将本地文件或目录移动到hdfs上面,注意是移动,移完以后本地就没了

hdfs dfs -cat 文件

查看一个文件的内容

hdfs dfs -appendToFile 追加的文件 追加到哪一个文件

将一个文件的内容追加到另外一个文件里面去,好比本地有一个file.txt,那么hdfs dfs -appendToFile file.txt /a.txt表示将本地的file.txt文件里面的内容追加到hdfs上的/a.txt文件里面去

-chgrp、-chmod、-chown

更改组、更改权限、更改全部者,这个和linux中用法同样

hdfs dfs -copyFromLocal 本地路径 hdfs路径

将文件从本地拷贝到hdfs上面去,这个和刚才moveFromLocal就相似于linux中cp和mv

hdfs dfs -copyToLocal hdfs路径 本地路径

将hdfs上的文件拷贝到本地,这个路径是hdfs路径在前、本地路径在后。

hdfs dfs -cp hdfs路径 hdfs路径

copyFromLocal是针对本地和hdfs来讲了,cp是hdfs路径和hdfs路径之间的拷贝

hdfs dfs -mv hdfs路径 hdfs路径

不用说也能明白

hdfs dfs -get hdfs路径 本地路径

等同于copyToLocal

hdfs dfs -put 本地路径 hdfs路径

等同于copyFromLocal

hdfs dfs -getmerge hdfs路径(通配符) 本地路径

将hdfs上面的多个文件合并下载到本地

hdfs dfs -tail 文件名

显示文件的结尾

hdfs dfs -rm 文件

删除文件,若是是文件夹须要加上-r

hdfs dfs -rmdir 空目录

删除一个空目录,不经常使用,通常使用-rm

hdfs dfs -du 目录

统计目录的大小信息

hdfs dfs -du -h /:加上-h人性化显示

hdfs dfs -du -h -s / :查看当前目录的总大小

hdfs dfs -setrep 数值 文件

设置文件的副本数量,hdfs dfs -setrep 5 /file.txt:表示将file.txt的副本设置成5

python链接hdfs进行相关操做

下面咱们来介绍如何使用python操做hdfs,首先python若想操做hdfs,须要下载一个第三方库,也叫hdfs,直接pip install hdfs便可。

import hdfs
from pprint import pprint

# 导入相关模块,输入http://ip:50070,建立客户端
client = hdfs.Client("http://ip:50070")

client.list:查看当前目录的内容

print(client.list("/"))  # ['黑色相簿.txt', 'a.txt', 'b.txt', 'test']

# status默认为False,表示是否显示文件的相关属性
# 返回数据的格式为:[("", {}), ("", {}), ("", {}), ...]
pprint(client.list("/", status=True))
"""
[('黑色相簿.txt',
  {'accessTime': 1570347361399,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16393,
   'group': 'supergroup',
   'length': 0,
   'modificationTime': 1570343722271,
   'owner': 'root',
   'pathSuffix': '黑色相簿.txt',
   'permission': '644',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('a.txt',
  {'accessTime': 1570347222071,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16386,
   'group': 'supergroup',
   'length': 26,
   'modificationTime': 1570344172155,
   'owner': 'root',
   'pathSuffix': 'a.txt',
   'permission': '755',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('b.txt',
  {'accessTime': 1570347263315,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16396,
   'group': 'supergroup',
   'length': 10,
   'modificationTime': 1570347263628,
   'owner': 'root',
   'pathSuffix': 'b.txt',
   'permission': '644',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('test',
  {'accessTime': 0,
   'blockSize': 0,
   'childrenNum': 2,
   'fileId': 16387,
   'group': 'supergroup',
   'length': 0,
   'modificationTime': 1570346913737,
   'owner': 'root',
   'pathSuffix': 'test',
   'permission': '755',
   'replication': 0,
   'storagePolicy': 0,
   'type': 'DIRECTORY'})]
"""

client.status:获取指定路径的状态信息

pprint(client.status("/"))
"""
{'accessTime': 0,
 'blockSize': 0,
 'childrenNum': 4,
 'fileId': 16385,
 'group': 'supergroup',
 'length': 0,
 'modificationTime': 1570347630036,
 'owner': 'root',
 'pathSuffix': '',
 'permission': '755',
 'replication': 0,
 'storagePolicy': 0,
 'type': 'DIRECTORY'}
"""

# 里面还有一个strict=True,表示严格模式
# 若是改成False,那么若是输入的路径不存在就返回None
# 为True的话,路径不存在,报错

client.makedirs:建立目录

print(client.list("/"))  # ['黑色相簿.txt', 'a.txt', 'b.txt', 'test']
# 会自动递归建立,若是想建立的时候给目录赋予权限,可使用permission参数,默认为None
client.makedirs("/a/b/c", permission=777)
print(client.list("/"))  # ['黑色相簿.txt', 'a', 'a.txt', 'b.txt', 'test']
print(client.list("/a"))  # ['b']
print(client.list("/a/b"))  # ['c']

client.rename:重命名

print(client.list("/"))  # ['黑色相簿.txt', 'a', 'a.txt', 'b.txt', 'test']
client.rename("/黑色相簿.txt", "/白色相簿")
print(client.list("/"))  # ['白色相簿', 'a', 'a.txt', 'b.txt', 'test']

client.write:往文件里面写内容

client.read:往文件里面读内容

关于写、读、上传、下载,若是报错,出现了requests.exceptions.ConnectionError:xxxxx,那么解决办法就是在你当前使用python的Windows机器上的hosts文件中增长以下内容:部署hadoop的服务器ip 部署hadoop的服务器主机名

若是出现了hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE,······异常,那么须要在hdfs-site.xml中加入以下内容

<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>

下面就开始写数据、读数据

with client.write("/这是一个不存在的文件.txt") as writer:
    # 须要传入字节
    writer.write(bytes("this file not exists", encoding="utf-8"))


with client.read("/这是一个不存在的文件.txt") as reader:
    # 读取出来也是字节类型
    print(reader.read())  # b'this file not exists'

write方法,若是不指定额外的参数,那么须要文件不能存在,不然会报错,提示文件已经存在。若是要对已存在的文件进行操做,那么须要显式的指定参数:overwrite(重写)或者append(追加)

with client.write("/白色相簿", append=True) as writer:
    writer.write(bytes("使人讨厌的冬天又来了,", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    print(str(reader.read(), encoding="utf-8"))  # 使人讨厌的冬天又来了,
with client.write("/白色相簿", append=True) as writer:
    writer.write(bytes("冬天的街道,恋人们的微笑,让人想一把火全烧了", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    # 因为是追加,以前的内容也读取出来了
    print(str(reader.read(), encoding="utf-8"))  # 使人讨厌的冬天又来了,冬天的街道,恋人们的微笑,让人想一把火全烧了
with client.write("/白色相簿", overwrite=True) as writer:
    writer.write(bytes("暖かい日差しが降り注いできて、眩しすぎ、目が見えない", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    # 若是是overwrite,那么以前的内容就全没了
    print(str(reader.read(), encoding="utf-8"))  # 暖かい日差しが降り注いできて、眩しすぎ、目が見えない

注意:overwrite和append不能同时出现,不然报错

# 若是write里面传入了encoding参数,那么writer.write则须要写入str,由于会自动按照传入的encoding进行编码
with client.write("/白色相簿", overwrite=True, encoding="utf-8") as writer:
    girls = ["古明地觉", "古明地恋", "八重樱"]
    writer.write(str(girls))


# 同理若是传入了encoding参数,reader.read会读出str,由于会自动按照传入的encoding进行解码
with client.read("/白色相簿", encoding="utf-8") as reader:
    print(reader.read())  # ['古明地觉', '古明地恋', '八重樱']

client.content:查看目录的汇总状况

好比:当前目录下有多少个子目录、多少文件等等

print(client.content("/", strict=True))
# {'directoryCount': 6, 'fileCount': 6, 'length': 95, 'quota': 9223372036854775807, 'spaceConsumed': 95, 'spaceQuota': -1}

client.set_owner:设置全部者

client.set_permission:设置权限

client.set_replication:设置副本系数

client.set_times:设置时间

"""
def set_owner(self, hdfs_path, owner=None, group=None):
def set_permission(self, hdfs_path, permission):
def set_replication(self, hdfs_path, replication):
def set_times(self, hdfs_path, access_time=None, modification_time=None):
"""

client.resolve: 将带有符号的路径,转换成绝对、规范化路径

# 固然并不要求路径真实存在
print(client.resolve("/白色相簿/白色相簿/.."))  # /白色相簿

client.walk:递归遍历目录

# 递归遍历文件,相似于os.walk,会返回一个生成器,能够进行迭代
# 每一步迭代的内容是一个三元组,("路径", ["目录1", "目录2"], ["文件1", "文件2", "文件3"])
for file in client.walk("/"):
    print(file)
"""
('/', ['a', 'test'], ['白色相簿', '这是一个不存在的文件.txt', 'a.txt', 'b.txt'])
('/a', ['b'], [])
('/a/b', ['c'], [])
('/a/b/c', [], [])
('/test', ['test1'], ['黑色相簿.txt'])
('/test/test1', [], ['b.txt'])
"""

client.upload:上传文件

print("2.py" in client.list("/"))  # False
client.upload(hdfs_path="/", local_path="2.py")
print("2.py" in client.list("/"))  # True

client.download:下载文件

client.download(hdfs_path="/白色相簿", local_path="白色相簿")
print(open("白色相簿", "r", encoding="utf-8").read())  # ['古明地觉', '古明地恋', '八重樱']

client.checksum:获取文件的校验和

# 获取文件的校验和
print(client.checksum("/白色相簿"))
# {'algorithm': 'MD5-of-0MD5-of-512CRC32C', 'bytes': '00000200000000000000000095b1c9929656ce2b779093c67c95b76000000000', 'length': 28}

client.delete:删除文件或目录

# recursive表示是否递归删除,默认为False
try:
    client.delete("/test")
except Exception as e:
    print(e)  # `/test is non empty': Directory is not empty

print("test" in client.list("/"))  # True
client.delete("/test", recursive=True)
print("test" in client.list("/"))  # False

golang操做hdfs

golang链接hdfs一样须要一个第三方驱动,golang链接hdfs的驱动推荐两个,一个也叫hdfs,另外一个叫gowfs。先来看看怎么安装。

安装hdfs稍微有点费劲,首先能够经过go get github.com/colinmarc/hdfs下载,由于总所周知的缘由,不出意外会失败,报出以下错误

那么咱们须要先在gopath(第三方包安装的位置,通常是进入C盘,点击用户,再点击你的用户名对应的目录,而后就会看到一个go目录,我这里是C:\Users\satori\go)的src目录下新建golang.org/x目录,而后进入到golang.org/x下,在此处打开命令窗口,而后执行git clone https://github.com/golang/crypto

而后再执行go get github.com/colinmarc/hdfs就没问题了

至于安装gowfs就简单多了,直接go get github.com/vladimirvivien/gowfs就没问题了

可是咱们使用哪个呢?我的推荐使用gowfs,下面咱们就来看看怎么用。

读取文件

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
    "io/ioutil"
)

func main() {
    //这是配置,传入Addr: "ip: 50070", User: "随便写一个英文名就行"
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    //返回一个客户端(这里面叫文件系统)和error
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:",err))
    }
    
    //这里不能直接传入文件名,而是须要做为gowfs.Path结构体的Name参数的值
    //而后将Path传进去,咱们后面的api都是这样作的
    path := gowfs.Path{Name:"/whitealbum.txt"}
    
    //接收以下参数:gowfs.Path,offset(偏移量),长度(显然是字节的长度), 容量(本身的cap)
    //返回一个io.ReadCloser,这是须要实现io.Reader和io.Closer的接口
    reader, _ := client.Open(path, 0, 512, 2048)
    
    //可使用reader.Read(buf)的方式循环读取,也能够丢给ioutil。ReadAll,一次性所有读取
    data, _ := ioutil.ReadAll(reader)
    fmt.Println(string(data))
    /*
    白色相簿什么的,已经无所谓了。
    由于已经再也不有歌,值得去唱了。
    传达不了的恋情,已经不须要了。
    由于已经再也不有人,值得去爱了。
     */
}

查看目录有哪些内容

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/"}
    // 返回[]FileStatus和error
    //这个FileStatus是什么?咱们看一下源码
    /*
        type FileStatus struct {
            AccesTime        int64  访问时间
            BlockSize        int64  块大小,只针对文件(134217728 Bytes,128 MB),目录的话为0
            Group            string 所属组
            Length           int64  文件的字节数(目录为0)
            ModificationTime int64  修改时间
            Owner            string 全部者
            PathSuffix       string 文件后缀,说白了就是文件名
            Permission       string 权限
            Replication      int64  副本数
            Type             string 类型,文本的话是FILE,目录的话是DIRECTORY
        }
    */
    fs_arr, _ := client.ListStatus(path)
    fmt.Println(fs_arr)
    // [{0 0 supergroup 0 1570359570447 dr.who tkinter 755 0 DIRECTORY} {0 134217728 supergroup 184 1570359155457 root whitealbum.txt 644 1 FILE}]

    for _, fs := range fs_arr {
        fmt.Println("文件名:", fs.PathSuffix)
        /*
        文件名: tkinter
        文件名: whitealbum.txt
         */
    }
    
    //FileStatus里面包含了文件的详细信息,若是想查看某个文件的详细信息
    //可使用fs, err := client.GetFileStatus(path)
}

建立文件

package main

import (
    "bytes"
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/黑色相簿.txt"}

    /*
    Create函数接收以下参数。
    data:io.Reader,一个实现了io.Reader接口的struct
    Path:很简单,就是咱们这里的path
    overwrite:是否覆盖,若是为false表示不覆盖,那么要求文件不能存在,不然报错
    blocksize:块大小
    replication:副本
    permission:权限
    buffersize:缓存大小
    contenttype:内容类型

    返回一个bool和error
     */
    if flag, err :=client.Create(
        bytes.NewBufferString("这是黑色相簿,不是白色相簿"), //若是不指定内容,就直接bytes.NewBufferString()便可
        path, //路径
        false,//不覆盖
        0,
        0,
        0666,
        0,
        "text/html",  //纯文本格式
        ); err != nil {
            fmt.Println("建立文件出错,错误为:", err)
    } else {
        fmt.Println("建立文件成功, flag =", flag)  //建立文件成功, flag = true
    }
}

查看一下

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
    "io/ioutil"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/黑色相簿.txt"}

    reader , _ := client.Open(path, 0, 512, 2048)
    data, _ := ioutil.ReadAll(reader)
    fmt.Println(string(data)) // 这是黑色相簿,不是白色相簿
}

建立目录

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/a/b/c"}
    
    //递归建立
    flag, err := client.MkDirs(path, 0666)
    fmt.Println(flag) // true

    fs_arr, _ := client.ListStatus(gowfs.Path{Name:"/"})
    for _, fs := range fs_arr{
        fmt.Println(fs.PathSuffix)
        /*
        黑色相簿.txt
        a
        tkinter
        whitealbum.txt
         */
    }
}

重命名

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    fs_arr, _ := client.ListStatus(gowfs.Path{Name:"/"})
    for _, fs := range fs_arr{
        fmt.Println(fs.PathSuffix)
        /*
        黑色相簿.txt
        a
        tkinter
        whitealbum.txt
         */
    }

    flag, err := client.Rename(gowfs.Path{Name:"/黑色相簿.txt"}, gowfs.Path{Name:"/blackalbum.txt"})
    fmt.Println(flag) // true
    fs_arr, _ = client.ListStatus(gowfs.Path{Name:"/"})
    for _, fs := range fs_arr{
        fmt.Println(fs.PathSuffix)
        /*
            a
            blackalbum.txt
            tkinter
            whitealbum.txt
        */
    }
}

向已经存在的文件追加内容

package main

import (
    "bytes"
    "fmt"
    "github.com/vladimirvivien/gowfs"
    "io/ioutil"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/whitealbum.txt"}
    reader, _ := client.Open(path, 0, 512, 2048)
    data, _ := ioutil.ReadAll(reader)
    fmt.Println(string(data))
    /*
    白色相簿什么的,已经无所谓了。
    由于已经再也不有歌,值得去唱了。
    传达不了的恋情,已经不须要了。
    由于已经再也不有人,值得去爱了。
     */

    //参数1:内容,必须是实现了io.Reader接口
    //参数2:路径
    //参数3:缓存大小
    flag, err := client.Append(bytes.NewBufferString("\n让人讨厌的冬天又来了"), path, 2048)
    fmt.Println(flag) // true

    reader, _ = client.Open(path, 0, 512, 2048)
    data, _ = ioutil.ReadAll(reader)
    fmt.Println(string(data))
    /*
    白色相簿什么的,已经无所谓了。
    由于已经再也不有歌,值得去唱了。
    传达不了的恋情,已经不须要了。
    由于已经再也不有人,值得去爱了。
    
    让人讨厌的冬天又来了。
     */
}

设置文件或目录的全部者

func (fs *FileSystem) SetOwner(path Path, owner string, group string) (bool, error)

设置文件或目录的权限

func (fs *FileSystem) SetPermission(path Path, permission os.FileMode) (bool, error)

设置文件或目录的副本系数

func (fs *FileSystem) SetReplication(path Path, replication uint16) (bool, error)

设置文件或目录的访问时间和修改时间

func (fs *FileSystem) SetTimes(path Path, accesstime int64, modificationtime int64) (bool, error)

获取文件的校验和

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name: "/whitealbum.txt"}
    f, _ := client.GetFileChecksum(path)
    fmt.Println(f)  // {MD5-of-0MD5-of-512CRC32C 0000020000000000000000001255073187d3e801940eee180acebe4e00000000 28}
    fmt.Println(f.Algorithm, f.Length, f.Bytes) // MD5-of-0MD5-of-512CRC32C 28 0000020000000000000000001255073187d3e801940eee180acebe4e00000000
}

删除文件

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    path := gowfs.Path{Name:"/blackalbum.txt"}
    //路径,是否递归
    flag, _ := client.Delete(path, true)
    fmt.Println(flag) // true
}

判断文件是否存在

为何这里用蓝色了,由于以后的用法就不同了

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, err := gowfs.NewFileSystem(config)
    if err != nil {
        panic(fmt.Sprintln("出现异常,异常信息为:", err))
    }

    //建立一个shell,可使用下面shell进行操做
    shell := gowfs.FsShell{FileSystem: client}

    //直接传字符串便可,不须要传Path了
    flag, _ := shell.Exists("/whitealbum.txt")
    fmt.Println(flag) // true
    flag, _ = shell.Exists("/whitealbum.txt1")
    fmt.Println(flag) // false
}

改变全部者

flag, _ := shell.Chown([]string{"/file1", "/file2", "/file3"}, "owner")

改变所属组

flag, _ := shell.Chgrp([]string{"/file1", "/file2", "/file3"}, "groupName")

改变权限

flag, _ := shell.Chmod([]string{"/file1", "/file2", "/file3"}, 0666)

查看文件内容

package main

import (
    "bytes"
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, _ := gowfs.NewFileSystem(config)

    shell := gowfs.FsShell{FileSystem: client}

    buf := bytes.Buffer{}
    if err := shell.Cat([]string{"/whitealbum.txt"}, &buf); err != nil {
        fmt.Println("err =", err)
    } else {
        fmt.Println(buf.String())
        /*
            白色相簿什么的,已经无所谓了。
            由于已经再也不有歌,值得去唱了。
            传达不了的恋情,已经不须要了。
            由于已经再也不有人,值得去爱了。

            让人讨厌的冬天又来了
        */
    }
}

追加文件内容

package main

import (
    "bytes"
    "fmt"
    "github.com/vladimirvivien/gowfs"
    "io/ioutil"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, _ := gowfs.NewFileSystem(config)

    shell := gowfs.FsShell{FileSystem: client}

    _ = ioutil.WriteFile("aaa.txt", []byte("\n冬马小三\n"), 0666)
    _ = ioutil.WriteFile("bbb.txt", []byte("雪菜碧池\n"), 0666)
    _ = ioutil.WriteFile("ccc.txt", []byte("打死春哥\n"), 0666)
    _ = ioutil.WriteFile("ddd.txt", []byte("抱走冬马雪菜"), 0666)

    _, _ = shell.AppendToFile([]string{"aaa.txt", "bbb.txt", "ccc.txt", "ddd.txt"}, "/whitealbum.txt")

    buf := bytes.Buffer{}
    _ = shell.Cat([]string{"/whitealbum.txt"}, &buf)
    fmt.Println(buf.String())
    /*
        白色相簿什么的,已经无所谓了。
        由于已经再也不有歌,值得去唱了。
        传达不了的恋情,已经不须要了。
        由于已经再也不有人,值得去爱了。

        让人讨厌的冬天又来了
        冬马小三
        雪菜碧池
        打死春哥
        抱走冬马雪菜
    */
}

上传文件

package main

import (
    "fmt"
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, _ := gowfs.NewFileSystem(config)

    shell := gowfs.FsShell{FileSystem: client}

    //本地路径,hdfs路径,是否重写
    _, _ = shell.Put("aaa.txt", "/aaa.txt", false)

    path := gowfs.Path{Name: "/"}
    fs_arr, _ := client.ListStatus(path)
    for _, fs := range fs_arr {
        fmt.Println(fs.PathSuffix)
        /*
            黑色相簿.txt
            a
            aaa.txt
            tkinter
            whitealbum.txt
        */
    }
}

下载文件

package main

import (
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, _ := gowfs.NewFileSystem(config)

    shell := gowfs.FsShell{FileSystem: client}

    _, _ = shell.Get("/whitealbum.txt", "白色album.txt")
}

删除文件

package main

import (
    "github.com/vladimirvivien/gowfs"
)

func main() {
    config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
    client, _ := gowfs.NewFileSystem(config)

    shell := gowfs.FsShell{FileSystem: client}

    _, _ = shell.Rm("/whitealbum.txt")
}

MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发"基于hadoop的数据分析应用"的核心框架

MapReduce的核心功能是将用户编写的业务逻辑代码自带默认组件组合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

MapReduce优缺点

优势

  • MapReduce易于编程

    它简单地实现一些接口,就能够完成一个分布式应用程序,这个分布式应用程序能够分布到大量廉价的pc机器上运行。也就是说,你写一个分布式应用程序,跟写一个简单的串行程序是如出一辙的。就是由于这个特色,使得MapReduce编程很是流行

  • 良好的扩展性

    当你的计算资源不足时,你能够经过简单的增长机器来扩展计算能力

  • 高容错性

    MapReduce设计的初衷就是使程序可以运行在廉价的PC机器上,这就要求它具备很高的容错性。好比其中一台机器挂了,它能够把上面的计算任务转移到另外一个节点上运行,不至于这个任务彻底失败。并且这个过程不须要人工参与,是由hadoop内部完成的。

  • 适合PB级以上海量数据的离线处理

    能够实现上千台服务器集群并发工做,提升数据处理能力

缺点

  • 不擅长实时计算

    MapReduce没法像mysql同样,能够在毫秒级或者秒级内返回结果

  • 不擅长流式计算

    流式计算输入的数据是动态的,而MapReduce的数据数据必须是静态的,不能动态变化。这是由于MapReduce自身的设计特色决定了数据源必须是静态的

  • 不支持DAG(有向无环图)计算

    多个程序之间存在依赖,后一个应用程序的输入依赖于上一个程序的输出。在这种状况,MapReduce不是不能作,而是使用后,每一个MapReduce做业的输出结果都会写入到磁盘,而后再从磁盘中读取,进行下一个操做,这样作会形成大量的磁盘IO,致使性能很是的低下。

相关文章
相关标签/搜索