源码|HDFS之DataNode:写数据块(1)

做为分布式文件系统,HDFS擅于处理大文件的读/写。这得益于“文件元信息与文件数据分离,文件数据分块存储”的思想:namenode管理文件元信息,datanode管理分块的文件数据。java

HDFS 2.x进一步将数据块存储服务抽象为blockpool,不过写数据块过程与1.x大同小异。本文假设副本系数1(即写数据块只涉及1个客户端+1个datanode),未发生任何异常,分析datanode写数据块的过程。node

源码版本:Apache Hadoop 2.6.0git

可参考猴子追源码时的速记打断点,亲自debug一遍。github

副本系数1,即只须要一个datanode构成最小的管道,与更常见的管道写相比,能够认为“无管道”。后续再写两篇文章分别分析管道写无异常、管道写有异常两种状况。安全

开始以前

总览

参考源码|HDFS之DataNode:启动过程,咱们大致了解了datanode上有哪些重要的工做线程。其中,与写数据块过程联系最紧密的是DataXceiverServer与BPServiceActor。多线程

参考HDFS-1.x、2.x的RPC接口,客户端与数据节点间主要经过流接口DataTransferProtocol完成数据块的读/写。DataTransferProtocol用于整个管道中的客户端、数据节点间的流式通讯,其中,DataTransferProtocol#writeBlock()负责完成写数据块的工做:并发

public void writeBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, final DataChecksum requestedChecksum, final CachingStrategy cachingStrategy, final boolean allowLazyPersist) throws IOException;
复制代码

文章的组织结构

  1. 若是只涉及单个分支的分析,则放在同一节。
  2. 若是涉及多个分支的分析,则在下一级分多个节,每节讨论一个分支。
  3. 多线程的分析同多分支。
  4. 每个分支和线程的组织结构遵循规则1-3。

DataXceiverServer线程

注意,DataTransferProtocol并非一个RPC协议,所以,常见经过的寻找DataTransferProtocol接口的实现类来肯定“客户端调用的远程方法”是站不住脚。不过依然能够按照这个思路倒追,看实现类到底是如何被建立,与谁通讯,来验证是否找到了正确的实现类。app

依靠debug,猴子从DataXceiver类反向追到了DataXceiverServer类。这里从DataXceiverServer类开始,正向讲解。异步

DataXceiverServer线程在DataNode#runDatanodeDaemon()方法中启动。socket

DataXceiverServer#run():

public void run() {
    Peer peer = null;
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
        peer = peerServer.accept();

        ...// 检查DataXceiver线程的数量,超过最大限制就抛出IOE

        // 启动一个新的DataXceiver线程
        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (AsynchronousCloseException ace) {
        // another thread closed our listener socket - that's expected during shutdown,
        // but not in other circumstances
        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
          LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
        }
      } catch (IOException ie) {
        ...// 清理
      } catch (OutOfMemoryError ie) {
        ...// 清理并sleep 30s
      } catch (Throwable te) {
        // 其余异常就关闭datanode
        LOG.error(datanode.getDisplayName()
            + ":DataXceiverServer: Exiting due to: ", te);
        datanode.shouldRun = false;
      }
    }

    ...// 关闭peerServer并清理全部peers
  }
复制代码

DataXceiverServer线程是一个典型的Tcp Socket Server。客户端每来一个TCP请求,若是datanode上的DataXceiver线程数量还没超过限制,就启动一个新的DataXceiver线程。

默认的最大DataXceiver线程数量为4096,经过dfs.datanode.max.transfer.threads设置。

主流程:DataXceiver线程

DataXceiver#run():

