hbase mutation操做,好比delete put等,都须要先获取行锁,而后再进行操做,在获取行锁时,是经过HRegion.getRowLockInternal(byte[] row, boolean waitForLock)进行的,所以,咱们先大致浏览一下这个方法的流程,以下。能够看到,该方法中主要涉及到行锁相关的内容为RowLock和RowLockContext两个类。这两个都是HRegion的内部类,下面详细看一下这两个类是咋实现的。java
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException { HashedBytes rowKey = new HashedBytes(row); RowLockContext rowLockContext = new RowLockContext(rowKey); // loop until we acquire the row lock (unless !waitForLock) while (true) { RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); if (existingContext == null) { // Row is not already locked by any thread, use newly created context. break; } else if (existingContext.ownedByCurrentThread()) { // Row is already locked by current thread, reuse existing context instead. rowLockContext = existingContext; break; } else { if (!waitForLock) { return null; } try { // Row is already locked by some other thread, give up or wait for it if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out waiting for lock for row: " + rowKey); } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); InterruptedIOException iie = new InterruptedIOException(); iie.initCause(ie); throw iie; } } } // allocate new lock for this thread return rowLockContext.newLock(); }
首先看RowLock类,该类主要逻辑是release方法,是用来释放行锁的。同时有一个布尔类型参数release,默认为false,表明该行锁是否被释放掉了。app
public static class RowLock { @VisibleForTesting final RowLockContext context; private boolean released = false; @VisibleForTesting RowLock(RowLockContext context) { this.context = context; } /** * Release the given lock. If there are no remaining locks held by the current thread * then unlock the row and allow other threads to acquire the lock. * @throws IllegalArgumentException if called by a different thread than the lock owning thread */ public void release() { if (!released) { context.releaseLock(); released = true; } } }
可是在RowLock中,并无看到实际涉及到锁的信息,这是咋回事呢,别急,细细看下release方法,里面有一个context,是RowLockContext类型。同时其构造方法中也传了一个context对象,所以怀疑是在RowLockContext中new出了一个rowlock,进RowLockContext中看下:less
@VisibleForTesting class RowLockContext { private final HashedBytes row;
//经过计数以及CountDownLatch实现对行锁的condition。这里之因此将countdownlatch设置为一,是由于hbase本身也不知道到底有多少condition来竞争锁,因此加一个计数lockCount,
//当lockCount为零时,再把latch.coutDown。不然会在getRowLockInternal中await。
private final CountDownLatch latch = new CountDownLatch(1); private final Thread thread; private int lockCount = 0; RowLockContext(HashedBytes row) { this.row = row; this.thread = Thread.currentThread(); } boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } RowLock newLock() { lockCount++; return new RowLock(this); } void releaseLock() { if (!ownedByCurrentThread()) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } lockCount--; if (lockCount == 0) { // no remaining locks by the thread, unlock and allow other threads to access RowLockContext existingContext = lockedRows.remove(row); if (existingContext != this) { throw new RuntimeException( "Internal row lock state inconsistent, should not happen, row: " + row); } latch.countDown(); } } }
经过计数以及CountDownLatch实现对行锁的condition。这里之因此将countdownlatch设置为一,是由于hbase本身也不知道到底有多少condition来竞争锁,因此加一个计数lockCount,
当lockCount为零时,再把latch.coutDown。不然会在getRowLockInternal中await。
在HRegion中还有一个关键的成员变量: lockedrows,用来存储当前已经获取了行锁的全部行信息,key为rowkey,value为RowLockContext。
// map from a locked row to the context for that lock including: // - CountDownLatch for threads waiting on that row // - the thread that owns the lock (allow reentrancy) // - reference count of (reentrant) locks held by the thread // - the row itself private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = new ConcurrentHashMap<HashedBytes, RowLockContext>();
好啦,行锁涉及到的内容,咱们都大致浏览了,再从getRowLockInternal中开始通一遍逻辑:
上面是获取行锁的流程,释放行锁呢,是经过HRegion的releaseRowLocks方式实现,咱们看下代码:oop
/** * If the given list of row locks is not null, releases all locks. */ public void releaseRowLocks(List<RowLock> rowLocks) { if (rowLocks != null) { for (RowLock rowLock : rowLocks) { rowLock.release(); } rowLocks.clear(); } }
可见是调用RowLock.release实现,该方法代码在上面有,具体的逻辑以下:ui
在lockedrows中将该行锁删除。this
判断release是否为false,若是为false,则调用context.releaseLock,context.releaseLock逻辑以下spa
首先判断释放该行锁的线程是不是该行锁的持有者,若不是则抛出异常线程
将count--;对象
若是count==0了,则直接调用latch.countDown,这个方法会触发其余线程去获取行锁。当count==0了也就是说该线程已经不须要改行锁,已经释放blog
将release设置为true。
注意:
这里在getRowLockInternal中,只要lockedRows.putIfAbsent(rowKey, rowLockContext)成功,其余线程将不会获取成功,由concurrentMap保证。