[TOC]html
在前一篇博客多线程学习笔记三之ReentrantLock与AQS实现分析分析了基于同步器AQS实现的独占锁ReentrantLock,AQS同步器做为JUC组件实现锁的框架,基于AQS除了能够实现独占锁,还能够实现共享锁。 ReentrantReadWriteLock是基于AQS实现的读写锁,内部维护了一个读锁(共享锁)和写锁(独占锁)。若是咱们要在程序中提供共享的缓存数据结构,缓存确定是读操做(数据查询)多而写操做(数据更新)少,只要保证写操做对后续的读操做是可见的就好了,这种状况下使用独占锁就不如读写锁的吞吐量大,读写锁中的读锁容许多个线程得到读锁对资源进行读操做,写锁是传统的独占锁,只容许单个线程得到写锁对资源进行更新。如下是JDK提供基于ReentrantReadWriteLock简单实现缓存结构的Demo:java
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必须先释放读锁再获取写锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次检查cacheValid防止其余线程得到写锁改变cacheValid值 if (!cacheValid) { data = ... cacheValid = true; } // 写锁降级为读锁 rwl.readLock().lock(); } finally { //释放写锁 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
ReentranReadWriteLock的关系图: node
ReentrantReadWriteLock没有实现Lock接口,实现了ReadWriteLock接口。内部类ReadLock和WriteLock实现Lock接口,ReadLock和WriteLock包含了继承了AQS的Sync对象,从而提供了共享锁和独占锁特性的实现。读写锁ReentrantReadWriteLock具备如下特性:缓存
在实现ReentrantLock时,当一个线程去尝试获取锁时,线程会去检查同步器AQS中维护的int型变量state是否为0,同步状态加一表示当前线程成功获取锁。而读写锁ReentrantReadWriteLock维护了读锁和写锁,那么一个线程得到了锁,怎么经过state代表究竟是读锁仍是写锁呢?答案是把int型变量切位两部分,高16位表示读状态,低16位表示写状态。ReentrantReadWriteLock在内部类Sync定义了如下常量用以区分读写状态:数据结构
//偏移量 static final int SHARED_SHIFT = 16; //线程得到读锁,state加SHARED_UNIT,state高16位SHARED_UNIT个数表明了有多少个共享锁 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读写锁重入最多不超过65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
经过把32位int型变量state按位切割成两部分维护读写两种状态,具体划分如图: 多线程
从图中能够看到,当前线程获取了写锁,重进入了3次,连续得到了两次读锁,每次得到写锁,就把state加1,而低16位总共最大是65535,就是MAX_COUNT的值。每得到一次读锁,就把state加SHARED_COUNT。那么如何获取读写状态呢?只要经过位运算取出高16位或低16位就好了,对于读状态,state>>>SHARED_SHIFT(无符号补0右移16位)就能够获得加了多少次SHARED_UNIT从而得到读状态;对于写状态,state & EXCLUSIVE_MASK(0X0000FFFF,高16位都变为0,低16位不变)就能够得到写状态。并发
因为ReentrantReadWriteLock支持读写锁的重入,而写锁是独占锁,只要取出同步状态state低16位对应的数值就是得到写锁的重入次数;而读锁是共享锁,每一个线程得到读锁就会把state加上SHARED_UNIT(包括读锁重入),取出state高16位的对应的数值表示是全部线程得到读锁的次数,可是如何得到单个线程得到共享锁的次数呢?内部类Sync为同步器维护了一个读锁计数器,专门统计每一个线程得到读锁的次数。Sync内部有两个内部类分别为HoldCounter和ThreadLocalHoldCounter:框架
abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter { //计数器,用于统计线程重入读锁次数 int count = 0; // Use id, not reference, to avoid garbage retention //线程TID,区分线程,能够惟一标识一个线程 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重写初始化方法,在没有进行set的状况下,获取的都是该HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //本地线程读锁计数器 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } }
获取读锁,由内部类ReadLock提供lock方法,调用了Sync父类AQS的方法:oop
//获取读锁 public void lock() { sync.acquireShared(1); } //获取共享锁 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
尝试获取共享锁:学习
protected final int tryAcquireShared(int unused) { //当前线程 Thread current = Thread.currentThread(); //同步状态state int c = getState(); //检查独占锁是否被占据,若是被占据,是不是当前线程获取了独占锁 //若是是当前线程获取了写锁,能够继续获取读锁,若是都不是返回-1表示获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁数量 int r = sharedCount(c); //!readerShouldBlock() 根据公平与否策略和队列是否含有等待节点决定当前线程是否继续获取锁 //不能大于65535且CAS修改为功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //若是没有线程获取过读锁 if (r == 0) { //将当前线程设置为第一个读锁线程 firstReader = current; // 计数器为一 firstReaderHoldCount = 1; //读锁重入 } else if (firstReader == current) { //计数器加一 firstReaderHoldCount++; } else { // 若是不是第一个线程,获取锁成功 // cachedHoldCounter 表明的是最后一个获取读锁的线程的计数器 HoldCounter rh = cachedHoldCounter; // 若是计数器是 null 或者不指向当前线程,那么就新建一个 HoldCounter 对象 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //计数器为0,保存到readHolds中 else if (rh.count == 0) readHolds.set(rh); //计数器加一 rh.count++; } return 1; } return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //死循环 for (;;) { //同步状态 int c = getState(); //检查写锁获取状况 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //进入到这里,说明没有其余线程获取写锁 //公平锁策略检查 } else if (readerShouldBlock()) { //readerShouldBlock()返回true,应该堵塞,检查是否获取过读锁 // 第一个获取读锁线程是当前线程,重入 if (firstReader == current) { } else { //循环中,若计数器为null if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //须要阻塞且是非重入(还未获取读锁的),获取失败。 if (rh.count == 0) return -1; } } //检查读锁总数量是否超过最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS设置同步状态state if (compareAndSetState(c, c + SHARED_UNIT)) { //当前线程得到第一个读锁 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //读锁重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //从缓存读入计数器,提升效率 if (rh == null) rh = cachedHoldCounter; //计数器为空或不是指向当前线程 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
当tryAcquireShared尝试获取共享锁失败,返回-1,进入AQS同步队列等待获取共享锁
private void doAcquireShared(int arg) { //将当前节点以共享型类型加入同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //前驱节点获取到锁,可能占据锁,也可能已经释放锁,调用tryAcquireShared尝试获取锁 if (p == head) { int r = tryAcquireShared(arg); //获取成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //与独占锁ReentrantLock堵塞逻辑一致 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //因中断/超时,取消获取锁 if (failed) cancelAcquire(node); } }
释放读锁,由内部类ReadLock提供unlock方法,调用了Sync父类AQS的方法:
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared返回true,即同步状态为0,不存在线程占据读锁或写锁。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //当前线程是第一个得到读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //不是firstReader,更新计数器 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; //彻底释放锁 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } //重入锁退出 --rh.count; } //CAS更新同步状态, for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared方法成功释放锁,调用doReleaseShared唤醒后继节点。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //若是节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //若是自己头结点的waitStatus是出于重置状态(waitStatus==0)的, //将其设置为“传播”状态。意味着须要将状态向后一个节点传播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
获取写锁,由内部类WriteLock提供lock方法,调用了Sync父类AQS的方法,重点解析一下tryAcquire实现:
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
内部类Sync重写的tryAcquire方法:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //同步状态不为0 if (c != 0) { //其余线程得到写锁,获取失败;w为0而同步状态不为0,没有线程占据写锁,有线程占据读锁 //注意:不存在读锁与写锁同时被多个线程获取的状况。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //当前线程已经得到写锁,重入次数超过MAX_COUNT,失败 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 锁重入 setState(c + acquires); return true; } //公平策略检查 //CAS设置同步状态成功则得到写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
当同步状态state为0时,tryRelease方法返回true。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //独占锁,只有当前线程释放同步状态,不须要考虑并发 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
读写锁ReentrantReadWriteLock支持写锁降级,从下面能够看到线程得到写锁后,在没有释放写锁的状况下得到了读锁(锁降级),而后在手动释放写锁。这更像是一种特殊的锁重入,因为得到写锁有继续得到读锁的须要,相对于释放写锁再获取读锁,直接去获取读锁没有其余线程竞争,免去了因为其余线程得到写锁进入等待状态的可能,效率更高。注意:锁降级后须要手动释放写锁,不然线程会一直持有独占锁 读写锁ReentrantReadWriteLock是不支持锁升级的,若是一个得到了读锁的线程在持有读锁的状况下尝试获取写锁,是不可能成功得到读锁的,由于得到写锁会判断当前有没有线程持有读锁,而尝试锁升级的线程自己读锁没有释放,因此会进入同步队列等待同步状态为0获取写锁,因为读锁一直不释放会致使其余线程没法获取写锁(获取写锁条件不能有其余线程占据读锁或写锁),只能获取共享锁读锁。所以ReentrantReadWriteLock是不支持读写锁的。
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必须先释放读锁再获取写锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } // 写锁未释放得到读锁 rwl.readLock().lock(); } finally { //释放写锁,降级为读锁 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } }
读写锁内部维护了共享锁读锁和独占锁写锁,读锁和写锁都支持重进入,当读锁已经被获取(state高16位不为0)或写锁已被其余线程获取,获取写锁的线程进入等待状态;当写锁已经被其余线程获取,获取读锁的线程进入等待状态。读写锁支持由独占锁(写锁)降级到(读锁),但不支持读锁升级到写锁,在使用时要考虑手动释放好读锁与写锁的释放,不然程序可能会出现意想不到的问题。