public void run() {
    int opsProcessed = 0;
    Op op = null;

    try {
      ...// 一些初始化
      
      // 使用一个循环,以容许客户端发送新的操做请求时重用TCP链接
      do {
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

        try {
          ...// 超时设置
          op = readOp();
        } catch (InterruptedIOException ignored) {
          // Time out while we wait for client rpc
          break;
        } catch (IOException err) {
          ...// 此处的优化使得正常处理完一个操做后,必定会抛出EOFException或ClosedChannelException,能够退出循环
          ...// 若是是其余异常,则说明出现错误,从新抛出以退出循环
        }

        ...// 超时设置

        opStartTime = now();
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
    } catch (Throwable t) {
      ...// 异常处理
    } finally {
      ...// 资源清理,包括打开的文件、socket等
    }
  }
复制代码

此处的优化很少讲。

DataXceiver#readOp()继承自Receiver类:从客户端发来的socket中读取op码,判断客户端要进行何种操做操做。写数据块使用的op码为80,返回的枚举变量op = Op.WRITE_BLOCK

DataXceiver#processOp()也继承自Receiver类:

protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    ...// 其余case
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }
  
  ...
  
  private void opWriteBlock(DataInputStream in) throws IOException {
    final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
    final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
        proto.getClass().getSimpleName());
    try {
      writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
          PBHelper.convertStorageType(proto.getStorageType()),
          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
          proto.getHeader().getClientName(),
          targets,
          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
          PBHelper.convert(proto.getSource()),
          fromProto(proto.getStage()),
          proto.getPipelineSize(),
          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
          proto.getLatestGenerationStamp(),
          fromProto(proto.getRequestedChecksum()),
          (proto.hasCachingStrategy() ?
              getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()),
            (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
     } finally {
      if (traceScope != null) traceScope.close();
     }
  }
复制代码

HDFS 2.x相对于1.x的另外一项改进,在流式接口中也大幅替换为使用protobuf,再也不是裸TCP分析字节流了。

Receiver类实现了DataTransferProtocol接口,但没有实现DataTransferProtocol#writeBlock()。多态特性告诉咱们,这里会调用DataXceiver#writeBlock()。

终于回到了DataXceiver#writeBlock():

public void writeBlock(final ExtendedBlock block, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientname, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, final boolean allowLazyPersist) throws IOException {
    ...// 检查,设置参数等

    ...// 构建向上游节点或客户端回复的输出流(此处即为客户端)

    ...// 略
    
    try {
      if (isDatanode || 
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // 建立BlockReceiver,准备接收数据块
        blockReceiver = new BlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist);

        storageUuid = blockReceiver.getStorageUuid();
      } else {
        ...// 管道错误恢复相关
      }

      ...// 下游节点的处理。一个datanode是没有下游节点的。
      
      // 发送的第一个packet是空的,只用于创建管道。这里当即返回ack表示管道是否创建成功
      // 因为该datanode没有下游节点,则执行到此处,表示管道已经创建成功
      if (isClient && !isTransfer) {
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // 接收数据块(也负责发送到下游,不过此处没有下游节点)
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        ...// 数据块复制相关
      }

      ...// 数据块恢复相关
      
      ...// 数据块复制相关
      
    } catch (IOException ioe) {
      LOG.info("opWriteBlock " + block + " received exception " + ioe);
      throw ioe;
    } finally {
      ...// 清理资源
    }

    ...// 更新metrics
  }
复制代码

特别说明几个参数:

  • stage:表示数据块构建的状态。此处为BlockConstructionStage.PIPELINE_SETUP_CREATE
  • isDatanode:表示写数据块请求是否由数据节点发起。若是写请求中clientname为空,就说明是由数据节点发起(如数据块复制等由数据节点发起)。此处为false。
  • isClient:表示写数据块请求是否由客户端发起,此值必定与isDatanode相反。此处为true。
  • isTransfers:表示写数据块请求是否为数据块复制。若是stage为BlockConstructionStage.TRANSFER_RBWBlockConstructionStage.TRANSFER_FINALIZED,则表示为了数据块复制。此处为false。

下面讨论“准备接收数据块”和“接收数据块”两个过程。

准备接收数据块:BlockReceiver.<init>()

