compact一中介绍了HBASE compact的调度流程,本篇文章主要介绍实际进行compact的过程。先从上文中的chore中接入,在HRegionserver中的compactChecker chore方法中,会判断是否须要compact,以下:java
protected void chore() { //遍历instance下的全部online的region 进行循环检测 //onlineRegions是HRegionServer上存储的全部可以提供有效服务的在线Region集合; for (HRegion r : this.instance.onlineRegions.values()) { if (r == null) continue; //取出每一个region的store for (Store s : r.getStores().values()) { try { //检查是否须要compact的时间间隔,通常状况是在好比memstore flush后或者其余事件触发compact的,可是有时也须要不一样的compact策略, // 因此须要周期性的检查具体间隔=hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,默认1000; long multiplier = s.getCompactionCheckMultiplier(); assert multiplier > 0; // 未到整数倍,跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查 if (iteration % multiplier != 0) continue; if (s.needsCompaction()) {//// 须要合并的话,发起SystemCompaction请求 // Queue a compaction. Will recognize if major is needed. this.instance.compactSplitThread.requestSystemCompaction(r, s, getName() + " requests compaction"); } else if (s.isMajorCompaction()) {//若是是majorcompact会走 requestCompaction方法 if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > r.getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use default priority", null); } else { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use configured priority", this.majorCompactPriority, null); } } } catch (IOException e) { LOG.warn("Failed major compaction check on " + r, e); } } } iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); }
当判断s.needsCompaction(),则调用compactsplitThread.requstSystemCompaction()方法进行compact;若是判断此时不须要进行compact,则会调用isMajorCompaction判断是否须要进行major compact,若是是major compact会调用CompactSplitThread.requestCompaction()方法。不论是requestSystemCompaction方法也好,仍是requestCompaction方法也好,最终都是调用的requestCompactionInternal方法,只是方法参数不一样。下面咱们从requestSystemCompaction开始继续深刻了解。requestSystemCompaction的具体逻辑以下:算法
public void requestSystemCompaction( final HRegion r, final Store s, final String why) throws IOException { requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false); }
继续跟进到requestCompactionInternal方法:shell
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s, final String why, int priority, CompactionRequest request, boolean selectNow) throws IOException { //首选作一些必要的环境判断,好比HRegionServer是否已中止、HRegion对应的表是否容许Compact操做 if (this.server.isStopped() || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) { return null; } CompactionContext compaction = null; //系统自动触发的system compaction,selectNow参数为false,若是是hbase shell等人为触发的合并,则selectNow为true if (selectNow) { // 经过hbase shell触发的major compaction,selectNow为true.这里进行实际的选取待合并文件操做 compaction = selectCompaction(r, s, priority, request); if (compaction == null) return null; // message logged inside } // We assume that most compactions are small. So, put system compactions into small // pool; we will do selection there, and move to large pool if necessary. // 咱们假设大部分合并都是small。因此,将系统引起的合并放进small pool, // 在那里咱们会作出选择,若是有必要的话会挪至large pool // 也就是说,若是selectNow为false,即系统自身引起的合并,好比MemStore flush、compact检查线程等,统一放入到shortCompactions中,即small pool // 而若是是人为触发的,好比HBase shell,则还要看HStore中合并请求大小是否超过阈值,超过则放入longCompactions,即large pool,不然仍是small pool //size为compact的全部hfile文件总大小 long size = selectNow ? compaction.getRequest().getSize() : 0; ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? longCompactions : shortCompactions; pool.execute(new CompactionRunner(s, r, compaction, pool)); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); } return selectNow ? compaction.getRequest() : null; }
在requestCompactionInternal方法中,逻辑过程总结以下:app
CompactionRunner的详细流程以下:ide
private class CompactionRunner implements Runnable, Comparable<CompactionRunner> { private final Store store; private final HRegion region; private CompactionContext compaction; private int queuedPriority; private ThreadPoolExecutor parent; public CompactionRunner(Store store, HRegion region, CompactionContext compaction, ThreadPoolExecutor parent) { super(); this.store = store; this.region = region; this.compaction = compaction; // 合并排队的优先级,若是合并上下文compaction为空,则经过HStore的getCompactPriority()方法获取,不然直接从合并请求中获取, // 而合并请求中的,实际上也是经过调用requestCompactionInternal()方法的priority传入的 this.queuedPriority = (this.compaction == null) ? store.getCompactPriority() : compaction.getRequest().getPriority(); this.parent = parent; } @Override public String toString() { return (this.compaction != null) ? ("Request = " + compaction.getRequest()) : ("Store = " + store.toString() + ", pri = " + queuedPriority); } @Override public void run() { Preconditions.checkNotNull(server); // 首选作一些必要的环境判断,好比HRegionServer是否已中止、HRegion对应的表是否容许Compact操做 if (server.isStopped() || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { return; } // Common case - system compaction without a file selection. Select now. // 常见的,系统合并尚未选择待合并的文件。如今选择下。 if (this.compaction == null) { int oldPriority = this.queuedPriority; this.queuedPriority = this.store.getCompactPriority(); // 若是当前优先级queuedPriority大于以前的oldPriority if (this.queuedPriority > oldPriority) { // Store priority decreased while we were in queue (due to some other compaction?), // requeue with new priority to avoid blocking potential higher priorities. // 将该CompactionRunner在扔回线程池 this.parent.execute(this); return; } try { //选择候选hfile this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); } catch (IOException ex) { LOG.error("Compaction selection failed " + this, ex); server.checkFileSystem(); return; } if (this.compaction == null) return; // nothing to do // Now see if we are in correct pool for the size; if not, go to the correct one. // We might end up waiting for a while, so cancel the selection. assert this.compaction.hasSelection(); ThreadPoolExecutor pool = store.throttleCompaction( compaction.getRequest().getSize()) ? longCompactions : shortCompactions; if (this.parent != pool) {// 换池了 this.store.cancelRequestedCompaction(this.compaction); // HStore取消合并请求 this.compaction = null; // 复位compaction为null this.parent = pool; // 换池 this.parent.execute(this); // 放入线程池,后续会再初始化compaction return; } } // Finally we can compact something. assert this.compaction != null; // 执行以前 this.compaction.getRequest().beforeExecute(); try { // Note: please don't put single-compaction logic here; // put it into region/store/etc. This is CST logic. long start = EnvironmentEdgeManager.currentTime(); // 调用HRegion的compact,针对store执行compact boolean completed = region.compact(compaction, store, compactionThroughputController); long now = EnvironmentEdgeManager.currentTime(); LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); if (completed) { // degenerate case: blocked regions require recursive enqueues if (store.getCompactPriority() <= 0) { // 若是优先级Priority小于等于0,意味着当前文件已经太多,则须要发起一次SystemCompaction requestSystemCompaction(region, store, "Recursive enqueue"); } else { // 请求分裂,其实是看Region的大小是否超过阈值,从而引发分裂 // see if the compaction has caused us to exceed max region size requestSplit(region); } } } catch (IOException ex) { IOException remoteEx = RemoteExceptionHandler.checkIOException(ex); LOG.error("Compaction failed " + this, remoteEx); if (remoteEx != ex) { LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); } server.checkFileSystem(); } catch (Exception ex) { LOG.error("Compaction failed " + this, ex); server.checkFileSystem(); } finally { LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); } this.compaction.getRequest().afterExecute(); }
如上所示,在CompactRunner中:post
接下来咱们挨个看每一个阶段具体作了啥,首先是selectCompaction方法。该方法选取要进行compact的file,并构造一个compactionContext对象返回,具体逻辑以下:ui
private CompactionContext selectCompaction(final HRegion r, final Store s, int priority, CompactionRequest request) throws IOException { // 调用HStore的requestCompaction()方法,获取CompactionContext CompactionContext compaction = s.requestCompaction(priority, request); if (compaction == null) { if(LOG.isDebugEnabled()) { LOG.debug("Not compacting " + r.getRegionNameAsString() + " because compaction request was cancelled"); } return null; } // 确保CompactionContext中合并请求request不为空 assert compaction.hasSelection(); if (priority != Store.NO_PRIORITY) { compaction.getRequest().setPriority(priority); } return compaction; }
可见,最终是调用store的requestCompaction方法获取compactionContext的。继续跟进到里面看一下发生了啥。this
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) throws IOException { // don't even select for compaction if writes are disabled // 若是对应HRegion不可写,直接返回null if (!this.areWritesEnabled()) { return null; } // Before we do compaction, try to get rid of unneeded files to simplify things. // 在咱们作合并以前,试着摆脱没必要要的文件来简化事情 removeUnneededFiles(); // 经过存储引擎storeEngine建立合并上下文CompactionContext CompactionContext compaction = storeEngine.createCompaction(); CompactionRequest request = null; // 加读锁 this.lock.readLock().lock(); try { synchronized (filesCompacting) { // First, see if coprocessor would want to override selection. if (this.getCoprocessorHost() != null) { // 经过CompactionContext的preSelect()方法,选择StoreFile,返回StoreFilel列表 List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); boolean override = this.getCoprocessorHost().preCompactSelection( this, candidatesForCoproc, baseRequest); if (override) { // Coprocessor is overriding normal file selection. compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); } } // Normal case - coprocessor is not overriding file selection. if (!compaction.hasSelection()) {// 若是合并请求为空,即不存在协处理器 // 是否为UserCompaction boolean isUserCompaction = priority == Store.PRIORITY_USER; boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true); try { // 调用CompactionContext的select()方法 compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); } catch (IOException e) { if (mayUseOffPeak) { offPeakCompactionTracker.set(false); } throw e; } assert compaction.hasSelection(); if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { // Compaction policy doesn't want to take advantage of off-peak. offPeakCompactionTracker.set(false); } } if (this.getCoprocessorHost() != null) { this.getCoprocessorHost().postCompactSelection( this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest); } // Selected files; see if we have a compaction with some custom base request. // 若是以前传入的请求不为空,则合并之 if (baseRequest != null) { // Update the request with what the system thinks the request should be; // its up to the request if it wants to listen. compaction.forceSelect( baseRequest.combineWith(compaction.getRequest())); } // Finally, we have the resulting files list. Check if we have any files at all. // 获取合并请求request request = compaction.getRequest(); // 从合并请求request中获取待合并文件集合selectedFiles final Collection<StoreFile> selectedFiles = request.getFiles(); if (selectedFiles.isEmpty()) { return null; } // 将选择的文件集合加入到filesCompacting中,解答了以前文章的疑问 addToCompactingFiles(selectedFiles); // 是否为major合并 // If we're enqueuing a major, clear the force flag. this.forceMajor = this.forceMajor && !request.isMajor(); // Set common request properties. // Set priority, either override value supplied by caller or from store. request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); } } finally { this.lock.readLock().unlock(); } LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); // 调用HRegion的reportCompactionRequestStart()方法,汇报一个compact请求开始 this.region.reportCompactionRequestStart(request.isMajor()); // 返回合并上下文compaction return compaction; }
咱们总结一下上面流程的逻辑过程。线程
下面先看下removeUnneededFiles方法,其主要是根据file的最大时间戳排除一些不必的文件,将已经expired的file加入到compactingfiles中:debug
private void removeUnneededFiles() throws IOException { if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return; if (getFamily().getMinVersions() > 0) { LOG.debug("Skipping expired store file removal due to min version being " + getFamily().getMinVersions()); return; } this.lock.readLock().lock(); Collection<StoreFile> delSfs = null; try { synchronized (filesCompacting) {
//获取设置的ttl时间,若是没设置,默认为long.maxnium long cfTtl = getStoreFileTtl(); if (cfTtl != Long.MAX_VALUE) {//若是不是forever
//最终调用getUnneededFiles delSfs = storeEngine.getStoreFileManager().getUnneededFiles( EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
//将unneede以后的file加入到compactingfiles中 addToCompactingFiles(delSfs); } } } finally { this.lock.readLock().unlock(); } if (delSfs == null || delSfs.isEmpty()) return; Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files. writeCompactionWalRecord(delSfs, newFiles); replaceStoreFiles(delSfs, newFiles); completeCompaction(delSfs); LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this + " of " + this.getRegionInfo().getRegionNameAsString() + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1)); }
getUnneededFiles方法逻辑以下
public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) { Collection<StoreFile> expiredStoreFiles = null; ImmutableList<StoreFile> files = storefiles; // 1) We can never get rid of the last file which has the maximum seqid. // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
for (int i = 0; i < files.size() - 1; ++i) { StoreFile sf = files.get(i); long fileTs = sf.getReader().getMaxTimestamp();
//若是文件的最大时间戳小于设置的ttl大小且不在compactingfile中 if (fileTs < maxTs && !filesCompacting.contains(sf)) { LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); if (expiredStoreFiles == null) { expiredStoreFiles = new ArrayList<StoreFile>(); } expiredStoreFiles.add(sf); } }
//返回须要排除的文件列表 return expiredStoreFiles; }
走下去可见是调用compactionContext的select方法进行文件的选取
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { // 利用合并策略compactionPolicy的selectCompaction()方法,获取合并请求request request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor); // 返回是否获得request的标志,true or false return request != null; }
可见,select中,根据指定的compactpolicy策略进行selectCompaction,选取文件。咱们的线上环境没有指定,则采用的是default的ratio,以下:
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles, final List<StoreFile> filesCompacting, final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { // Preliminary compaction subject to filters // 初步压缩过滤器,即根据传入的参数candidateFiles,建立一个候选的StoreFile列表 ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles); // Stuck and not compacting enough (estimate). It is not guaranteed that we will be // able to compact more if stuck and compacting, because ratio policy excludes some // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). // 肯定futureFiles,若是filesCompacting为空则为0,不然为1 int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
//根据blockingstorefiles配置,判断是否阻塞 boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) >= storeConfigInfo.getBlockingFileCount(); // 从候选列表candidateSelection中排除正在合并的文件,即filesCompacting中的文件 candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + filesCompacting.size() + " compacting, " + candidateSelection.size() + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); // If we can't have all files, we cannot do major anyway // 验证是否包含全部文件,设置标志位isAllFiles,判断的条件就是此时的候选列表candidateSelection大小是否等于初始的candidateFiles列表大小, // 而candidateFiles表明了Store下的所有文件 boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); // 若是没有包含全部文件,则不可能为一个Major合并 if (!(forceMajor && isAllFiles)) { // 若是不是强制的Major合并,且不包含全部的文件,则调用skipLargeFiles()方法,跳过较大文件 candidateSelection = skipLargeFiles(candidateSelection); // 再次肯定标志位isAllFiles isAllFiles = candidateFiles.size() == candidateSelection.size(); } // Try a major compaction if this is a user-requested major compaction, // or if we do not have too many files to compact and this was requested as a major compaction // 肯定isTryingMajor,共三种状况: // 一、强制Major合并为true,且包含全部问文件,且是一个用户合并 // 二、强制Major合并,且包含全部问文件,或者自己判断后就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目 boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction) || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection)) && (candidateSelection.size() < comConf.getMaxFilesToCompact())); // Or, if there are any references among the candidates. // candidates中存在引用的话,则视为是在分裂后的文件 boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); // 若是不是TryingMajor,且不是在分裂后 if (!isTryingMajor && !isAfterSplit) { // We're are not compacting all files, let's see what files are applicable // 再次筛选文件 //经过filterBulk()方法取出不该该位于Minor合并的文件; candidateSelection = filterBulk(candidateSelection); // 经过applyCompactionPolicy()方法,使用必定的算法,进行文件的筛选; candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); //经过checkMinFilesCriteria()方法,判断是否知足合并时最小文件数的要求; candidateSelection = checkMinFilesCriteria(candidateSelection); } // candidateSelection中移除过量的文件 candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor); // Now we have the final file list, so we can determine if we can do major/all files. // 查看是否为所有文件 isAllFiles = (candidateFiles.size() == candidateSelection.size()); // 利用candidateSelection构造合并请求CompactionRequest对象result CompactionRequest result = new CompactionRequest(candidateSelection); result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak); result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); return result; }
其中最主要的逻辑在filterbulk、applyCOmpactPolicy、checkMinFilesCriteria中,下面依次介绍。
private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) { candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() { @Override public boolean apply(StoreFile input) { return input.excludeFromMinorCompaction(); } })); return candidates; }
在filterbulk中主要是经过hfile的fileinfo字段判断,是否将其排除在mincompact以外。
重要的是applyCompactionPolicy方法,该方法具体逻辑以下:
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (candidates.isEmpty()) { return candidates; } // we're doing a minor compaction, let's see what files are applicable int start = 0; // 获取文件合并比例:取参数hbase.hstore.compaction.ratio,默认为1.2 double ratio = comConf.getCompactionRatio(); if (mayUseOffPeak) { // 取参数hbase.hstore.compaction.ratio.offpeak,默认为5.0 ratio = comConf.getCompactionRatioOffPeak(); LOG.info("Running an off-peak compaction, selection ratio = " + ratio); } // get store file sizes for incremental compacting selection. final int countOfFiles = candidates.size(); long[] fileSizes = new long[countOfFiles]; long[] sumSize = new long[countOfFiles]; for (int i = countOfFiles - 1; i >= 0; --i) { StoreFile file = candidates.get(i); fileSizes[i] = file.getReader().length(); // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo // tooFar表示后移动最大文件数位置的文件大小,其实也就是刚刚知足达到最大文件数位置的那个文件, // 也就是说,从i至tooFar数目为合并时容许的最大文件数 int tooFar = i + comConf.getMaxFilesToCompact() - 1; sumSize[i] = fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); } // 倒序循环,若是文件数目知足最小合并时容许的最小文件数,且该位置的文件大小, // 大于合并时容许的文件最小大小与下一个文件窗口文件总大小乘以必定比例中的较大者,则继续, // 实际上就是选择出一个文件窗口内能最小能知足的文件大小的一组文件 while (countOfFiles - start >= comConf.getMinFilesToCompact() && fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) { ++start; } if (start < countOfFiles) { LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + " files from " + countOfFiles + " candidates"); } else if (mayBeStuck) { // We may be stuck. Compact the latest files if we can. // 保证最小文件数目的要求 int filesToLeave = candidates.size() - comConf.getMinFilesToCompact(); if (filesToLeave >= 0) { start = filesToLeave; } } candidates.subList(0, start).clear(); return candidates; }
上述过程能够参照ratioCompactionPolicy策略,应该有大量文章介绍,此处再也不详细介绍其过程
下面是checkMinFilesCriteria方法,判断applyCompactionPolicy策略选出的file是否知足合并时的最小文件数要求。若是不知足要求,则直接清空candidates。
private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) { int minFiles = comConf.getMinFilesToCompact(); if (candidates.size() < minFiles) { if(LOG.isDebugEnabled()) { LOG.debug("Not compacting files because we only have " + candidates.size() + " files ready for compaction. Need " + minFiles + " to initiate."); } candidates.clear(); } return candidates; }
选出candidatesfile后,须要经过removeExcessFiles方法判断选出的文件数是否大于了配置中的compact.files.max参数的值。若是超过,则删除值知足配置要求。
最后根据candidatesfiles构造compactionRequest
说了这么多,都是CompactRunner run方法中的selectCompaction部分,下面是真正的执行compact的环节,该环节是经过region.compact方法执行。
public boolean compact(CompactionContext compaction, Store store, CompactionThroughputController throughputController) throws IOException { assert compaction != null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); //若是这个region正在执行close操做或者已经closed,则取消compact if (this.closing.get() || this.closed.get()) { LOG.debug("Skipping compaction on " + this + " because closing/closed"); store.cancelRequestedCompaction(compaction); return false; } MonitoredTask status = null; boolean requestNeedsCancellation = true; // block waiting for the lock for compaction lock.readLock().lock(); try { byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); //执行一系列检查 if (stores.get(cf) != store) { LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this + " has been re-instantiated, cancel this compaction request. " + " It may be caused by the roll back of split transaction"); return false; } status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); if (this.closed.get()) { String msg = "Skipping compaction on " + this + " because closed"; LOG.debug(msg); status.abort(msg); return false; } boolean wasStateSet = false; try { synchronized (writestate) { if (writestate.writesEnabled) {//该状态不许读,默认是readonly为false writeEnabled为true //将writestate的compacting值加一 wasStateSet = true; ++writestate.compacting; } else { String msg = "NOT compacting region " + this + ". Writes disabled."; LOG.info(msg); status.abort(msg); return false; } } LOG.info("Starting compaction on " + store + " in region " + this + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":"")); doRegionCompactionPrep(); try { status.setStatus("Compacting store " + store); // We no longer need to cancel the request on the way out of this // method because Store#compact will clean up unconditionally requestNeedsCancellation = false; //最终调用store的compact方法进行compact store.compact(compaction, throughputController); } catch (InterruptedIOException iioe) { String msg = "compaction interrupted"; LOG.info(msg, iioe); status.abort(msg); return false; } } finally { if (wasStateSet) { synchronized (writestate) { --writestate.compacting; if (writestate.compacting <= 0) { writestate.notifyAll(); } } } } status.markComplete("Compaction complete"); return true; } finally { try { if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); if (status != null) status.cleanup(); } finally { lock.readLock().unlock(); } } }
下面是store.compact方法,该方法须要花费必定的时间,里面调用的是compactContext的compact方法,里面又是调用的compactor执行compact。具体逻辑待续