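One of the jobs of the EditLogTailer on the standby NameNode (sbn) is to trigger rollEditLog on the active NameNode (nn). Every ${dfs.ha.tail-edits.period} seconds (default 60), the EditLogTailer checks whether the time elapsed since the last roll exceeds ${dfs.ha.log-roll.period} seconds (default 120); if it does, it sends an RPC request asking the nn to run rollEditLog. The nn receives the request in NameNodeRpcServer, and FSNamesystem then handles it.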
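The timing check described above can be sketched roughly as follows. This is a simplified model, not the actual EditLogTailer code: the class RollTriggerSketch and the method shouldTriggerRoll are hypothetical names, and the RPC to the nn is replaced by a print statement.

```java
// Simplified model of the roll-trigger check; NOT the real EditLogTailer.
class RollTriggerSketch {

    /** Returns true when more than logRollPeriodMs has elapsed since the last roll. */
    static boolean shouldTriggerRoll(long nowMs, long lastRollMs, long logRollPeriodMs) {
        return (nowMs - lastRollMs) > logRollPeriodMs;
    }

    public static void main(String[] args) {
        long tailPeriodMs = 60_000L;     // dfs.ha.tail-edits.period default (60 s)
        long logRollPeriodMs = 120_000L; // dfs.ha.log-roll.period default (120 s)
        long lastRollMs = 0L;

        // Simulate four wake-ups of the tailer thread, one per tail period.
        for (long nowMs = tailPeriodMs; nowMs <= 4 * tailPeriodMs; nowMs += tailPeriodMs) {
            if (shouldTriggerRoll(nowMs, lastRollMs, logRollPeriodMs)) {
                System.out.println(nowMs / 1000 + "s: ask the nn to rollEditLog");
                lastRollMs = nowMs; // assumption: the request succeeded
            } else {
                System.out.println(nowMs / 1000 + "s: no roll needed");
            }
        }
    }
}
```

Because the check only runs once per tail period, the effective interval between rolls can be longer than the configured roll period.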
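rollEditLog periodically closes the current transaction log and starts a new one, so the log ends up as a series of separate small files. Over time this inevitably leaves a large number of files on the nn and on all JournalNodes (jn). Once a checkpoint has been taken, most of these files are no longer needed, so a cleanup policy is required.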
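After the sbn completes a checkpoint (see the checkpoint process), cleanup is triggered on the sbn, the nn, and the jn for the historical metadata files kept on disk: fsimage files and edits files. The cleanup simply deletes the fsimage and edits files that fall outside the configured transaction retention policy and image retention policy.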
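The entry point is the purgeOldStorage(NameNodeFile nnf) method of FSImage. The component that actually manages the fsimage and edits files is NNStorageRetentionManager, which works out from the configuration which files to keep and which to delete.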
    public static final String DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY =
        "dfs.namenode.num.checkpoints.retained";
    public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
    public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY =
        "dfs.namenode.num.extra.edits.retained";
    public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; // 1M
    public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY =
        "dfs.namenode.max.extra.edits.segments.retained";
    public static final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
    public NNStorageRetentionManager(
        Configuration conf,
        NNStorage storage,
        LogsPurgeable purgeableLogs,
        StoragePurger purger) {
      this.numCheckpointsToRetain = conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
          DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT);
      this.numExtraEditsToRetain = conf.getLong(
          DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
          DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
      this.maxExtraEditsSegmentsToRetain = conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
          DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
      Preconditions.checkArgument(numCheckpointsToRetain > 0,
          "Must retain at least one checkpoint");
      Preconditions.checkArgument(numExtraEditsToRetain >= 0,
          DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY +
          " must not be negative");

      this.storage = storage;
      this.purgeableLogs = purgeableLogs;
      this.purger = purger;
    }
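Step 1: determine the cleanup range for fsimage files. NNStorageRetentionManager calls getImageTxIdToRetain() to find all fsimage files and collect their TxIds in sorted order. From the configured number of fsimage files to retain (numCheckpointsToRetain) and the size of that collection, it determines the lower bound of the retained range, minTxId. fsimage files whose TxId is greater than or equal to minTxId are kept; those whose TxId is less than minTxId will be deleted.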
    private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
      // Find all fsimage files; the txid is parsed from each file name
      // and used to build an FSImageFile object.
      List<FSImageFile> images = inspector.getFoundImages();
      // A TreeSet keeps the txids in imageTxIds sorted in ascending order.
      TreeSet<Long> imageTxIds = Sets.newTreeSet();
      for (FSImageFile image : images) {
        imageTxIds.add(image.getCheckpointTxId());
      }

      List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
      if (imageTxIdsList.isEmpty()) {
        return 0;
      }
      // Reverse to descending order so the images run from largest txid to
      // smallest (newest to oldest), which makes the cutoff easy to locate.
      Collections.reverse(imageTxIdsList);
      int toRetain = Math.min(numCheckpointsToRetain, imageTxIdsList.size());
      long minTxId = imageTxIdsList.get(toRetain - 1);
      // fsimage files with txid < minTxId will be deleted; the rest are kept.
      LOG.info("Going to retain " + toRetain + " images with txid >= " + minTxId);
      return minTxId;
    }
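To make the selection concrete, here is a small standalone sketch of the same logic using only java.util collections. The class ImageRetentionSketch and its method name are hypothetical, and the txids are made-up sample values rather than data from a real cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Standalone re-statement of the retention rule; not the Hadoop class itself.
class ImageRetentionSketch {

    /** Returns the smallest image txid that is still retained, or 0 if there are no images. */
    static long imageTxIdToRetain(List<Long> foundImageTxIds, int numCheckpointsToRetain) {
        // Deduplicate and sort ascending (the same image may exist in several storage dirs).
        TreeSet<Long> unique = new TreeSet<>(foundImageTxIds);
        if (unique.isEmpty()) {
            return 0L;
        }
        // Newest first, so index (toRetain - 1) is the oldest image we keep.
        List<Long> newestFirst = new ArrayList<>(unique);
        Collections.reverse(newestFirst);
        int toRetain = Math.min(numCheckpointsToRetain, newestFirst.size());
        return newestFirst.get(toRetain - 1);
    }

    public static void main(String[] args) {
        // Sample txids; 300 appears twice to mimic two storage directories.
        List<Long> found = Arrays.asList(100L, 300L, 200L, 400L, 300L);
        System.out.println("minTxId = " + imageTxIdToRetain(found, 2));
    }
}
```

With these sample values and the default of two retained checkpoints, the images at txid 400 and 300 are kept, so minTxId is 300 and the images at 100 and 200 become candidates for deletion.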
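Step 2: clean up fsimage files. NNStorageRetentionManager calls purgeCheckpointsOlderThan() to do this. It iterates over every fsimage file in the storage directories and deletes any whose TxId is less than minTxId. The deletion itself is carried out by DeletionStoragePurger, which removes the fsimage file first and then the file that holds its MD5 value.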
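The selection rule in this step can be illustrated with a short sketch that only computes which file names would be removed; it does not touch the file system. The class ImagePurgeSketch is hypothetical, and the name layout (fsimage_ followed by a 19-digit zero-padded txid, plus a .md5 companion) is an assumption about how the files are named on disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Computes the names that would be purged; performs no actual deletion.
class ImagePurgeSketch {

    static List<String> fileNamesToPurge(List<Long> imageTxIds, long minTxId) {
        List<String> names = new ArrayList<>();
        for (long txId : imageTxIds) {
            if (txId < minTxId) {
                // Assumed naming scheme: fsimage_<19-digit txid> and its .md5 file.
                String image = String.format("fsimage_%019d", txId);
                names.add(image);          // the image is removed first
                names.add(image + ".md5"); // then its MD5 companion file
            }
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : fileNamesToPurge(Arrays.asList(100L, 200L, 300L), 200L)) {
            System.out.println("would delete " + name);
        }
    }
}
```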
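Step 3: determine the cleanup range for edits files. The cutoff purgeLogsFrom is derived from the minTxId computed in step 1 (minImageTxId), the configured number of extra transactions to retain, numExtraEditsToRetain (default 1000000), and the configured maximum number of extra edits segments to retain, maxExtraEditsSegmentsToRetain (default 10000):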
    // If fsimage_N is the image we want to keep, then we need to keep
    // all txns > N. We can remove anything < N+1, since fsimage_N
    // reflects the state up to and including N. However, we also
    // provide a "cushion" of older txns that we keep, which is
    // handy for HA, where a remote node may not have as many
    // new images.
    //
    // First, determine the target number of extra transactions to retain based
    // on the configured amount.
    long minimumRequiredTxId = minImageTxId + 1;
    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
      @Override
      public int compare(EditLogInputStream a, EditLogInputStream b) {
        return ComparisonChain.start()
            .compare(a.getFirstTxId(), b.getFirstTxId())
            .compare(a.getLastTxId(), b.getLastTxId())
            .result();
      }
    });

    // Remove from consideration any edit logs that are in fact required.
    while (editLogs.size() > 0 &&
        editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
      editLogs.remove(editLogs.size() - 1);
    }

    // Next, adjust the number of transactions to retain if doing so would mean
    // keeping too many segments around.
    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
      purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
      editLogs.remove(0);
    }

    // Finally, ensure that we're not trying to purge any transactions that we
    // actually need.
    if (purgeLogsFrom > minimumRequiredTxId) {
      throw new AssertionError("Should not purge more edits than required to "
          + "restore: " + purgeLogsFrom + " should be <= "
          + minimumRequiredTxId);
    }

    purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
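The interaction between the transaction cushion and the segment cap is easier to follow with numbers. The sketch below re-implements the calculation on plain (firstTxId, lastTxId) pairs. EditsRetentionSketch, Segment, and uniformSegments are hypothetical names, and the filter that stands in for selectInputStreams (keep segments whose last txid is at or after the cutoff) is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Worked model of the purgeLogsFrom calculation; not the Hadoop implementation.
class EditsRetentionSketch {

    static final class Segment {
        final long firstTxId;
        final long lastTxId;
        Segment(long firstTxId, long lastTxId) {
            this.firstTxId = firstTxId;
            this.lastTxId = lastTxId;
        }
    }

    static long purgeLogsFrom(long minImageTxId, long numExtraEditsToRetain,
                              int maxExtraSegmentsToRetain, List<Segment> all) {
        long minimumRequiredTxId = minImageTxId + 1;
        long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

        // Assumed stand-in for selectInputStreams: segments reaching the cutoff or later.
        List<Segment> candidates = new ArrayList<>();
        for (Segment s : all) {
            if (s.lastTxId >= purgeLogsFrom) {
                candidates.add(s);
            }
        }
        candidates.sort(Comparator.comparingLong((Segment s) -> s.firstTxId)
                .thenComparingLong(s -> s.lastTxId));

        // Segments that start after the retained image are required, not "extra".
        while (!candidates.isEmpty()
                && candidates.get(candidates.size() - 1).firstTxId >= minimumRequiredTxId) {
            candidates.remove(candidates.size() - 1);
        }
        // Too many extra segments: advance the cutoff past the oldest ones.
        while (candidates.size() > maxExtraSegmentsToRetain) {
            purgeLogsFrom = candidates.get(0).lastTxId + 1;
            candidates.remove(0);
        }
        if (purgeLogsFrom > minimumRequiredTxId) {
            throw new AssertionError("cutoff " + purgeLogsFrom + " exceeds " + minimumRequiredTxId);
        }
        return purgeLogsFrom;
    }

    /** Builds count back-to-back segments of size transactions each, starting at txid 1. */
    static List<Segment> uniformSegments(int count, long size) {
        List<Segment> segments = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            segments.add(new Segment(i * size + 1, (i + 1) * size));
        }
        return segments;
    }

    public static void main(String[] args) {
        List<Segment> segments = uniformSegments(10, 100); // txids 1..1000
        System.out.println(purgeLogsFrom(800, 500, 10, segments));
        System.out.println(purgeLogsFrom(800, 500, 3, segments));
    }
}
```

In this example the retained image is at txid 800 and the cushion is 500 transactions, so the cutoff starts at 301. Capping the extra segments at 3 pushes the cutoff forward to 501, because only the three segments immediately before txid 801 may remain as cushion.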
Step 4: clean up edits files.
sbn: the sbn runs a cleanup each time it completes a checkpoint.
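nn: after completing a checkpoint, the sbn uploads the new fsimage file to the nn. The nn receives the uploaded fsimage file through ImageServlet and then calls purgeOldStorage(NameNodeFile nnf) on its FSImage to perform the cleanup.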
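jn: during its cleanup, the nn sends every jn an RPC request that carries the cleanup position minTxIdToKeep. On receiving the request, each jn deletes all edits files whose txid is less than minTxIdToKeep.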
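As a rough illustration of the jn side, the sketch below picks the segments that would be removed for a given minTxIdToKeep. JournalPurgeSketch is a hypothetical name, and it assumes a segment is deleted only when both its first and last txid are below minTxIdToKeep, so a segment that straddles the cutoff is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Selection rule only; a real journal would delete the matching files on disk.
class JournalPurgeSketch {

    /** Each segment is a two-element array: {firstTxId, lastTxId}. */
    static List<long[]> segmentsToPurge(List<long[]> segments, long minTxIdToKeep) {
        List<long[]> purge = new ArrayList<>();
        for (long[] seg : segments) {
            // Assumption: purge only segments that lie entirely below the cutoff.
            if (seg[0] < minTxIdToKeep && seg[1] < minTxIdToKeep) {
                purge.add(seg);
            }
        }
        return purge;
    }

    public static void main(String[] args) {
        List<long[]> segments = Arrays.asList(
                new long[] {1, 100}, new long[] {101, 200}, new long[] {201, 300});
        for (long[] seg : segmentsToPurge(segments, 150)) {
            System.out.println("would delete edits segment " + seg[0] + "-" + seg[1]);
        }
    }
}
```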