BlockReceiver.<init>()

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
      final DataInputStream in,
      final String inAddr, final String myAddr,
      final BlockConstructionStage stage, 
      final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
      final String clientname, final DatanodeInfo srcDataNode,
      final DataNode datanode, DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
    try{
      ...// 检查,设置参数等

      // 打开文件,准备接收数据块
      if (isDatanode) { // 数据块复制和数据块移动是由数据节点发起的,这是在tmp目录下建立block文件
        replicaInfo = datanode.data.createTemporary(storageType, block);
      } else {
        switch (stage) {
        // 对于客户端发起的写数据请求(只考虑create,不考虑append),在rbw目录下建立数据块(block文件、meta文件,数据块处于RBW状态)
        case PIPELINE_SETUP_CREATE:
          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
          datanode.notifyNamenodeReceivingBlock(
              block, replicaInfo.getStorageUuid());
          break;
        ...// 其余case
        default: throw new IOException("Unsupported stage " + stage + 
              " while receiving block " + block + " from " + inAddr);
        }
      }
      ...// 略
      
      // 对于数据块复制、数据块移动、客户端建立数据块,本质上都在建立新的block文件。对于这些状况,isCreate为true
      final boolean isCreate = isDatanode || isTransfer 
          || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
      assert streams != null : "null streams!";

      ...// 计算meta文件的文件头
      // 若是须要建立新的block文件,也就须要同时建立新的meta文件,并写入文件头
      if (isCreate) {
        BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
      } 
    } catch (ReplicaAlreadyExistsException bae) {
      throw bae;
    } catch (ReplicaNotFoundException bne) {
      throw bne;
    } catch(IOException ioe) {
      ...// IOE一般涉及文件等资源,所以要额外清理资源
    }
  }
复制代码

尽管上述代码的注释加了很多,但建立block的场景比较简单,只须要记住在rbw目录下建立block文件和meta文件便可。

在rbw目录下建立数据块后,还要经过DataNode#notifyNamenodeReceivingBlock()向namenode汇报正在接收的数据块。该方法仅仅将数据块放入缓冲区中,由BPServiceActor线程异步汇报。

此处不展开,后面会介绍一个类似的方法DataNode#notifyNamenodeReceivedBlock()。

接收数据块:BlockReceiver#receiveBlock()

BlockReceiver#receiveBlock():

void receiveBlock( DataOutputStream mirrOut, // output to next datanode DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, DatanodeInfo[] downstreams, boolean isReplaceBlock) throws IOException {
    ...// 参数设置

    try {
      // 若是是客户端发起的写请求(此处即为数据块create),则启动PacketResponder发送ack
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // start thread to processes responses
      }

      // 同步接收packet,写block文件和meta文件
      while (receivePacket() >= 0) {}

      // 此时,节点已接收了全部packet,能够等待发送完全部ack后关闭responder
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();
        responderClosed = true;
      }

      ...// 数据块复制相关

    } catch (IOException ioe) {
      if (datanode.isRestarting()) {
        LOG.info("Shutting down for restart (" + block + ").");
      } else {
        LOG.info("Exception for " + block, ioe);
        throw ioe;
      }
    } finally {
      ...// 清理
    }
  }
复制代码

同步接收packet:BlockReceiver#receivePacket()

先看BlockReceiver#receivePacket()。

严格来讲,BlockReceiver#receivePacket()负责接收上游的packet,并继续向下游节点管道写

