hbase compact流程较多,这里分章节介绍,首先介绍compact在regionserver中的调用流程,并不会涉及真正的compact读取合并文件的流程,后续介绍。java
在regionserver启动时,会初始化compactsplitthread以及CompactionChecker。ide
/* * Check for compactions requests. * 检查合并请求 */ ScheduledChore compactionChecker; // Compactions public CompactSplitThread compactSplitThread;
compactionChecker是ScheduledChore类型,而ScheduledChore是hbase按期执行的一个task,以下所示,由注释可知,是hbase周期性执行的一个task。在Regionserver中能够看到flushChecker成员变量也是ScheduledChore类型的。ScheduledChore继承自Runable,所以是一个线程,主要逻辑在其run方法中。函数
/** * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for * access to the threads in the core thread pool. If an unhandled exception occurs, the chore * cancellation is logged. Implementers should consider whether or not the Chore will be able to * execute within the defined period. It is bad practice to define a ScheduledChore whose execution * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s * thread pool. * <p> * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as * an entry being added to a queue, etc. */ //scheduledChore继承自Runnable 因此Chore是一个线程 //1. 是hbase按期执行的一个task, 2.在它所在的线程内执行 3.提供了loop循环和sleep机制 @InterfaceAudience.Private public abstract class ScheduledChore implements Runnable { }
ScheduledChore中的比较重要和成员变量稍做说明,以下:oop
//睡眠周期 private final int period; //上一次执行改task的时间 private long timeOfLastRun = -1; //本次执行的时间 private long timeOfThisRun = -1; //该ScheduledChore是否完成初始化,在第一次执行该check时会执行,调用的是initChore()方法,该方法直接返回true,不作任何逻辑处理。 private boolean initialChoreComplete = false
其中还有一个重要的成员变量stopper,stopper是实现了Stopper接口的任意一个对象。根据注释可知,stopper是中止ScheduledChore的一种方式,一旦chore察觉到已经stopped了,会cancel它本身。在Regionserver初始化实例化compactionChecker的时候,会将该stopper设置为this,所以,此处觉得这当RS stop时,该chore会感知到,自动cancel其compact。具体的代码:ui
在ScheduledChore中
/** * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been * stopped, it will cancel itself. This is particularly useful in the case where a single stopper * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} * command can cause many chores to stop together. */ private final Stoppable stopper;
在RegionServer中
this.compactionChecker = new CompactionChecker(this,this.frequency, stopper: this)
ScheduledChore中最核心的部分,即其run方法,run()方法经过一系列的判断 而后周期性执行chore()方法。下面咱们一行行解释。this
public void run() { //将timeOfLastRun设置为当前timeOfThisRun,同时将timeOfThisRun设置为当前时间 updateTimeTrackingBeforeRun(); if (missedStartTime() && isScheduled()) { onChoreMissedStartTime(); if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time"); } else if (stopper.isStopped() || !isScheduled()) { cancel(false); cleanup(); if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped"); } else { try { if (!initialChoreComplete) { initialChoreComplete = initialChore(); } else { chore(); } } catch (Throwable t) { if (LOG.isErrorEnabled()) LOG.error("Caught error", t); if (this.stopper.isStopped()) { cancel(false); cleanup(); } } } }
在run方法中,首先调用updateTimeTrackingBeforeRun()方法,该方法很简单,只是简单的update timeOfLastRun和timeOfthsiRun(这两个变量初始化为-1)。每次周期性执行时都会更新。spa
/** * Update our time tracking members. Called at the start of an execution of this chore's run() * method so that a correct decision can be made as to whether or not we missed the start time */ private synchronized void updateTimeTrackingBeforeRun() { timeOfLastRun = timeOfThisRun; timeOfThisRun = System.currentTimeMillis(); }
而后对时间进行判断missedStartTime() && isScheduled(),在compact中isScheduled返回fasle。跳到else if分支,当该chore所依托的载体(此处即为RS)stop了,该chore会自动退出。最终会进入
最后的else分支。在第一次运行时,initialChoreComplete是false,所以会执行initialChore方法,该方法直接返回true,不会作任何处理。
在一切都准备好后,会周期执行chore方法,在Regionserver中有CompactionChecker,继承自ScheduledChore, 实现了本身的chore方法,在该方法中会根据判断执行具体的requestCompact方法,下次介绍,
逻辑中也能够看到,首先是判断是否须要compact,若是须要则不会再判断是否须要majorcompact。以下:
/* * Inner class that runs on a long period checking if regions need compaction. */ private static class CompactionChecker extends ScheduledChore { private final HRegionServer instance; private final int majorCompactPriority; private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; private long iteration = 0; CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { //调用父类的构造方法 super("CompactionChecker", stopper, sleepTime); //将载体h赋值给instance this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); /* MajorCompactPriority is configurable. * If not set, the compaction will use default priority. */ //设置major合并优先级,取参数hbase.regionserver.compactionChecker.majorCompactPriority,默认为Integer.MAX_VALUE this.majorCompactPriority = this.instance.conf. getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); } //ScheduledChore的run方法会一直调用chore函数 @Override protected void chore() { //遍历instance下的全部online的region 进行循环检测 //onlineRegions是HRegionServer上存储的全部可以提供有效服务的在线Region集合; for (HRegion r : this.instance.onlineRegions.values()) { if (r == null) continue; //取出每一个region的store for (Store s : r.getStores().values()) { try { //检查是否须要compact的时间间隔,通常状况是在好比memstore flush后或者其余事件触发compact的,可是有时也须要不一样的compact策略, // 因此须要周期性的检查具体间隔=hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,默认1000; long multiplier = s.getCompactionCheckMultiplier(); assert multiplier > 0; // 未到整数倍,跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查 if (iteration % multiplier != 0) continue; if (s.needsCompaction()) {//// 须要合并的话,发起SystemCompaction请求,此处最终比较的是是否当前hfile数量减去正在compacting的文件数大于设置的compact min
//值。若知足则执行systemcompact // Queue a compaction. Will recognize if major is needed. this.instance.compactSplitThread.requestSystemCompaction(r, s, getName() + " requests compaction"); } else if (s.isMajorCompaction()) { if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > r.getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use default priority", null); } else { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use configured priority", this.majorCompactPriority, null); } } } catch (IOException e) { LOG.warn("Failed major compaction check on " + r, e); } } } iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); } }
其中判断是否须要compact比较简单,主要是isMajorCompaction的判断。最主要的逻辑以下:()线程
获取下一次majorcompact的时间mcdebug
获取全部须要compact的file的modify time,已获得全部的file中最小的时间戳lowTimestamp,若是lowTimestamp<now - mc觉得这须要进行major compact了。code
若是此时只有一个file,则进行以下判断
若是未过时,且其block的本定性不要求知足,则进行majorcompact,不然不进行major compact
若是过时,则进行major compact
public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact) throws IOException { boolean result = false; //获取下一次major compact的时间 long mcTime = getNextMajorCompactTime(filesToCompact); if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { return result; } // TODO: Use better method for determining stamp of last major (HBASE-2990) //获取待合并文件中modify的最小时间戳 以及当前时间 long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); long now = System.currentTimeMillis(); if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { //lowTimestamp < (now - mcTime)即意味着当前时间位于进行major compact的时间范围以内,要进行compact // Major compaction time has elapsed. long cfTtl = this.storeConfigInfo.getStoreFileTtl(); if (filesToCompact.size() == 1) { // Single file StoreFile sf = filesToCompact.iterator().next(); Long minTimestamp = sf.getMinimumTimestamp(); //文件存留时间oldest long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue(); if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex( RSRpcServices.getHostname(comConf.conf, false) ); if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { if (LOG.isDebugEnabled()) { LOG.debug("Major compaction triggered on only store " + this + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); } result = true; } else { if (LOG.isDebugEnabled()) { LOG.debug("Skipping major compaction of " + this + " because one (major) compacted file only, oldestTime " + oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); } } } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {////只有一个hfile(最先的ts>ttl)整个文件过时 => 进行marjor compact LOG.debug("Major compaction triggered on store " + this + ", because keyvalues outdated; time since last major compaction " + (now - lowTimestamp) + "ms"); result = true; } } else { if (LOG.isDebugEnabled()) { LOG.debug("Major compaction triggered on store " + this + "; time since last major compaction " + (now - lowTimestamp) + "ms"); } result = true; } } return result;}