Analysis of the HDFS File Write Process
HDFS is a distributed file system, and writing a file on HDFS differs significantly from the single-machine file systems we use every day. At a high level, creating and writing a file on HDFS follows the flow shown in the figure below (from the book Hadoop: The Definitive Guide):
The process, described in more detail:

1. The client calls create() on DistributedFileSystem to create the file.
2. DistributedFileSystem issues an RPC to the NameNode, which creates the file in the namespace (after checking permissions and that the file does not already exist), with no blocks associated with it yet.
3. As the client writes, DFSOutputStream splits the data into packets and appends them to an internal data queue.
4. The DataStreamer thread consumes the data queue, asks the NameNode to allocate new blocks (each with a list of target DataNodes), and streams each packet to the first DataNode in the pipeline, which forwards it to the second, and so on.
5. DFSOutputStream also maintains an ack queue; a packet is removed from it only after every DataNode in the pipeline has acknowledged it.
6. When the client finishes writing, it calls close(), which flushes the remaining packets, waits for the acknowledgements, and notifies the NameNode that the file is complete.

Rack awareness (replica node selection): by default, the first replica is placed on the node where the client runs (or a random node if the client is outside the cluster), the second replica on a node in a different rack, and the third on a different node in the same rack as the second.
The following code uses the Hadoop API to write data to a file on HDFS; it likewise covers the two main steps, creating a file and writing data:
static String[] contents = new String[] {
    "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
    "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
    "dddddddddddddddddddddddddddddddd",
    "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};

public static void main(String[] args) {
    String file = "hdfs://h1:8020/data/test/test.log";
    Path path = new Path(file);
    Configuration conf = new Configuration();
    FileSystem fs = null;
    FSDataOutputStream output = null;
    try {
        fs = path.getFileSystem(conf);
        output = fs.create(path); // create the file
        for (String line : contents) { // write the data
            output.write(line.getBytes("UTF-8"));
            output.flush();
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (output != null) { // guard against NPE if create() failed
                output.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
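To verify the write, the file can be read back through the same FileSystem API. Below is a minimal sketch, reusing the hypothetical h1:8020 address and test path from the example above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadBack {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hdfs://h1:8020 and /data/test/test.log are the example values from above
        FileSystem fs = FileSystem.get(URI.create("hdfs://h1:8020"), conf);
        FSDataInputStream in = fs.open(new Path("/data/test/test.log"));
        IOUtils.copyBytes(in, System.out, 4096, true); // closes the stream when done
    }
}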
Returning to the write example, we start from fs.create(path). In DistributedFileSystem, the FileSystem implementation, we can see the logic that ultimately returns the FSDataOutputStream object:
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  statistics.incrementWriteOps(1);
  return new FSDataOutputStream(
      dfs.create(getPathName(f), permission, overwrite, true,
                 replication, blockSize, progress, bufferSize),
      statistics);
}
Above, dfs is a DFSClient, and its create method is what actually creates the OutputStream object. The signature of DFSClient's create method is:
public OutputStream create(String src,
                           FsPermission permission,
                           boolean overwrite,
                           boolean createParent,
                           short replication,
                           long blockSize,
                           Progressable progress,
                           int buffersize) throws IOException {
  ... ...
}
Its body constructs a DFSOutputStream object:

final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite,
    createParent, replication, blockSize, progress, buffersize,
    conf.getInt("io.bytes.per.checksum", 512));
Next, we start from the DFSOutputStream class and explain its internal implementation.
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
    boolean createParent, short replication, long blockSize,
    Progressable progress, int buffersize, int bytesPerChecksum)
    throws IOException {
  this(src, blockSize, progress, bytesPerChecksum, replication);
  // Defaults: writePacketSize = 64*1024 (i.e. 64K); bytesPerChecksum = 512
  // (one checksum is computed for every 512 bytes of data)
  computePacketChunkSize(writePacketSize, bytesPerChecksum);
  try {
    if (createParent) {
      // createParent == true: missing parent directories of the file
      // being created are created automatically
      namenode.create(src, masked, clientName, overwrite, replication, blockSize);
    } else {
      namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
    }
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileAlreadyExistsException.class,
        FileNotFoundException.class,
        NSQuotaExceededException.class,
        DSQuotaExceededException.class);
  }
  // Start a DataStreamer thread, which packs the written byte stream into
  // packets and sends them to the corresponding DataNode
  streamer.start();
}

The computePacketChunkSize method above computes the parameters of a packet; the relevant code is:

int chunkSize = csize + checksum.getChecksumSize();
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1);
packetSize = n + chunkSize * chunksPerPacket;
Substituting the default values for the parameters above, we get:
int chunkSize = 512 + 4;
int n = 21 + 4;
chunksPerPacket = Math.max((64*1024 - 25 + 516 - 1) / 516, 1); // 127
packetSize = 25 + 516 * 127; // 65557
The parameters used above are explained in the following table:
Parameter | Value | Meaning |
chunkSize | 512+4=516 | bytes per chunk (data plus checksum) |
csize | 512 | bytes of data per chunk |
psize | 64*1024 | maximum bytes per packet (excluding the header) |
DataNode.PKT_HEADER_LEN | 21 | bytes in each packet header |
chunksPerPacket | 127 | number of chunks that make up one packet |
packetSize | 25+516*127=65557 | bytes per packet (one header plus a group of chunks) |
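As a quick sanity check, the following self-contained snippet (plain Java, no Hadoop dependencies; the constants are the defaults from the table above) reproduces these numbers:

public class PacketSizeCheck {
    public static void main(String[] args) {
        int csize = 512;        // bytes of data per chunk
        int checksumSize = 4;   // CRC32 checksum bytes per chunk
        int psize = 64 * 1024;  // maximum packet size
        int n = 21 + 4;         // PKT_HEADER_LEN + SIZE_OF_INTEGER

        int chunkSize = csize + checksumSize;                                        // 516
        int chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1);  // 127
        int packetSize = n + chunkSize * chunksPerPacket;                            // 65557

        System.out.println("chunksPerPacket = " + chunksPerPacket); // prints 127
        System.out.println("packetSize = " + packetSize);           // prints 65557
    }
}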
After the packet parameters have been computed, the create method is invoked to make an RPC request to the NameNode, asking it to create the file:
if (createParent) {
  // createParent == true: missing parent directories of the file being
  // created are created automatically
  namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
  namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}
This remote call creates the corresponding file path in FSNamesystem and initializes some information related to the newly created file, such as the lease (the client's credential for writing data to DataNodes). Once the file has been created successfully in FSNamesystem, a DataStreamer thread is initialized and started to write the data to the DataNodes; the detailed processing logic is described below.
Packet Structure and Definition
Field | Type | Length (bytes) | Meaning |
pktLen | int | 4 | 4 + dataLen + checksumLen |
offsetInBlock | long | 8 | offset of the packet within its block |
seqNo | long | 8 | packet sequence number, unique within a block |
lastPacketInBlock | boolean | 1 | whether this is the last packet of the block |
dataLen | int | 4 | dataPos – dataStart; excludes the header and checksum lengths |
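To make the layout concrete, here is a minimal sketch (not Hadoop's actual code) that serializes the five header fields above using java.io.DataOutputStream; the resulting array is exactly 25 bytes, matching PKT_HEADER_LEN + SIZE_OF_INTEGER:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PacketHeaderSketch {
    static byte[] buildHeader(int pktLen, long offsetInBlock, long seqno,
                              boolean lastPacketInBlock, int dataLen) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(pktLen);                // 4 bytes: 4 + dataLen + checksumLen
        out.writeLong(offsetInBlock);        // 8 bytes: offset of the packet in its block
        out.writeLong(seqno);                // 8 bytes: sequence number within the block
        out.writeBoolean(lastPacketInBlock); // 1 byte
        out.writeInt(dataLen);               // 4 bytes: payload length
        out.flush();
        return bytes.toByteArray();          // 25 bytes in total
    }
}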
The Packet class is defined inside DFSOutputStream and has the following fields:

ByteBuffer buffer;           // only one of buf and buffer is non-null
byte[] buf;
long seqno;                  // sequence number of buffer in block
long offsetInBlock;          // offset of this packet within the block
boolean lastPacketInBlock;   // is this the last packet in block?
int numChunks;               // number of chunks currently in packet
int maxChunks;               // number of chunks a packet can contain
int dataStart;
int dataPos;
int checksumStart;
int checksumPos;
The Packet class has a default no-argument constructor, which is used to build heartbeat packets:
Packet() {
  this.lastPacketInBlock = false;
  this.numChunks = 0;
  this.offsetInBlock = 0;
  this.seqno = HEART_BEAT_SEQNO; // value is -1
  buffer = null;
  int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21 + 4 = 25
  buf = new byte[packetSize];
  checksumStart = dataStart = packetSize;
  checksumPos = checksumStart;
  dataPos = dataStart;
  maxChunks = 0;
}
As the code shows, a heartbeat packet actually consists of nothing but a 25-byte header. The value set by this.seqno = HEART_BEAT_SEQNO; identifies a packet as a heartbeat: if seqno is -1, the packet is a heartbeat packet.
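A receiver can therefore recognize a heartbeat purely by decoding the seqno field of the header. A small illustrative sketch (not Hadoop's actual code), reading the fields in the order shown earlier:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class HeartbeatCheck {
    static final long HEART_BEAT_SEQNO = -1; // same sentinel as in Packet()

    // Decodes the header fields in order and reports whether the packet
    // is a heartbeat (seqno == -1).
    static boolean isHeartbeat(byte[] header) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(header));
        in.readInt();  // skip pktLen
        in.readLong(); // skip offsetInBlock
        return in.readLong() == HEART_BEAT_SEQNO;
    }
}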
Client Sends Packet Data

Before any packet can be sent, the Client first writes a header for the block-write request to the first DataNode in the pipeline; its fields are listed below:
Field | Type | Length (bytes) | Meaning |
Transfer Version | short | 2 | version of the data-transfer protocol between Client and DataNode, defined by the constant DataTransferProtocol.DATA_TRANSFER_VERSION, value 17 |
OP | int | 4 | operation type, defined by the constant DataTransferProtocol.OP_WRITE_BLOCK, value 80 |
blkId | long | 8 | ID of the Block, assigned by the NameNode |
GS | long | 8 | generation stamp, created when the NameNode assigned the blkId |
DNCnt | int | 4 | number of DataNodes in the replication pipeline |
Recovery Flag | boolean | 1 | recovery flag |
Client | Text | variable | hostname of the Client; serialized as Text, i.e. a length len followed by the ClientHost string |
srcNode | boolean | 1 | whether src node info is sent; defaults to false (no src node info is sent) |
nonSrcDNCnt | int | 4 | written by the Client; the count excluding the first node of the pipeline (i.e. DNCnt-1) |
DN2 | DatanodeInfo | variable | DataNode info: StorageID, InfoPort, IpcPort, capacity, DfsUsed, remaining, LastUpdate, XceiverCount, Location, HostName, AdminState |
DN3 | DatanodeInfo | variable | DataNode info, same fields as DN2 |
Access Token | Token | variable | access-token info: IdentifierLength, Identifier, PwdLength, Pwd, KindLength, Kind, ServiceLength, Service |
CheckSum Header | DataChecksum | 1+4 | checksum header info: type, bytesPerChecksum |
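Putting the table together, here is a simplified sketch of how a client could serialize this header. This is illustrative only, not the actual DFSClient code; DatanodeInfo and access-token serialization are elided, and blkId, genStamp, dnCnt, clientHost, and socketOutputStream are placeholder variables:

// Illustrative only: field order and widths follow the table above.
DataOutputStream out = new DataOutputStream(socketOutputStream);
out.writeShort(17);                // Transfer Version (DATA_TRANSFER_VERSION)
out.writeInt(80);                  // OP (OP_WRITE_BLOCK), width as given above
out.writeLong(blkId);              // Block ID assigned by the NameNode
out.writeLong(genStamp);           // generation stamp
out.writeInt(dnCnt);               // DataNodes in the pipeline
out.writeBoolean(false);           // recovery flag
Text.writeString(out, clientHost); // Client name: length + string
out.writeBoolean(false);           // srcNode: no src node info sent
out.writeInt(dnCnt - 1);           // nonSrcDNCnt
// ... followed by the DatanodeInfo entries (DN2, DN3), the access token,
// and the checksum header (type + bytesPerChecksum)
out.flush();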
Once the header has been sent successfully, the Client receives a success response code (DataTransferProtocol.OP_STATUS_SUCCESS = 0) and then sends the Packet data to the first DataNode in the pipeline, as shown below:
Packet one = null;
one = dataQueue.getFirst(); // regular data packet
ByteBuffer buf = one.getBuffer();
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) {
  // if this is the last Packet of the Block, also write an int 0 to
  // mark that the Block is complete
  blockStream.writeInt(0); // indicate end-of-block
}
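After a packet has been written to the pipeline, it is moved from dataQueue to an ack queue and removed from there only once acknowledgements from all DataNodes in the pipeline have arrived; this is what allows the client to know which packets must be resent when a DataNode fails.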
If setting up the stream to the pipeline fails, the client abandons the block on the NameNode, excludes the failed DataNode, and retries with a newly allocated block:

if (!success) {
  LOG.info("Abandoning " + block);
  namenode.abandonBlock(block, src, clientName);
  if (errorIndex < nodes.length) {
    LOG.info("Excluding datanode " + nodes[errorIndex]);
    excludedNodes.add(nodes[errorIndex]);
  }
  // Connection failed. Let's wait a little bit and retry
  retry = true;
}
On the receiving side, the DataNode reads back exactly the header fields described above:

Block block = new Block(in.readLong(),
    dataXceiverServer.estimateBlockSize, in.readLong());
LOG.info("Receiving " + block + " src: " + remoteAddress
    + " dest: " + localAddress);
int pipelineSize = in.readInt();           // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean();     // is this part of recovery?
String client = Text.readString(in);       // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
  srcDataNode = new DatanodeInfo();
  srcDataNode.readFields(in);
}
int numTargets = in.readInt();
if (numTargets < 0) {
  throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
  DatanodeInfo tmp = new DatanodeInfo();
  tmp.readFields(in);
  targets[i] = tmp;
}
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
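Having parsed the header, the DataNode opens a connection to the next DataNode in targets, forwards the same header downstream, and then starts receiving packets and mirroring them along the pipeline.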