private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);

    PacketHeader header = packetReceiver.getHeader();
    ...// 略

    ...// 检查packet头

    long offsetInBlock = header.getOffsetInBlock();
    long seqno = header.getSeqno();
    boolean lastPacketInBlock = header.isLastPacketInBlock();
    final int len = header.getDataLen();
    boolean syncBlock = header.getSyncBlock();

    ...// 略
    
    // 若是不须要当即持久化也不须要校验收到的数据,则能够当即委托PacketResponder线程返回 SUCCESS 的ack,而后再进行校验和持久化
    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    ...// 管道写相关
    
    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
    
    if (lastPacketInBlock || len == 0) {    // 收到空packet多是表示心跳或数据块发送
      // 这两种状况均可以尝试把以前的数据刷到磁盘
      if (syncBlock) {
        flushOrSync(true);
      }
    } else {    // 不然,须要持久化packet
      final int checksumLen = diskChecksum.getChecksumSize(len);
      final int checksumReceivedLen = checksumBuf.capacity();

      ...// 若是是管道中的最后一个节点,则持久化以前,要先对收到的packet作一次校验(使用packet自己的校验机制)
      ...// 若是校验错误,则委托PacketResponder线程返回 ERROR_CHECKSUM 的ack

      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
          && streams.isTransientStorage();
      try {
        long onDiskLen = replicaInfo.getBytesOnDisk();
        if (onDiskLen<offsetInBlock) {
          ...// 若是校验块不完整,须要加载并调整旧的meta文件内容,供后续从新计算crc

          // 写block文件
          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
              + dataBuf.arrayOffset() + dataBuf.position();
          int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
          
          // 写meta文件
          final byte[] lastCrc;
          if (shouldNotWriteChecksum) {
            lastCrc = null;
          } else if (partialCrc != null) {  // 若是是校验块不完整(以前收到过一部分)
            ...// 从新计算crc
            ...// 更新lastCrc
            checksumOut.write(buf);
            partialCrc = null;
          } else { // 若是校验块完整
            ...// 更新lastCrc
            checksumOut.write(checksumBuf.array(), offset, checksumLen);
          }

          ...//略
        }
      } catch (IOException iex) {
        datanode.checkDiskErrorAsync();
        throw iex;
      }
    }

    // 相反的,若是须要当即持久化或须要校验收到的数据,则如今已经完成了持久化和校验,能够委托PacketResponder线程返回 SUCCESS 的ack
    // if sync was requested, put in queue for pending acks here
    // (after the fsync finished)
    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    ...// 若是超过了响应时间,还要主动发送一个IN_PROGRESS的ack,防止超时

    ...// 节流器相关
    
    // 当整个数据块都发送完成以前,客户端会可能会发送有数据的packet,也由于维持心跳或表示结束写数据块发送空packet
    // 所以,当标志位lastPacketInBlock为true时,不能返回0,要返回一个负值,以区分未到达最后一个packet以前的状况
    return lastPacketInBlock?-1:len;
  }
  
  ...
  
  private boolean shouldVerifyChecksum() {
    // 对于客户端写,只有管道中的最后一个节点知足`mirrorOut == null`
    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
  }
复制代码

BlockReceiver#shouldVerifyChecksum()主要与管道写有关,本文只有一个datanode,则必定知足mirrorOut == null

上述代码看起来长,主要工做只有四项:

  1. 接收packet
  2. 校验packet
  3. 持久化packet
  4. 委托PacketResponder线程发送ack

BlockReceiver#receivePacket() + PacketResponder线程 + PacketResponder#ackQueue构成一个生产者消费者模型。生产和消费的对象是ack,BlockReceiver#receivePacket()是生产者,PacketResponder线程是消费者。

扫一眼PacketResponder#enqueue():

void enqueue(final long seqno, final boolean lastPacketInBlock, final long offsetInBlock, final Status ackStatus) {
      final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
          System.nanoTime(), ackStatus);
      if(LOG.isDebugEnabled()) {
        LOG.debug(myString + ": enqueue " + p);
      }
      synchronized(ackQueue) {
        if (running) {
          ackQueue.addLast(p);
          ackQueue.notifyAll();
        }
      }
    }
复制代码

ackQueue是一个线程不安全的LinkedList。

关于如何利用线程不安全的容器实现生产者消费者模型可参考Java实现生产者-消费者模型中的实现三。

异步发送ack:PacketResponder线程

与BlockReceiver#receivePacket()相对,PacketResponder线程负责接收下游节点的ack,并继续向上游管道响应

PacketResponder#run():

