doc:http://pyhdfs.readthedocs.io/en/latest/html
pip install hdfs python
https://hdfscli.readthedocs.io/en/latest/quickstart.htmlgit
此外还有一个库pyhdfsgithub
https://github.com/jingw/pyhdfs/blob/master/README.rstshell
通常也能够直接hadoop HDFS 执行hdfscli command操做编程
默认状况下,HdfsCLI带有单个入口点hdfscli
,该入口点提供了方便的界面来执行常见操做。它的全部命令都接受一个自 --alias
变量(如上所述),该自变量定义了针对哪一个集群进行操做。json
HdfsCLI支持从HDFS透明地下载和上传文件和文件夹(咱们也能够使用该--threads
选项指定并行度)。api
$ # Write a single file to HDFS. $ hdfscli upload --alias=dev weights.json models/ $ # Read all files inside a folder from HDFS and store them locally. $ hdfscli download export/results/ "results-$(date +%F)"
若是读取(或写入)单个文件,则还能够经过将其内容-
用做路径参数,将其内容流式传输到标准输出(从标准输入返回)。session
$ # Read a file from HDFS and append its contents to a local log file. $ hdfscli download logs/1987-03-23.txt - >>logs
默认状况下,若是尝试写入现有路径(在本地或在HDFS上),HdfsCLI将引起错误。咱们能够使用该--force
选项强制覆盖路径 。app
该interactive
命令(在未指定任何命令时也使用)将建立一个HDFS客户端,并将其公开在python shell中(若是可用,请使用IPython)。这使得在HDFS上执行文件系统操做并与其数据进行交互变得很方便。有关可用方法的概述,请参见下面的Python绑定。
$ hdfscli --alias=dev Welcome to the interactive HDFS python shell. The HDFS client is available as `CLIENT`. In [1]: CLIENT.list('data/') Out[1]: ['1.json', '2.json'] In [2]: CLIENT.status('data/2.json') Out[2]: { 'accessTime': 1439743128690, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 16389, 'group': 'supergroup', 'length': 2, 'modificationTime': 1439743129392, 'owner': 'drwho', 'pathSuffix': '', 'permission': '755', 'replication': 1, 'storagePolicy': 0, 'type': 'FILE' } In [3]: CLIENT.delete('data/2.json') Out[3]: True
利用python的所有功能,咱们能够轻松地执行更复杂的操做,例如重命名与某些模式匹配的文件夹,删除一段时间未访问的文件,查找某个用户拥有的全部路径等。
cf. 有关命令和选项的完整列表。hdfscli --help
获取hdfs.client.Client
实例的最简单方法是使用上述的Interactive Shell,在该Shell中客户端将自动可用。要以编程方式实例化客户端,有两种选择:
第一种是导入客户端类并直接调用其构造函数。这是最直接,最灵活的方法,可是不容许咱们重复使用已配置的别名:
from hdfs import InsecureClient client = InsecureClient('http://host:port', user='ann')
第二种方法利用hdfs.config.Config
该类加载现有的配置文件(默认与CLI相同)并从现有别名建立客户端:
from hdfs import Config client = Config().get_client('dev')
该read()
方法提供了相似文件的界面,用于从HDFS读取文件。它必须在一个with
块中使用(确保始终正确关闭链接):
# Loading a file in memory. with client.read('features') as reader: features = reader.read() # Directly deserializing a JSON object. with client.read('model.json', encoding='utf-8') as reader: from json import load model = load(reader)
若是chunk_size
传递了参数,则该方法将返回一个生成器,有时使流文件内容更简单。
# Stream a file. with client.read('features', chunk_size=8096) as reader: for chunk in reader: pass
一样,若是delimiter
传递了一个参数,则该方法将返回定界块的生成器。
with client.read('samples.csv', encoding='utf-8', delimiter='\n') as reader: for line in reader: pass
使用如下write()
方法将文件写入HDFS:该方法返回相似文件的可写对象:
# Writing part of a file. with open('samples') as reader, client.write('samples') as writer: for line in reader: if line.startswith('-'): writer.write(line) # Writing a serialized JSON object. with client.write('model.json', encoding='utf-8') as writer: from json import dump dump(model, writer)
为了方便起见,还能够将可迭代的data
参数直接传递给该方法。
# This is equivalent to the JSON example above. from json import dumps client.write('model.json', dumps(model))
全部Client
子类都公开了各类与HDFS交互的方法。大多数都是在WebHDFS操做以后直接建模的,其中一些在下面的代码段中显示:
# Retrieving a file or folder content summary. content = client.content('dat') # Listing all files inside a directory. fnames = client.list('dat') # Retrieving a file or folder status. status = client.status('dat/features') # Renaming ("moving") a file. client.rename('dat/features', 'features') # Deleting a file or folder. client.delete('dat', recursive=True)
基于这些方法的其余方法可提供更多高级功能:
# Download a file or folder locally. client.download('dat', 'dat', n_threads=5) # Get all files under a given folder (arbitrary depth). import posixpath as psp fpaths = [ psp.join(dpath, fname) for dpath, _, fnames in client.walk('predictions') for fname in fnames ]
有关可用方法的完整列表,请参见API参考。
上述大多数方法都会HdfsError
在缺乏的路径上引起if调用。推荐的检查路径是否存在的方法是使用带有参数的content()
或 status()
方法strict=False
(在这种状况下,它们将None
在缺乏的路径上返回)。
请参阅高级用法部分以了解更多信息。
=========================
2:Client——建立集群链接
> from hdfs import *
> client = Client("http://s100:50070")
其余参数说明:
classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)
url:ip:端口
root:制定的hdfs根目录
proxy:制定登录的用户身份
timeout:设置的超时时间
session:链接标识
client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.list("/")
[u'home',u'input', u'output', u'tmp']
3:dir——查看支持的方法
>dir(client)
4:status——获取路径的具体信息
其余参数:status(hdfs_path, strict=True)
hdfs_path:就是hdfs路径
strict:设置为True时,若是hdfs_path路径不存在就会抛出异常,若是设置为False,若是路径为不存在,则返回None
5:list——获取指定路径的子目录信息
>client.list("/")
[u'home',u'input', u'output', u'tmp']
其余参数:list(hdfs_path, status=False)
status:为True时,也返回子目录的状态信息,默认为Flase
6:makedirs——建立目录
>client.makedirs("/123")
其余参数:makedirs(hdfs_path, permission=None)
permission:设置权限
>client.makedirs("/test",permission=777)
7: rename—重命名
>client.rename("/123","/test")
8:delete—删除
>client.delete("/test")
其余参数:delete(hdfs_path, recursive=False)
recursive:删除文件和其子目录,设置为False若是不存在,则会抛出异常,默认为False
9:upload——上传数据
>client.upload("/test","F:\[PPT]Google Protocol Buffers.pdf");
其余参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,
chunk_size=65536,progress=None, cleanup=True, **kwargs)
overwrite:是不是覆盖性上传文件
n_threads:启动的线程数目
temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完以后进行交换
chunk_size:文件上传的大小区间
progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将做为第二个参数
cleanup:若是在上传任何文件时发生错误,则删除该文件
10:download——下载
>client.download("/test/NOTICE.txt","/home")
11:read——读取文件
withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
print reader.read()
其余参数:read(*args, **kwds)
hdfs_path:hdfs路径
offset:设置开始的字节位置
length:读取的长度(字节为单位)
buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。
encoding:制定编码
chunk_size:若是设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个相似文件的对象
delimiter:若是设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。
progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,若是块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1做为第二个参数。