HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与咱们平时使用的单机文件系统很是不一样,从宏观上来看,在 HDFS 文件系统上建立并写一个文件,流程以下图(来自《Hadoop:The Definitive Guide》一书)所示:node
具体过程描述以下:算法
下面代码使用 Hadoop 的 API 来实现向 HDFS 的文件写入数据,一样也包括建立一个文件和写数据两个主要过程,代码以下所示:缓存
static String[] contents = new String[] {
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
"dddddddddddddddddddddddddddddddd",
"eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};
public static void main(String[] args) {
String file = "hdfs://h1:8020/data/test/test.log";
Path path = new Path(file);
Configuration conf = new Configuration();
FileSystem fs = null;
FSDataOutputStream output = null;
try {
fs = path.getFileSystem(conf);
output = fs.create(path); // 建立文件
for(String line : contents) { // 写入数据
output.write(line.getBytes("UTF-8"));
output.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
结合上面的示例代码,咱们先从 fs.create(path); 开始,能够看到 FileSystem 的实现 DistributedFileSystem 中给出了最终返回 FSDataOutputStream 对象的抽象逻辑,代码以下所示:bash
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream
(dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}
复制代码
上面,DFSClient dfs 的 create 方法中建立了一个 OutputStream 对象,在 DFSClient 的 create 方法:数据结构
public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
... ...
}
复制代码
建立了一个 DFSOutputStream 对象,以下所示:并发
final DFSOutputStream result = new DFSOutputStream(src, masked,
overwrite, createParent, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
复制代码
下面,咱们从 DFSOutputStream 类开始,说明其内部实现原理。分布式
DFSOutputStream 内部原理ide
打开一个 DFSOutputStream 流,Client 会写数据到流内部的一个缓冲区中,而后数据被分解成多个 Packet,每一个 Packet 大小为 64k 字节,每一个 Packet 又由一组 chunk 和这组 chunk 对应的 checksum 数据组成,默认 chunk 大小为 512 字节,每一个 checksum 是对 512 字节数据计算的校验和数据。 当 Client 写入的字节流数据达到一个 Packet 的长度,这个 Packet 会被构建出来,而后会被放到队列 dataQueue 中,接着 DataStreamer 线程会不断地从 dataQueue 队列中取出 Packet,发送到复制 Pipeline 中的第一个 DataNode 上,并将该 Packet 从 dataQueue 队列中移到 ackQueue 队列中。ResponseProcessor 线程接收从 Datanode 发送过来的 ack,若是是一个成功的 ack,表示复制 Pipeline 中的全部 Datanode 都已经接收到这个 Packet,ResponseProcessor 线程将 packet 从队列 ackQueue 中删除。 在发送过程当中,若是发生错误,全部未完成的 Packet 都会从 ackQueue 队列中移除掉,而后从新建立一个新的 Pipeline,排除掉出错的那些 DataNode 节点,接着 DataStreamer 线程继续从 dataQueue 队列中发送 Packet。 下面是 DFSOutputStream 的结构及其原理,如图所示:oop
咱们从下面 3 个方面来描述内部流程:ui
Client 写数据时,会将字节流数据缓存到内部的缓冲区中,当长度知足一个 Chunk 大小(512B)时,便会建立一个 Packet 对象,而后向该 Packet 对象中写 Chunk Checksum 校验和数据,以及实际数据块 Chunk Data,校验和数据是基于实际数据块计算获得的。每次知足一个 Chunk 大小时,都会向 Packet 中写上述数据内容,直到达到一个 Packet 对象大小(64K),就会将该 Packet 对象放入到 dataQueue 队列中,等待 DataStreamer 线程取出并发送到 DataNode 节点。
DataStreamer 线程从 dataQueue 队列中取出 Packet 对象,放到 ackQueue 队列中,而后向 DataNode 节点发送这个 Packet 对象所对应的数据。
发送一个 Packet 数据包之后,会有一个用来接收 ack 的 ResponseProcessor 线程,若是收到成功的 ack,则表示一个 Packet 发送成功。若是成功,则 ResponseProcessor 线程会将 ackQueue 队列中对应的 Packet 删除。
DFSOutputStream 初始化
首先看一下,DFSOutputStream 的初始化过程,构造方法以下所示:
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默认 writePacketSize=64*1024(即64K),bytesPerChecksum=512(没512个字节计算一个校验和),
try {
if (createParent) { // createParent为true表示,若是待建立的文件的父级目录不存在,则自动建立
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
streamer.start(); // 启动一个DataStreamer线程,用来将写入的字节流打包成packet,而后发送到对应的Datanode节点上
}
上面computePacketChunkSize方法计算了一个packet的相关参数,咱们结合代码来查看,以下所示:
int chunkSize = csize + checksum.getChecksumSize();
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
packetSize = n + chunkSize*chunksPerPacket;
复制代码
咱们用默认的参数值替换上面的参数,获得:
int chunkSize = 512 + 4;
int n = 21 + 4;
chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1); // 127
packetSize = 25 + 516*127;
复制代码
上面对应的参数,说明以下表所示:
参数名称 | 参数值 | 参数含义 |
---|---|---|
chunkSize | 512+4=516 | 每一个 chunk 的字节数(数据 + 校验和) |
csize | 512 | 每一个 chunk 数据的字节数 |
psize | 64*1024 | 每一个 packet 的最大字节数(不包含 header) |
DataNode.PKT_HEADER_LEN | 21 | 每一个 packet 的 header 的字节数 |
chunksPerPacket | 127 | 组成每一个 packet 的 chunk 的个数 |
packetSize | 25+516*127=65557 | 每一个 packet 的字节数(一个 header + 一组 chunk) |
在计算好一个 packet 相关的参数之后,调用 create 方法与 Namenode 进行 RPC 请求,请求建立文件:
if (createParent) { // createParent为true表示,若是待建立的文件的父级目录不存在,则自动建立
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}
复制代码
远程调用上面方法,会在 FSNamesystem 中建立对应的文件路径,并初始化与该建立的文件相关的一些信息,如租约(向 Datanode 节点写数据的凭据)。文件在 FSNamesystem 中建立成功,就要初始化并启动一个 DataStreamer 线程,用来向 Datanode 写数据,后面咱们详细说明具体处理逻辑。
Packet 结构与定义
Client 向 HDFS 写数据,数据会被组装成 Packet,而后发送到 Datanode 节点。Packet 分为两类,一类是实际数据包,另外一类是 heatbeat 包。一个 Packet 数据包的组成结构,如图所示:
上图中,一个 Packet 是由 Header 和 Data 两部分组成,其中 Header 部分包含了一个 Packet 的概要属性信息,以下表所示:
字段名称 | 字段类型 | 字段长度 | 字段含义 |
---|---|---|---|
pktLen | int | 4 | 4 + dataLen + checksumLen |
offsetInBlock | long | 8 | Packet 在 Block 中偏移量 |
seqNo | long | 8 | Packet 序列号,在同一个 Block 惟一 |
lastPacketInBlock | boolean | 1 | 是不是一个 Block 的最后一个 Packet |
dataLen | int | 4 | dataPos – dataStart,不包含 Header 和 Checksum 的长度 |
Data 部分是一个 Packet 的实际数据部分,主要包括一个 4 字节校验和(Checksum)与一个 Chunk 部分,Chunk 部分最大为 512 字节。 在构建一个 Packet 的过程当中,首先将字节流数据写入一个 buffer 缓冲区中,也就是从偏移量为 25 的位置(checksumStart)开始写 Packet 数据的 Chunk Checksum 部分,从偏移量为 533 的位置(dataStart)开始写 Packet 数据的 Chunk Data 部分,直到一个 Packet 建立完成为止。若是一个 Packet 的大小未能达到最大长度,也就是上图对应的缓冲区中,Chunk Checksum 与 Chunk Data 之间还保留了一段未被写过的缓冲区位置,这种状况说明,已经在写一个文件的最后一个 Block 的最后一个 Packet。在发送这个 Packet 以前,会检查 Chunksum 与 Chunk Data 之间的缓冲区是否为空白缓冲区(gap),若是有则将 Chunk Data 部分向前移动,使得 Chunk Data 1 与 Chunk Checksum N 相邻,而后才会被发送到 DataNode 节点。 咱们看一下 Packet 对应的 Packet 类定义,定义了以下一些字段:
ByteBuffer buffer; // only one of buf and buffer is non-null
byte[] buf;
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // 该packet在block中的偏移量
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
int maxChunks; // 一个packet中包含的chunk的个数
int dataStart;
int dataPos;
int checksumStart;
int checksumPos;
复制代码
Packet 类有一个默认的没有参数的构造方法,它是用来作 heatbeat 的,以下所示:
Packet() {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO; // 值为-1
buffer = null;
int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
buf = new byte[packetSize];
checksumStart = dataStart = packetSize;
checksumPos = checksumStart;
dataPos = dataStart;
maxChunks = 0;
}
复制代码
经过代码能够看到,一个 heatbeat 的内容,实际上只有一个长度为 25 字节的 header 数据。经过 this.seqno = HEART_BEAT_SEQNO; 的值能够判断一个 packet 是不是 heatbeat 包,若是 seqno 为 - 1 表示这是一个 heatbeat 包。
Client 发送 Packet 数据
能够 DFSClient 类中看到,发送一个 Packet 以前,首先须要向选定的 DataNode 发送一个 Header 数据包,代表要向 DataNode 写数据,该 Header 的数据结构,如图所示:
上图显示的是 Client 发送 Packet 到第一个 DataNode 节点的 Header 数据结构,主要包括待发送的 Packet 所在的 Block(先向 NameNode 分配 Block ID 等信息)的相关信息、Pipeline 中另外 2 个 DataNode 的信息、访问令牌(Access Token)和校验和信息,Header 中各个字段及其类型,详见下表:
字段名称 | 字段类型 | 字段长度 | 字段含义 |
---|---|---|---|
Transfer Version | short | 2 | Client 与 DataNode 之间数据传输版本号,由常量 DataTransferProtocol.DATA_TRANSFER_VERSION 定义,值为 17 |
OP | int | 4 | 操做类型,由常量 DataTransferProtocol.OP_WRITE_BLOCK 定义,值为 80 |
blkId | long | 8 | Block 的 ID 值,由 NameNode 分配 |
GS | long | 8 | 时间戳(Generation Stamp),NameNode 分配 blkId 的时候生成的时间戳 |
DNCnt | int | 4 | DataNode 复制 Pipeline 中 DataNode 节点的数量 |
Recovery Flag | boolean | 1 | Recover 标志 |
Client | Text | Client 主机的名称,在使用 Text 进行序列化的时候,实际包含长度 len 与主机名称字符串 ClientHost | |
srcNode | boolean | 1 | 是否发送 src node 的信息,默认值为 false,不发送 src node 的信息 |
nonSrcDNCnt | int | 4 | 由 Client 写的该 Header 数据,该数不包含 Pipeline 中第一个节点(即为 DNCnt-1) |
DN2 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
DN3 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
Access Token | Token | 访问令牌信息,包括 IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service | |
CheckSum Header | DataChecksum | 1+4 | 校验和 Header 信息,包括 type、bytesPerChecksum |
Header 数据包发送成功,Client 会收到一个成功响应码(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着将 Packet 数据发送到 Pipeline 中第一个 DataNode 上,以下所示:
Packet one = null;
one = dataQueue.getFirst(); // regular data packet
ByteBuffer buf = one.getBuffer();
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) { // 若是是Block中的最后一个Packet,还要写入一个0标识该Block已经写入完成
blockStream.writeInt(0); // indicate end-of-block
}
复制代码
不然,若是失败,则会与 NameNode 进行 RPC 调用,删除该 Block,并把该 Pipeline 中第一个 DataNode 加入到 excludedNodes 列表中,代码以下所示:
if (!success) {
LOG.info("Abandoning " + block);
namenode.abandonBlock(block, src, clientName);
if (errorIndex < nodes.length) {
LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.add(nodes[errorIndex]);
}
// Connection failed. Let's wait a little bit and retry retry = true; } 复制代码
DataNode 端服务组件
数据最终会发送到 DataNode 节点上,在一个 DataNode 上,数据在各个组件之间流动,流程以下图所示:
DataNode 服务中建立一个后台线程 DataXceiverServer,它是一个 SocketServer,用来接收来自 Client(或者 DataNode Pipeline 中的非最后一个 DataNode 节点)的写数据请求,而后在 DataXceiverServer 中将链接过来的 Socket 直接派发给一个独立的后台线程 DataXceiver 进行处理。因此,Client 写数据时链接一个 DataNode Pipeline 的结构,实际流程如图所示:
每一个 DataNode 服务中的 DataXceiver 后台线程接收到来自前一个节点(Client/DataNode)的 Socket 链接,首先读取 Header 数据:
Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
srcDataNode = new DatanodeInfo();
srcDataNode.readFields(in);
}
int numTargets = in.readInt();
if (numTargets < 0) {
throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
DatanodeInfo tmp = new DatanodeInfo();
tmp.readFields(in);
targets[i] = tmp;
}
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
复制代码
上面代码中,读取 Header 的数据,与前一个 Client/DataNode 写入 Header 字段的顺序相对应,再也不累述。在完成读取 Header 数据后,当前 DataNode 会首先将 Header 数据再发送到 Pipeline 中下一个 DataNode 结点,固然该 DataNode 确定不是 Pipeline 中最后一个 DataNode 节点。接着,该 DataNode 会接收来自前一个 Client/DataNode 节点发送的 Packet 数据,接收 Packet 数据的逻辑实际上在 BlockReceiver 中完成,包括未来自前一个 Client/DataNode 节点发送的 Packet 数据写入本地磁盘。在 BlockReceiver 中,首先会将接收到的 Packet 数据发送写入到 Pipeline 中下一个 DataNode 节点,而后再将接收到的数据写入到本地磁盘的 Block 文件中。
DataNode 持久化 Packet 数据
在 DataNode 节点的 BlockReceiver 中进行 Packet 数据的持久化,一个 Packet 是一个 Block 中一个数据分组,咱们首先看一下,一个 Block 在持久化到磁盘上的物理存储结构,以下图所示:
每一个 Block 文件(如上图中 blk_1084013198 文件)都对应一个 meta 文件(如上图中 blk_1084013198_10273532.meta 文件),Block 文件是一个一个 Chunk 的二进制数据(每一个 Chunk 的大小是 512 字节),而 meta 文件是与每个 Chunk 对应的 Checksum 数据,是序列化形式存储。
写文件过程当中 Client/DataNode 与 NameNode 进行 RPC 调用
Client 在 HDFS 文件系统中写文件过程当中,会发生屡次与 NameNode 节点进行 RPC 调用来完成写数据相关操做,主要是在以下时机进行 RPC 调用:
具体 RPC 调用的详细过程,能够参考源码。