public void run() {
      boolean lastPacketInBlock = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (isRunning() && !lastPacketInBlock) {
        long totalAckTimeNanos = 0;
        boolean isInterrupted = false;
        try {
          Packet pkt = null;
          long expected = -2;
          PipelineAck ack = new PipelineAck();
          long seqno = PipelineAck.UNKOWN_SEQNO;
          long ackRecvNanoTime = 0;
          try {
            // 若是当前节点不是管道的最后一个节点,且下游节点正常,则从下游读取ack
            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
              ack.readFields(downstreamIn);
              ...// 统计相关
              ...// OOB相关(暂时忽略)
              seqno = ack.getSeqno();
            }
            // 若是从下游节点收到了正常的 ack,或当前节点是管道的最后一个节点,则须要从队列中消费pkt(即BlockReceiver#receivePacket()放入的ack)
            if (seqno != PipelineAck.UNKOWN_SEQNO
                || type == PacketResponderType.LAST_IN_PIPELINE) {
              pkt = waitForAckHead(seqno);
              if (!isRunning()) {
                break;
              }
              // 管道写用seqno控制packet的顺序:当且仅当下游正确接收的序号与当前节点正确处理完的序号相等时,当前节点才认为该序号的packet已正确接收;上游同理
              expected = pkt.seqno;
              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
                  && seqno != expected) {
                throw new IOException(myString + "seqno: expected=" + expected
                    + ", received=" + seqno);
              }
              ...// 统计相关
              lastPacketInBlock = pkt.lastPacketInBlock;
            }
          } catch (InterruptedException ine) {
            ...// 异常处理
          } catch (IOException ioe) {
            ...// 异常处理
          }

          ...// 中断退出

          // 若是是最后一个packet,将block的状态转换为FINALIZED,并关闭BlockReceiver
          if (lastPacketInBlock) {
            finalizeBlock(startTime);
          }

          // 此时,必然知足 ack.seqno == pkt.seqno,构造新的 ack 发送给上游
          sendAckUpstream(ack, expected, totalAckTimeNanos,
              (pkt != null ? pkt.offsetInBlock : 0), 
              (pkt != null ? pkt.ackStatus : Status.SUCCESS));
          // 已经处理完队头元素,出队
          // 只有一种状况下知足pkt == null:PacketResponder#isRunning()返回false,即PacketResponder线程正在关闭。此时不管队列中是否有元素,都不须要出队了
          if (pkt != null) {
            removeAckHead();
          }
        } catch (IOException e) {
          ...// 异常处理
        } catch (Throwable e) {
          ...// 异常处理
        }
      }
      LOG.info(myString + " terminating");
    }
复制代码

总结起来,PacketResponder线程的核心工做以下:

  1. 接收下游节点的ack
  2. 比较ack.seqno与当前队头的pkt.seqno
  3. 若是相等,则向上游发送pkt
  4. 若是是最后一个packet,将block的状态转换为FINALIZED

一不当心把管道响应的逻辑也分析了。。。

扫一眼PacketResponder线程使用的出队和查看对头的方法:

// 查看队头
    Packet waitForAckHead(long seqno) throws InterruptedException {
      synchronized(ackQueue) {
        while (isRunning() && ackQueue.size() == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(myString + ": seqno=" + seqno +
                      " waiting for local datanode to finish write.");
          }
          ackQueue.wait();
        }
        return isRunning() ? ackQueue.getFirst() : null;
      }
    }
    
    ...
    
    // 出队
    private void removeAckHead() {
      synchronized(ackQueue) {
        ackQueue.removeFirst();
        ackQueue.notifyAll();
      }
    }
复制代码

队尾入队,队头出队。

  • 每次查看对头后,若是发现队列非空,则只要不出队,则队列后续状态必定是非空的,且队头元素不变。
  • 查看队头后的第一次出队,弹出的必定是刚才查看队头看到的元素。

须要看下PacketResponder#finalizeBlock():

