Source Code | HDFS DataNode: Writing a Data Block (3)

Source Code | HDFS DataNode: Writing a Data Block (1) and Source Code | HDFS DataNode: Writing a Data Block (2) analyzed the write-block process on a datanode for the no-pipeline, no-error case and the pipeline-write, no-error case respectively. This post analyzes pipeline writes with errors. Assuming a replication factor of 3 (i.e., a block write involves 1 client + 3 datanodes), we discuss how a datanode handles different kinds of errors occurring at different points in time.

Source version: Apache Hadoop 2.6.0

Both the conclusions and the implementation are relatively simple; reading only the Overview is enough.

Before We Start

Overview

A datanode's error handling during the block-write process is fairly simple and generally uses two strategies:

  1. The current node throws an exception and closes the upstream and downstream IO streams, sockets, etc., thereby tearing down the pipeline.
  2. Send an ack carrying the failure information to the upstream node.

Only a few cases use Strategy 2; most cases use Strategy 1 and simply tear down the pipeline; some cases combine the two.
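
As a distilled illustration of the two strategies, here is a minimal sketch in plain Java. This is not the actual Hadoop code: the stream and socket fields merely mirror the names used later in DataXceiver#writeBlock(), the ERROR marker is a placeholder rather than the real wire format, and closeQuietly() stands in for Hadoop's IOUtils.closeStream()/closeSocket():

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

// Distilled sketch of the two error-handling strategies.
class PipelineErrorHandlingSketch {
  private DataOutputStream mirrorOut;   // to the downstream node
  private DataInputStream mirrorIn;     // from the downstream node
  private Socket mirrorSock;            // socket to the downstream node
  private DataOutputStream replyOut;    // to the upstream node

  void writeBlockSkeleton() throws IOException {
    try {
      // ... receive from upstream, mirror to downstream ...
    } catch (IOException e) {
      // Strategy 2 (used in a few places): tell the upstream node about the failure
      // before giving up. In Hadoop this is an ack (BlockOpResponseProto) with status
      // ERROR; here it is just a hypothetical placeholder write.
      replyOut.writeInt(-1);
      replyOut.flush();
      throw e;               // rethrow so the outer cleanup still runs
    } finally {
      // Strategy 1 (used almost everywhere): close everything unconditionally, which
      // tears down the pipeline; peers notice via socket errors or timeouts.
      closeQuietly(mirrorOut);
      closeQuietly(mirrorIn);
      closeQuietly(replyOut);
      closeQuietly(mirrorSock);
    }
  }

  // Null-safe, exception-swallowing close, in the spirit of Hadoop's IOUtils.closeStream().
  private static void closeQuietly(Closeable c) {
    if (c == null) return;
    try { c.close(); } catch (IOException ignored) { }
  }
}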

Although the error-handling strategies are simple, quite a lot of code is involved. The overall structure follows the DataXceiver#writeBlock() main flow from Writing a Data Block (1), partially merged with the pipeline-write content from Writing a Data Block (2). This post starts from the main flow in DataXceiver#writeBlock() and occasionally touches on the methods that wrap DataXceiver#writeBlock().

What deserves more attention is the failure-recovery flow for block writes. That work is driven by the client, and I will discuss it in the analysis of the client.

Organization of This Post

  1. If the analysis involves only a single branch, it stays within one section.
  2. If it involves multiple branches, it is split into multiple subsections at the next level, one branch per subsection.
  3. Multi-threaded analysis is organized the same way as multiple branches.
  4. The organization of each branch and thread follows rules 1-3.

Main flow: DataXceiver#writeBlock()

DataXceiver#writeBlock():

  public void writeBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
    ...// Checks, parameter setup, etc.

    ...// Build the output stream for replying to the upstream node or client (here, the client)

    ...// Omitted
    
    try {
      if (isDatanode || 
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // Create the BlockReceiver, preparing to receive the block
        blockReceiver = new BlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist);

        storageUuid = blockReceiver.getStorageUuid();
      } else {
        ...// Pipeline error recovery
      }

      // Handle downstream nodes: acting as the "client", the current node continues setting up the downstream pipeline
      if (targets.length > 0) {
        // Connect to the downstream node
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to datanode " + mirrorNode);
        }
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        // Try to set up the pipeline (expanded below)
        try {
          // Set the socket connect timeout, packet write timeout, send buffer size, etc.
          int timeoutValue = dnConf.socketTimeout
              + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout + 
                      (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
          
          // Set up mirrorOut (current node -> downstream) and mirrorIn (downstream -> current node), etc.
          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // Send the pipeline-setup request to the downstream node; mirrorOut will later be reused as the output stream for writing packets
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
          mirrorOut.flush();

          // If the block write was initiated by a client (true here), a pipeline exists and we must read the pipeline-setup ack from the downstream node
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            // Take the downstream node's setup result as the result for the whole pipeline (either every node from tail to head succeeded, or they all failed)
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

        } catch (IOException e) {
          // If the block write was initiated by a client (true here), immediately send an ack with status ERROR upstream (although there is no guarantee the upstream receives it)
          if (isClient) {
            BlockOpResponseProto.newBuilder()
              .setStatus(ERROR)
               // NB: Unconditionally using the xfer addr w/o hostname
              .setFirstBadLink(targets[0].getXferAddr())
              .build()
              .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          // Close the downstream IO streams and socket
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          // If the block write was initiated by a client (true here), rethrow the exception,
          // which then jumps to the outer catch block
          if (isClient) {
            LOG.error(datanode + ":Exception transfering block " +
                      block + " to mirror " + mirrorNode + ": " + e);
            throw e;
          } else {
            LOG.info(datanode + ":Exception transfering " +
                     block + " to mirror " + mirrorNode +
                     "- continuing without the mirror", e);
          }
        }
      }
      
      // The first packet sent is empty and only used to set up the pipeline. An ack is returned here right away to indicate whether the pipeline was set up successfully
      // Because this datanode has no downstream node, reaching this point means the pipeline has been set up successfully
      if (isClient && !isTransfer) {
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // Receive the block (also responsible for forwarding to downstream, though here there is no downstream node)
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        ...// Block replication
      }

      ...// Block recovery
      
      ...// Block replication
      
    } catch (IOException ioe) {
      // If an IOE is caught, rethrow it directly
      LOG.info("opWriteBlock " + block + " received exception " + ioe);
      throw ioe;
    } finally {
      // Whether we finished normally or abnormally, close the IO streams and socket directly
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
      IOUtils.closeStream(blockReceiver);
      blockReceiver = null;
    }

    ...// Update metrics
  }

The final finally block is crucial to error handling:

We won't dwell on the normal case. In the error case, it closes all downstream IO streams (mirrorOut, mirrorIn) and the downstream socket (mirrorSock), closes the output stream to the upstream node (replyOut), and closes most of the resources wrapped inside blockReceiver (via BlockReceiver#close()); the remaining resources, such as the input stream from upstream (in), are closed by the finally block in the outer DataXceiver#run().

replyOut is just a filter stream; the underlying output stream it wraps can also be closed by the finally block in DataXceiver#run(). For brevity, this post does not expand on that.

Keep the role of this finally block in mind: the same code will come up repeatedly below, and it constitutes Strategy 1 from the Overview.

Below, we take three key sub-processes as examples and analyze the error handling inside each of them and how it interacts with the outer error-handling logic.

Local preparation: BlockReceiver.<init>()

As analyzed earlier, the main work of BlockReceiver.<init>() is fairly simple: create the block file and meta file under the rbw directory:

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
      final DataInputStream in,
      final String inAddr, final String myAddr,
      final BlockConstructionStage stage, 
      final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
      final String clientname, final DatanodeInfo srcDataNode,
      final DataNode datanode, DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
    try{
      ...// Checks, parameter setup, etc.

      // Open files, preparing to receive the block
      if (isDatanode) { // Block replication and block moves are initiated by a datanode; in that case the block file is created under the tmp directory
        replicaInfo = datanode.data.createTemporary(storageType, block);
      } else {
        switch (stage) {
        // For a client-initiated write request (considering only create, not append), create the replica under the rbw directory (block file and meta file; the replica is in the RBW state)
        case PIPELINE_SETUP_CREATE:
          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
          datanode.notifyNamenodeReceivingBlock(
              block, replicaInfo.getStorageUuid());
          break;
        ...// Other cases
        default: throw new IOException("Unsupported stage " + stage + 
              " while receiving block " + block + " from " + inAddr);
        }
      }
      ...// Omitted
      
      // Block replication, block moves, and client-initiated block creation all essentially create a new block file. In these cases isCreate is true
      final boolean isCreate = isDatanode || isTransfer 
          || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
      assert streams != null : "null streams!";

      ...// Compute the meta file header
      // If a new block file needs to be created, a new meta file must be created as well, and its header written out
      if (isCreate) {
        BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
      } 
    } catch (ReplicaAlreadyExistsException bae) {
      throw bae;
    } catch (ReplicaNotFoundException bne) {
      throw bne;
    } catch(IOException ioe) {
      // An IOE usually involves resources such as files, so extra cleanup is required
      IOUtils.closeStream(this);
      cleanupBlock();
      
      // check if there is a disk error
      IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
          cause);
      
      if (cause != null) { // possible disk error
        ioe = cause;
        datanode.checkDiskErrorAsync();
      }
      
      // Rethrow the IOE
      throw ioe;
    }
  }

A special note on DataNode#checkDiskErrorAsync(): this method asynchronously checks for disk errors and shuts down the datanode if the number of failed disks exceeds a threshold. I haven't fully understood how that threshold is computed yet; it seems my understanding of DataStorage is off.

The work done by BlockReceiver#close() has already been covered. What deserves attention is the handling of ReplicaAlreadyExistsException and the other IOExceptions: they are rethrown.

ReplicaAlreadyExistsException is a subclass of IOException and is thrown by FsDatasetImpl#createRbw().

As for the cases that throw a plain IOException, there are far too many to list: missing permissions, disk errors, and all sorts of other causes.

What happens when these exceptions are rethrown? The catch and finally blocks in the outer DataXceiver#writeBlock() are triggered.

Since the downstream pipeline has not been set up yet at this point, let's first look at the consequences for the upstream node when the finally block runs because of an exception:

  • After the DataXceiver thread starts, DataXceiver#peer wraps the output stream from the current node to the upstream node (out) and the input stream from the upstream node to the current node (in).
  • These IO streams are backed by sockets. After the socket on the current node's side is closed, the socket on the upstream node's side will also be closed after a while due to a timeout, throwing a SocketException (a subclass of IOException).
  • Having caught the IOException caused by the closed socket, the upstream node also runs its finally block, repeating the same flow as the current node.

In this way, the pipeline is torn down node by node going upstream, until the client handles the pipeline-closed exception.
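
The timeout behavior can be reproduced with a minimal, self-contained sketch in plain java.net, unrelated to the Hadoop code; the 1-second SO_TIMEOUT here merely plays the role of dnConf.socketTimeout. One side goes silent, and the reading side's blocked read fails with a SocketTimeoutException, i.e., an IOException, which is exactly what pushes each upstream node into its own catch/finally cleanup:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Minimal demo: a blocked read fails with SocketTimeoutException (an IOException)
// once SO_TIMEOUT expires, which is how a silent or closed peer eventually surfaces
// as an IOException on this side.
public class SoTimeoutDemo {
  public static void main(String[] args) throws Exception {
    try (ServerSocket server = new ServerSocket()) {
      server.bind(new InetSocketAddress("127.0.0.1", 0));

      // "Peer" side: connects and then stays silent (it could also just close).
      Thread silentPeer = new Thread(() -> {
        try (Socket s = new Socket("127.0.0.1", server.getLocalPort())) {
          Thread.sleep(5_000); // never writes anything
        } catch (Exception ignored) { }
      });
      silentPeer.start();

      try (Socket accepted = server.accept()) {
        accepted.setSoTimeout(1_000); // comparable to a datanode's socket read timeout
        try {
          accepted.getInputStream().read(); // blocks until the timeout fires
        } catch (SocketTimeoutException e) {
          // This IOException is what drives the node into its catch/finally cleanup.
          System.out.println("read timed out: " + e.getMessage());
        }
      }
      silentPeer.join();
    }
  }
}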

If an exception is thrown while creating the block file or meta file, there is currently no mechanism to promptly clean up the resulting "orphaned" replica under the rbw directory. Readers can try debugging BlockReceiver.<init>(): after the replica is created under the rbw directory, keep the thread suspended for a long time; the pipeline eventually times out and closes, but the files under the rbw directory still exist.

However, the block recovery process can complete this cleanup; we won't expand on it here.

Setting up the pipeline: the if (targets.length > 0) { block

If local preparation completes without an exception, pipeline setup begins:

// Handle downstream nodes: acting as the "client", the current node continues setting up the downstream pipeline
      if (targets.length > 0) {
        // Connect to the downstream node
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to datanode " + mirrorNode);
        }
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        // Try to set up the pipeline (expanded below)
        try {
          // Set the socket connect timeout, packet write timeout, send buffer size, etc.
          int timeoutValue = dnConf.socketTimeout
              + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout + 
                      (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
          
          // Set up mirrorOut (current node -> downstream) and mirrorIn (downstream -> current node), etc.
          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // Send the pipeline-setup request to the downstream node; mirrorOut will later be reused as the output stream for writing packets
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
          mirrorOut.flush();

          // If the block write was initiated by a client (true here), a pipeline exists and we must read the pipeline-setup ack from the downstream node
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            // Take the downstream node's setup result as the result for the whole pipeline (either every node from tail to head succeeded, or they all failed)
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

        } catch (IOException e) {
          // If the block write was initiated by a client (true here), immediately send an ack with status ERROR upstream (although there is no guarantee the upstream receives it)
          if (isClient) {
            BlockOpResponseProto.newBuilder()
              .setStatus(ERROR)
               // NB: Unconditionally using the xfer addr w/o hostname
              .setFirstBadLink(targets[0].getXferAddr())
              .build()
              .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          // Close the downstream IO streams and socket
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          // If the block write was initiated by a client (true here), rethrow the exception,
          // which then jumps to the outer catch block
          if (isClient) {
            LOG.error(datanode + ":Exception transfering block " +
                      block + " to mirror " + mirrorNode + ": " + e);
            throw e;
          } else {
            LOG.info(datanode + ":Exception transfering " +
                     block + " to mirror " + mirrorNode +
                     "- continuing without the mirror", e);
          }
        }
      }

As analyzed earlier for the pipeline-setup process, this is where some of the IO streams and the socket to the downstream node are created.

Failures can occur both while creating these resources and while sending the pipeline-setup request, throwing IOException or one of its subclasses. The catch block handles these IOExceptions with Strategy 2: first send an ack upstream reporting ERROR, then close the IO streams to the downstream node (mirrorOut, mirrorIn) and the downstream socket (mirrorSock). Finally, rethrow the exception to trigger the outer finally block.

The cleanup performed here is a subset of the outer finally block; the key difference is the extra ack that gets sent. How that ack is handled is left to the analysis of the PacketResponder thread.

However, at this point the downstream pipeline setup has already begun, so let's also look at the consequences for the downstream node when the catch block runs because of an exception (see above for the analysis of the outer finally block):

  • After mirrorOut, mirrorIn, and mirrorSock are initialized, the downstream node has also set up the matching IO streams, socket, etc. via DataXceiverServer.
  • These IO streams are backed by sockets. After the socket on the current node's side is closed, the socket on the downstream node's side will also be closed after a while due to a timeout, throwing a SocketException (a subclass of IOException).
  • Having caught the IOException caused by the closed socket, the downstream node also runs this catch block or the outer finally block, repeating the same flow as the current node.

In this way, the pipeline is torn down node by node going downstream until the client handles the pipeline-closed exception. Meanwhile, since the outer finally block will eventually run as well, the pipeline is also torn down node by node going upstream.

The IO streams mirrorOut and mirrorIn actually share the TCP socket mirrorSock; the same holds for in and out. However, when closing the IO streams, resources such as buffers must be cleaned up in addition to the underlying socket, so it is reasonable to close them separately.
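
A small sketch of that relationship, assuming nothing beyond java.io/java.net (the class and field layout is made up for illustration): both directions are views over the same Socket, but the output side adds a BufferedOutputStream whose buffer is state held outside the socket, which is why the streams and the socket are closed separately.

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

// Both mirrorOut and mirrorIn are views over the same TCP connection (mirrorSock),
// but the output side carries its own buffer, so the stream wrappers hold state
// beyond the socket itself.
final class MirrorStreams {
  final Socket mirrorSock;
  final DataOutputStream mirrorOut; // buffered writer over mirrorSock
  final DataInputStream mirrorIn;   // reader over the same mirrorSock

  MirrorStreams(Socket mirrorSock) throws IOException {
    this.mirrorSock = mirrorSock;
    this.mirrorOut = new DataOutputStream(
        new BufferedOutputStream(mirrorSock.getOutputStream(), 8 * 1024));
    this.mirrorIn = new DataInputStream(mirrorSock.getInputStream());
  }

  // Closing the wrappers flushes and releases their buffers; closing the socket alone
  // would drop whatever is still sitting in the BufferedOutputStream.
  void closeAll() {
    try { mirrorOut.close(); } catch (IOException ignored) { }
    try { mirrorIn.close(); }  catch (IOException ignored) { }
    try { mirrorSock.close(); } catch (IOException ignored) { }
  }
}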

Pipeline write: BlockReceiver#receiveBlock()

As analyzed earlier, once the pipeline has been set up successfully, BlockReceiver#receiveBlock() starts receiving packets and responding with acks:

void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,  // output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      DatanodeInfo[] downstreams,
      boolean isReplaceBlock) throws IOException {
    ...// Parameter setup

    try {
      // If this is a client-initiated write (here, a block create), start the PacketResponder to send acks
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // start thread to processes responses
      }

      // Synchronously receive packets, writing the block file and meta file
      while (receivePacket() >= 0) {}

      // At this point the node has received all packets; wait until all acks have been sent, then close the responder
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();
        responderClosed = true;
      }

      ...// Block replication

    } catch (IOException ioe) {
      if (datanode.isRestarting()) {
        LOG.info("Shutting down for restart (" + block + ").");
      } else {
        LOG.info("Exception for " + block, ioe);
        throw ioe;
      }
    } finally {
      ...// Cleanup
    }
  }

As before, we discuss this in two parts: receiving packets and responding with acks.

Synchronously receiving packets: BlockReceiver#receivePacket()

As analyzed earlier, BlockReceiver#receivePacket() is responsible for receiving packets from upstream and continuing the pipeline write to the downstream node:

private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);

    PacketHeader header = packetReceiver.getHeader();
    ...// Omitted

    // Validate the packet header
    if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
      throw new IOException("Received an out-of-sequence packet for " + block + 
          "from " + inAddr + " at offset " + header.getOffsetInBlock() +
          ". Expecting packet starting at " + replicaInfo.getNumBytes());
    }
    if (header.getDataLen() < 0) {
      throw new IOException("Got wrong length during writeBlock(" + block + 
                            ") from " + inAddr + " at offset " + 
                            header.getOffsetInBlock() + ": " +
                            header.getDataLen()); 
    }

    long offsetInBlock = header.getOffsetInBlock();
    long seqno = header.getSeqno();
    boolean lastPacketInBlock = header.isLastPacketInBlock();
    final int len = header.getDataLen();
    boolean syncBlock = header.getSyncBlock();

    ...// Omitted
    
    // If the data neither needs to be persisted immediately nor verified, we can immediately delegate to the PacketResponder thread to return a SUCCESS ack, and then verify and persist afterwards
    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    // Pipeline write: mirror the packet received on in into mirrorOut
    if (mirrorOut != null && !mirrorError) {
      try {
        long begin = Time.monotonicNow();
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }
      } catch (IOException e) {
        // Assuming no interrupt occurred, this merely sets mirrorError = true
        handleMirrorOutError(e);
      }
    }
    
    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
    
    if (lastPacketInBlock || len == 0) {    // An empty packet may indicate a heartbeat or the end of the block
      // In both cases we can try to flush the previously received data to disk
      if (syncBlock) {
        flushOrSync(true);
      }
    } else {    // Otherwise, the packet needs to be persisted
      final int checksumLen = diskChecksum.getChecksumSize(len);
      final int checksumReceivedLen = checksumBuf.capacity();

      // The packet header is invalid; throw an IOE directly
      if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
        throw new IOException("Invalid checksum length: received length is "
            + checksumReceivedLen + " but expected length is " + checksumLen);
      }

      // If this is the last node in the pipeline, verify the received packet (using the packet's own checksum mechanism) before persisting it
      if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
        try {
          // If verification fails, an IOE is thrown
          verifyChunks(dataBuf, checksumBuf);
        } catch (IOException ioe) {
          // On a checksum error, delegate to the PacketResponder thread to return an ERROR_CHECKSUM ack
          if (responder != null) {
            try {
              ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                  lastPacketInBlock, offsetInBlock,
                  Status.ERROR_CHECKSUM);
              // Wait 3s, hoping the PacketResponder thread manages to send all pending acks (so that fewer packets need to be resent)
              Thread.sleep(3000);
            } catch (InterruptedException e) {
              // Do nothing and do not clear the interrupt flag; just stop sleeping
            }
          }
          // On a checksum error, assume the packet received by the upstream node is also corrupt, and throw an IOE directly
          throw new IOException("Terminating due to a checksum error." + ioe);
        }
 
        ...// Checksum translation
      }

      if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
        // checksum is missing, need to calculate it
        checksumBuf = ByteBuffer.allocate(checksumLen);
        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
      }

      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
          && streams.isTransientStorage();
      try {
        long onDiskLen = replicaInfo.getBytesOnDisk();
        if (onDiskLen<offsetInBlock) {
          ...// If the checksum chunk is partial, load and adjust the old meta file content so the crc can be recomputed later

          // Write the block file
          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
              + dataBuf.arrayOffset() + dataBuf.position();
          int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
          
          // Write the meta file
          final byte[] lastCrc;
          if (shouldNotWriteChecksum) {
            lastCrc = null;
          } else if (partialCrc != null) {  // The checksum chunk is partial (part of it was received earlier)
            ...// Recompute the crc
            ...// Update lastCrc
            checksumOut.write(buf);
            partialCrc = null;
          } else { // The checksum chunk is complete
            ...// Update lastCrc
            checksumOut.write(checksumBuf.array(), offset, checksumLen);
          }

          ...// Omitted
        }
      } catch (IOException iex) {
        // Asynchronously check the disks
        datanode.checkDiskErrorAsync();
        // Rethrow the IOE
        throw iex;
      }
    }

    // Conversely, if the data needed to be persisted immediately or verified, both have been completed by now, so we can delegate to the PacketResponder thread to return a SUCCESS ack
    // if sync was requested, put in queue for pending acks here
    // (after the fsync finished)
    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    ...// If the response time has been exceeded, proactively send an IN_PROGRESS ack to prevent a timeout

    ...// Throttler
    
    // Before the whole block has finished, the client may send packets carrying data, and may also send empty packets to keep the heartbeat alive or to mark the end of the block write
    // Therefore, when the lastPacketInBlock flag is true we must not return 0 but a negative value, to distinguish this from the cases before the last packet arrives
    return lastPacketInBlock?-1:len;
  }
  
  ...
  
  private boolean shouldVerifyChecksum() {
    // For a client write, only the last node in the pipeline satisfies `mirrorOut == null`
    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
  }
  
  ...
  
  private void handleMirrorOutError(IOException ioe) throws IOException {
    String bpid = block.getBlockPoolId();
    LOG.info(datanode.getDNRegistrationForBP(bpid)
        + ":Exception writing " + block + " to mirror " + mirrorAddr, ioe);
    if (Thread.interrupted()) { // If the BlockReceiver thread was interrupted, rethrow the IOE
      throw ioe;
    } else {    // Otherwise, merely mark the downstream node as failed and let the outer logic handle it
      mirrorError = true;
    }
  }

The analysis of the pipeline write splits into two cases: the tail node and an intermediate node.

  • For the tail node, the received packet is verified before it is persisted (using the packet's own checksum mechanism). If verification fails, the PacketResponder thread is asked to send an ack with status ERROR_CHECKSUM, and the IOE is rethrown.
  • For an intermediate node, the packet only needs to be mirrored downstream. Assuming the exception occurs without an interrupt, mirrorError is merely set to true. This has two effects:
    1. Subsequent packets are no longer written to the downstream node; eventually the socket times out and closes, and the pipeline is torn down node by node both upstream and downstream.
    2. The upstream node learns of the downstream failure through the ack (see below).

Tail-node errors are still handled with Strategy 1; intermediate nodes use Strategy 1 and Strategy 2 together.
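
The interplay between the two threads on an intermediate node can be reduced to the following sketch: a simplification in plain Java with a shared volatile flag, where the queue handling, seqno checks, and the real ack format are all omitted and the method names are illustrative only.

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of an intermediate node: the receiver thread mirrors packets
// downstream and only sets a flag on failure; the responder thread notices the
// flag and reports the error upstream in its acks instead of reading downstream.
class IntermediateNodeSketch {
  private volatile boolean mirrorError = false;
  private final BlockingQueue<Long> ackQueue = new LinkedBlockingQueue<>(); // seqnos awaiting acks

  // Runs in the BlockReceiver role.
  void receivePacket(long seqno, byte[] packet) {
    ackQueue.add(seqno);
    if (!mirrorError) {
      try {
        writeToMirror(packet);          // may fail if the downstream node is gone
      } catch (IOException e) {
        mirrorError = true;             // intermediate-node strategy: just mark the error
      }
    }
    persistLocally(packet);             // the local write still proceeds
  }

  // Runs in the PacketResponder role.
  void respondOnce() throws InterruptedException {
    long seqno = ackQueue.take();
    if (mirrorError) {
      sendAckUpstream(seqno, "ERROR");  // upstream (and eventually the client) learns of the failure
    } else {
      String downstreamStatus = readAckFromMirror(seqno); // skipped entirely once mirrorError is set
      sendAckUpstream(seqno, downstreamStatus);
    }
  }

  // Placeholders standing in for the real IO.
  private void writeToMirror(byte[] p) throws IOException { }
  private void persistLocally(byte[] p) { }
  private String readAckFromMirror(long seqno) { return "SUCCESS"; }
  private void sendAckUpstream(long seqno, String status) { }
}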

Asynchronously sending acks: the PacketResponder thread

As analyzed earlier, the PacketResponder thread is responsible for receiving acks from the downstream node and continuing to respond upstream along the pipeline:

public void run() {
      boolean lastPacketInBlock = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (isRunning() && !lastPacketInBlock) {
        long totalAckTimeNanos = 0;
        boolean isInterrupted = false;
        try {
          Packet pkt = null;
          long expected = -2;
          PipelineAck ack = new PipelineAck();
          long seqno = PipelineAck.UNKOWN_SEQNO;
          long ackRecvNanoTime = 0;
          try {
            // If the current node is not the last node in the pipeline and the downstream node is healthy, read an ack from downstream
            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
              ack.readFields(downstreamIn);
              ...// Metrics
              // If the downstream status is OOB, forward the OOB upstream
              Status oobStatus = ack.getOOBStatus();
              if (oobStatus != null) {
                LOG.info("Relaying an out of band ack of type " + oobStatus);
                sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
                    Status.SUCCESS);
                continue;
              }
              seqno = ack.getSeqno();
            }
            // If a normal ack was received from downstream, or the current node is the last node in the pipeline, consume a pkt from the queue (i.e., the ack enqueued by BlockReceiver#receivePacket())
            if (seqno != PipelineAck.UNKOWN_SEQNO
                || type == PacketResponderType.LAST_IN_PIPELINE) {
              pkt = waitForAckHead(seqno);
              if (!isRunning()) {
                break;
              }
              // The pipeline write uses seqno to order packets: only when the sequence number correctly received downstream equals the sequence number the current node has finished processing does the current node consider that packet correctly received; the same holds for the upstream node
              expected = pkt.seqno;
              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
                  && seqno != expected) {
                throw new IOException(myString + "seqno: expected=" + expected
                    + ", received=" + seqno);
              }
              ...// Metrics
              lastPacketInBlock = pkt.lastPacketInBlock;
            }
          } catch (InterruptedException ine) {
            // Record the flag marking this InterruptedException
            isInterrupted = true;
          } catch (IOException ioe) {
            ...// Error handling
            if (Thread.interrupted()) { // If an interrupt occurred (distinct from the local variable isInterrupted), record the interrupt flag
              isInterrupted = true;
            } else {
              // Marking mirrorError = true for all exceptions here is not entirely reasonable, but the impact is small
              mirrorError = true;
              LOG.info(myString, ioe);
            }
          }

          // Exit on interrupt
          if (Thread.interrupted() || isInterrupted) {
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue;
          }

          // If this is the last packet, transition the block to the FINALIZED state and close the BlockReceiver
          if (lastPacketInBlock) {
            finalizeBlock(startTime);
          }

          // At this point ack.seqno == pkt.seqno is guaranteed; build a new ack and send it upstream
          sendAckUpstream(ack, expected, totalAckTimeNanos,
              (pkt != null ? pkt.offsetInBlock : 0), 
              (pkt != null ? pkt.ackStatus : Status.SUCCESS));
          // The head of the queue has been processed; dequeue it
          // pkt == null holds in only one case: PacketResponder#isRunning() returned false, i.e., the PacketResponder thread is shutting down. In that case, whether or not the queue has elements, there is no need to dequeue
          if (pkt != null) {
            removeAckHead();
          }
        } catch (IOException e) {
          // On an IOE, if it was not caused by an interrupt, interrupt the thread
          LOG.warn("IOException in BlockReceiver.run(): ", e);
          if (running) {
            datanode.checkDiskErrorAsync();
            LOG.info(myString, e);
            running = false;
            if (!Thread.interrupted()) { // failure not caused by interruption
              receiverThread.interrupt();
            }
          }
        } catch (Throwable e) {
          // For any other Throwable, interrupt directly
          if (running) {
            LOG.info(myString, e);
            running = false;
            receiverThread.interrupt();
          }
        }
      }
      LOG.info(myString + " terminating");
    }
    
    ...
    
    // PacketResponder#sendAckUpstream() wraps PacketResponder#sendAckUpstreamUnprotected()
    private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
        throws IOException {
      Status[] replies = null;
      if (ack == null) { // When sending an OOB ack, ack is required to be null and myStatus to be the OOB status. What a lousy design...
        replies = new Status[1];
        replies[0] = myStatus;
      } else if (mirrorError) { // The mirrorError flag set to true earlier comes into play here
        replies = MIRROR_ERROR_STATUS;
      } else {  // Otherwise, build replies normally
        short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
            .getNumOfReplies();
        replies = new Status[1 + ackLen];
        replies[0] = myStatus;
        for (int i = 0; i < ackLen; i++) {
          replies[i + 1] = ack.getReply(i);
        }
        // If a downstream node reported ERROR_CHECKSUM, throw an IOE to stop the current node's PacketResponder thread (combined with the code below, this guarantees that starting from the first ERROR_CHECKSUM node, every node upstream ends up in the ERROR_CHECKSUM state)
        if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
          throw new IOException("Shutting down writer and responder "
              + "since the down streams reported the data sent by this "
              + "thread is corrupt");
        }
      }
      
      // Build the replyAck and send it upstream
      PipelineAck replyAck = new PipelineAck(seqno, replies,
          totalAckTimeNanos);
      if (replyAck.isSuccess()
          && offsetInBlock > replicaInfo.getBytesAcked()) {
        replicaInfo.setBytesAcked(offsetInBlock);
      }
      long begin = Time.monotonicNow();
      replyAck.write(upstreamOut);
      upstreamOut.flush();
      long duration = Time.monotonicNow() - begin;
      if (duration > datanodeSlowLogThresholdMs) {
        LOG.warn("Slow PacketResponder send ack to upstream took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
            + ", replyAck=" + replyAck);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(myString + ", replyAck=" + replyAck);
      }

      // If the current node's status is ERROR_CHECKSUM, throw an IOE after sending the ack
      if (myStatus == Status.ERROR_CHECKSUM) {
        throw new IOException("Shutting down writer and responder "
            + "due to a checksum error in received data. The error "
            + "response has been sent upstream.");
      }
    }

For OOB, PipelineAck#getOOBStatus() also deserves attention:

public Status getOOBStatus() {
    // If seqno is not UNKOWN_SEQNO, this definitely is not an OOB status
    if (getSeqno() != UNKOWN_SEQNO) {
      return null;
    }
    // If any downstream node is OOB, the downstream pipeline is considered to be in the OOB state (of course, this mechanism guarantees that starting from the first OOB node, every node that inspects the ack can see that some downstream node is OOB)
    for (Status reply : proto.getStatusList()) {
      // The following check is valid because protobuf guarantees to
      // preserve the ordering of enum elements.
      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
        return reply;
      }
    }
    return null;
  }

Compared with the earlier branches, the PacketResponder thread makes heavy use of interrupts instead of thrown exceptions to terminate the thread. Beyond that, the handling of the OOB and ERROR_CHECKSUM statuses is somewhat special:

  • OOB status: the status of the first OOB node is propagated to the client. OOB is caused by a datanode restart, so after sending the OOB ack, the first OOB node sends no further acks; eventually this triggers socket timeouts and the closure of the whole pipeline.
  • ERROR_CHECKSUM status: only the tail node can emit an ack with status ERROR_CHECKSUM, and after sending it, it throws an IOE to proactively shut down its PacketResponder thread. When the upstream node receives the ERROR_CHECKSUM ack, it also throws an IOE and shuts down its PacketResponder thread, but sends no further acks; if there is yet another upstream node, that node's socket will time out and close because no acks arrive for a long time. Eventually the whole pipeline is closed.

Note that OOB can usually be guaranteed to reach the client, but the ERROR_CHECKSUM sent by the tail node is not guaranteed to be noticed by the upstream node (sending the ack before throwing the IOE is only a best effort, though it usually works). With more than two replicas, it will definitely not be seen by the client, because the node that receives the ERROR_CHECKSUM ack shuts down its PacketResponder without forwarding an ack.

I haven't figured out why interrupts are used here to terminate the thread.

Summary

Although the Overview listed two strategies, we can see that the primary means of error handling is still Strategy 1: throw an exception and close the sockets, which then tears down the pipeline node by node.

Once the pipeline is closed, the client decides what to do next, such as block recovery.


Link to this post: Source Code | HDFS DataNode: Writing a Data Block (3)
Author: 猴子007
Source: monkeysayhi.github.io
This post is published under the Creative Commons Attribution-ShareAlike 4.0 International license. Reposting, adapting, and commercial use are welcome, provided the attribution and link to this post are retained.