private void finalizeBlock(long startTime) throws IOException {
      // 关闭BlockReceiver,并清理资源
      BlockReceiver.this.close();
      ...// log
      block.setNumBytes(replicaInfo.getNumBytes());
      // datanode上的数据块关闭委托给FsDatasetImpl#finalizeBlock()
      datanode.data.finalizeBlock(block);
      // namenode上的数据块关闭委托给Datanode#closeBlock()
      datanode.closeBlock(
          block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
      ...// log
    }
复制代码

datanode角度的数据块关闭:FsDatasetImpl#finalizeBlock()

FsDatasetImpl#finalizeBlock():

public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
    if (Thread.interrupted()) {
      // Don't allow data modifications from interrupted threads
      throw new IOException("Cannot finalize block from Interrupted Thread");
    }
    ReplicaInfo replicaInfo = getReplicaInfo(b);
    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
      // this is legal, when recovery happens on a file that has
      // been opened for append but never modified
      return;
    }
    finalizeReplica(b.getBlockPoolId(), replicaInfo);
  }
  
  ...
  
  private synchronized FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException {
    FinalizedReplica newReplicaInfo = null;
    if (replicaInfo.getState() == ReplicaState.RUR &&
       ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() == 
         ReplicaState.FINALIZED) {  // 数据块恢复相关(略)
      newReplicaInfo = (FinalizedReplica)
             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
    } else {
      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
      // 回忆BlockReceiver.<init>()的分析,咱们建立的block处于RBW状态,block文件位于rbw目录(固然,实际上位于哪里也无所谓,缘由见后)
      File f = replicaInfo.getBlockFile();
      if (v == null) {
        throw new IOException("No volume for temporary file " + f + 
            " for block " + replicaInfo);
      }

      // 在卷FsVolumeImpl上进行block文件与meta文件的状态转换
      File dest = v.addFinalizedBlock(
          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
      // 该副本即表明最终的数据块副本,处于FINALIZED状态
      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());

      ...// 略
    }
    volumeMap.add(bpid, newReplicaInfo);

    return newReplicaInfo;
  }
复制代码

FsVolumeImpl#addFinalizedBlock():

File addFinalizedBlock(String bpid, Block b, File f, long bytesReservedForRbw) throws IOException {
    releaseReservedSpace(bytesReservedForRbw);
    return getBlockPoolSlice(bpid).addBlock(b, f);
  }
复制代码

还记得datanode启动过程当中分析的FsVolumeImpl与BlockPoolSlice的关系吗?此处将操做继续委托给BlockPoolSlice#addBlock():

可知,BlockPoolSlice仅管理处于FINALIZED的数据块

File addBlock(Block b, File f) throws IOException {
    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
    if (!blockDir.exists()) {
      if (!blockDir.mkdirs()) {
        throw new IOException("Failed to mkdirs " + blockDir);
      }
    }
    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
    ...// 统计相关
    return blockFile;
  }
复制代码

BlockPoolSlice反向借助FsDatasetImpl提供的静态方法FsDatasetImpl.moveBlockFiles():

static File moveBlockFiles(Block b, File srcfile, File destdir) throws IOException {
    final File dstfile = new File(destdir, b.getBlockName());
    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
    try {
      NativeIO.renameTo(srcmeta, dstmeta);
    } catch (IOException e) {
      throw new IOException("Failed to move meta file for " + b
          + " from " + srcmeta + " to " + dstmeta, e);
    }
    try {
      NativeIO.renameTo(srcfile, dstfile);
    } catch (IOException e) {
      throw new IOException("Failed to move block file for " + b
          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
    }
    ...// 日志
    return dstfile;
  }
复制代码

直接将block文件和meta文件从原目录(rbw目录,对应RBW状态)移动到finalized目录(对应FINALIZED状态)。

至此,datanode上的写数据块已经完成。

不过,namenode上的元信息尚未更新,所以,还要向namenode汇报收到了数据块。

  • 线程安全由FsDatasetImpl#finalizeReplica()保证
  • 整个FsDatasetImpl#finalizeReplica()的流程中,都不关系数据块的原位置,状态转换逻辑自己保证了其正确性。

namenode角度的数据块关闭:Datanode#closeBlock()

Datanode#closeBlock():

void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
    metrics.incrBlocksWritten();
    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
    if(bpos != null) {
      // 向namenode汇报已收到的数据块
      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
    } else {
      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
          + block.getBlockPoolId());
    }
    // 将新数据块添加到blockScanner的扫描范围中(暂不讨论)
    FsVolumeSpi volume = getFSDataset().getVolume(block);
    if (blockScanner != null && !volume.isTransientStorage()) {
      blockScanner.addBlock(block);
    }
  }
复制代码

BPOfferService#notifyNamenodeReceivedBlock():

void notifyNamenodeReceivedBlock( ExtendedBlock block, String delHint, String storageUuid) {
    checkBlock(block);
    // 收到数据块(增长)与删除数据块(减小)是一块儿汇报的,都构造为ReceivedDeletedBlockInfo
    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(),
        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
        delHint);

    // 每一个BPServiceActor都要向本身负责的namenode发送报告
    for (BPServiceActor actor : bpServices) {
      actor.notifyNamenodeBlock(bInfo, storageUuid, true);
    }
  }
复制代码

BPServiceActor#notifyNamenodeBlock():

void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo, String storageUuid, boolean now) {
    synchronized (pendingIncrementalBRperStorage) {
      // 更新pendingIncrementalBRperStorage
      addPendingReplicationBlockInfo(
          bInfo, dn.getFSDataset().getStorage(storageUuid));
      // sendImmediateIBR是一个volatile变量,控制是否当即发送BlockReport(BR)
      sendImmediateIBR = true;
      // 传入的now为true,接下来将唤醒阻塞在pendingIncrementalBRperStorage上的全部线程
      if (now) {
        pendingIncrementalBRperStorage.notifyAll();
      }
    }
  }
复制代码

该方法的核心是pendingIncrementalBRperStorage,它维护了两次汇报之间收到、删除的数据块pendingIncrementalBRperStorage是一个缓冲区,此处将收到的数据块放入缓冲区后即认为通知完成(固然,不必定成功);由其余线程读取缓冲区,异步向namenode汇报

猴子看的源码比较少,但这种缓冲区的设计思想在HDFS和Yarn中很是常见。缓冲区实现了解耦,解耦不只能提升可扩展性,还能在缓冲区两端使用不一样的处理速度、处理规模。如pendingIncrementalBRperStorage,生产者不按期、零散放入的数据块,消费者就能够按期、批量的对数据块进行处理。而保障必定及时性的前提下,批量汇报减轻了RPC的压力。

利用IDE,很容易得知,只有负责向各namenode发送心跳的BPServiceActor线程阻塞在pendingIncrementalBRperStorage上。后文将分析该线程如何进行实际的汇报。

PacketResponder#close()

根据对BlockReceiver#receivePacket()与PacketResponder线程的分析,节点已接收全部packet时,ack可能尚未发送完。

所以,须要调用PacketResponder#close(),等待发送完全部ack后关闭responder:

public void close() {
      synchronized(ackQueue) {
        // ackQueue非空就说明ack尚未发送完成
        while (isRunning() && ackQueue.size() != 0) {
          try {
            ackQueue.wait();
          } catch (InterruptedException e) {
            running = false;
            Thread.currentThread().interrupt();
          }
        }
        if(LOG.isDebugEnabled()) {
          LOG.debug(myString + ": closing");
        }
        // notify阻塞在PacketResponder#waitForAckHead()方法上的PacketResponder线程,使其检测到关闭条件
        running = false;
        ackQueue.notifyAll();
      }

      // ???
      synchronized(this) {
        running = false;
        notifyAll();
      }
    }
复制代码

猴子没明白19-22行的synchronized语句块有什么用,,,求解释。

BPServiceActor线程

根据前文,接下来须要分析BPServiceActor线程如何读取pendingIncrementalBRperStorage缓冲区,进行实际的汇报。

在BPServiceActor#offerService()中调用了pendingIncrementalBRperStorage#wait()。因为涉及阻塞、唤醒等操做,没法按照正常流程分析,这里从线程被唤醒的位置开始分析:

// 若是目前不须要汇报,则wait一段时间
        long waitTime = dnConf.heartBeatInterval - 
        (Time.now() - lastHeartbeat);
        synchronized(pendingIncrementalBRperStorage) {
          if (waitTime > 0 && !sendImmediateIBR) {
            try {
              // BPServiceActor线程今后处醒来,而后退出synchronized块
              pendingIncrementalBRperStorage.wait(waitTime);
            } catch (InterruptedException ie) {
              LOG.warn("BPOfferService for " + this + " interrupted");
            }
          }
        } // synchronized
复制代码

可能有读者阅读过猴子的条件队列大法好:使用wait、notify和notifyAll的正确姿式,认为此处if(){wait}的写法姿式不正确。读者可再复习一下该文的“version2:过早唤醒”部分,结合HDFS的心跳机制,思考一下为何此处的写法没有问题。更甚,此处偏偏应当这么写。

若是目前不须要汇报,则BPServiceActor线程会wait一段时间,正式这段wait的时间,让BPServiceActor#notifyNamenodeBlock()的唤醒产生了意义。

BPServiceActor线程唤醒后,醒来后,继续心跳循环:

while (shouldRun()) {
      try {
        final long startTime = now();
        if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
复制代码

假设还到达心跳发送间隔,则不执行if语句块。

此时,在BPServiceActor#notifyNamenodeBlock()方法中修改的volatile变量sendImmediateIBR就派上了用场:

// 检测到sendImmediateIBR为true,则当即汇报已收到和已删除的数据块
        if (sendImmediateIBR ||
            (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
          // 汇报已收到和已删除的数据块
          reportReceivedDeletedBlocks();
          // 更新lastDeletedReport
          lastDeletedReport = startTime;
        }

        // 再来一次完整的数据块汇报
        List<DatanodeCommand> cmds = blockReport();
        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

        // 处理namenode返回的命令
        DatanodeCommand cmd = cacheReport();
        processCommand(new DatanodeCommand[]{ cmd });
复制代码

有意思的是,这里先单独汇报了一次数据块收到和删除的状况,该RPC不须要等待namenode的返回值;又汇报了一次整体状况,此时须要等待RPC的返回值了。

所以,尽管对于增删数据块采起增量式汇报,但因为增量式汇报后必然跟着一次全量汇报,使得增量汇报的成本仍然很是高。为了提升并发,BPServiceActor#notifyNamenodeBlock修改缓冲区后当即返回,不关心汇报是否成功。也没必要担忧汇报失败的后果:在汇报以前,数据块已经转为FINALIZED状态+持久化到磁盘上+修改了缓冲区,若是汇报失败能够等待重试,若是datanode在发报告前挂了能够等启动后从新汇报,必然能保证一致性。

暂时不关心整体汇报的逻辑,只看单独汇报的BPServiceActor#reportReceivedDeletedBlocks():

private void reportReceivedDeletedBlocks() throws IOException {

    // 构造报告,并重置sendImmediateIBR为false
    ArrayList<StorageReceivedDeletedBlocks> reports =
        new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
    synchronized (pendingIncrementalBRperStorage) {
      for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
           pendingIncrementalBRperStorage.entrySet()) {
        final DatanodeStorage storage = entry.getKey();
        final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();

        if (perStorageMap.getBlockInfoCount() > 0) {
          ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
          reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
        }
      }
      sendImmediateIBR = false;
    }

    // 若是报告为空,就直接返回
    if (reports.size() == 0) {
      return;
    }

    // 不然经过RPC向本身负责的namenode发送报告
    boolean success = false;
    try {
      bpNamenode.blockReceivedAndDeleted(bpRegistration,
          bpos.getBlockPoolId(),
          reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
      success = true;
    } finally {
      // 若是汇报失败,则将增删数据块的信息放回缓冲区,等待从新汇报
      if (!success) {
        synchronized (pendingIncrementalBRperStorage) {
          for (StorageReceivedDeletedBlocks report : reports) {
            PerStoragePendingIncrementalBR perStorageMap =
                pendingIncrementalBRperStorage.get(report.getStorage());
            perStorageMap.putMissingBlockInfos(report.getBlocks());
            sendImmediateIBR = true;
          }
        }
      }
    }
  }
复制代码

有两个注意点:

  • 无论namenode处于active或standy状态,BPServiceActor线程都会汇报(尽管会忽略standby namenode的命令)
  • 最后success为false时,可能namenode已收到汇报,但将信息添加会缓冲区致使重复汇报也没有坏影响,这分为两个方面:
    • 重复汇报已删除的数据块:namenode发现未存储该数据块的信息,则得知其已经删除了,会忽略该信息。
    • 重复汇报已收到的数据块:namenode发现新收到的数据块与已存储数据块的信息彻底一致,也会忽略该信息。

总结

1个客户端+1个datanode构成了最小的管道。本文梳理了在这个最小管道上无异常状况下的写数据块过程,在此之上,再来分析管道写的有异常的难度将大大下降。


本文连接:源码|HDFS之DataNode:写数据块(1)
做者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,可是必须保留本文的署名及连接。

相关文章
相关标签/搜索