ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁能够在没有写锁的时候被多个线程同时持有,写锁是独占的。相对于互斥锁而言,ReadWriteLoc容许更高的并发量。java
全部读写锁的实现必须确保写操做对读操做的内存影响。换句话说,一个得到了读锁的线程必须能看到前一个释放的写锁所更新的内容。缓存
public interface ReadWriteLock { Lock readLock();//获取读锁 Lock writeLock();//获取写锁 }
ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。并发
class ReadWriteLockTest { String data;//缓存中的对象 volatile boolean cacheValid;//缓存是否还有效 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() throws InterruptedException { rwl.readLock().lock(); if (!cacheValid) {//缓存失效须要再次读取缓存 //在读取缓存前,必须释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次检查是否须要再次读取缓存,若是须要则 if (!cacheValid) { data = Thread.currentThread().getName()+":缓存数据测试,实际开发多是一个对象!"; cacheValid = true; } //在释放以前,经过获取读锁降级写锁 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); //释放写锁,持有读锁 } } try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"【"+data+"】"); } finally { rwl.readLock().unlock(); } } public static void main(String[] args) throws InterruptedException { ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest(); for(int i=0;i<10;i++){ Thread thread = new Thread(()->{ try { readWriteLockTest.processCachedData(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } } }
public ReentrantReadWriteLock() { this(false);//默认为非公平模式 } public ReentrantReadWriteLock(boolean fair) { //决定了Sync是FairSync仍是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。 sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。app
对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。工具
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // 写线程无需阻塞 } final boolean readerShouldBlock() { //apparentlyFirstQueuedIsExclusive在当前线程是写锁占用的线程时,返回true;不然返回false。也就说明,若是当前有一个写线程正在写,那么该读线程应该阻塞。 return apparentlyFirstQueuedIsExclusive(); } }
继承AQS的类都须要使用state变量表明某种资源,ReentrantReadWriteLock中的state表明了读锁的数量和写锁的持有与否,整个结构以下: 能够看到state的高16位表明读锁的个数;低16位表明写锁的状态。源码分析
public void lock() { sync.acquireShared(1); }
读锁使用的是AQS的共享模式,AQS的acquireShared方法以下:测试
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。ui
Sync实现了tryAcquireShared方法,以下:this
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //若是当前有写线程而且本线程不是写线程,不符合重入,失败. //在获取读锁时,若是有写线程,则获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获得读锁的个数 int r = sharedCount(c); //若是读不该该阻塞而且读锁的个数小于最大值65535,而且能够成功更新状态值,成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {//若是当前读锁为0 firstReader = current;//第一个读线程就是当前线程 firstReaderHoldCount = 1;//第一个线程持有读锁的个数 } //若是当前线程重入了,记录firstReaderHoldCount else if (firstReader == current) { firstReaderHoldCount++; } //当前读线程和第一个读线程不一样,记录每个线程读的次数 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //不然,循环尝试 return fullTryAcquireShared(current); }
从上面的代码以及注释能够看到,分为三步:线程
fullTryAcquiredShared方法
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //一旦有别的线程得到了写锁,而且得到写锁的线程不是本线程,返回-1,失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //若是读线程须要阻塞 else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } //说明有别的读线程占有了锁 else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //若是读锁达到了最大值,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //若是成功更改状态,成功返回 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
public void lock() { sync.acquire(1); }
AQS的acquire方法以下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
从上面能够看到,写锁使用的是AQS的独占模式。首先尝试获取锁,若是获取失败,那么将会把该线程加入到等待队列中。
Sync实现了tryAcquire方法用于尝试获取一把锁,以下:
protected final boolean tryAcquire(int acquires) { //获得调用lock方法的当前线程 Thread current = Thread.currentThread(); int c = getState(); //获得写锁的个数 int w = exclusiveCount(c); //若是当前有写锁或者读锁.(对于读锁而言,若是当前写线程能够进行写操做,那么读线程读到的数据可能有误) if (c != 0) { // 若是写锁为0或者当前线程不是独占线程(不符合重入),返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //若是写锁的个数超过了最大值(65535),抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 写锁重入,返回true setState(c + acquires); return true; } //若是当前没有写锁或者读锁,若是写线程应该阻塞或者CAS失败,返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //不然将当前线程置为得到写锁的线程,返回true setExclusiveOwnerThread(current); return true; }
ReadLock的unlock方法以下:
public void unlock() { sync.releaseShared(1); }
调用了Sync的releaseShared方法,该方法在AQS中提供,以下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
调用tryReleaseShared方法尝试释放锁,若是释放成功,调用doReleaseShared尝试唤醒下一个节点。
AQS的子类须要实现tryReleaseShared方法,Sync中的实现以下:
protected final boolean tryReleaseShared(int unused) { //获得调用unlock的线程 Thread current = Thread.currentThread(); //若是是第一个得到读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //不然,是HoldCounter中计数-1 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //死循环 for (;;) { int c = getState(); //释放一把读锁 int nextc = c - SHARED_UNIT; //若是CAS更新状态成功,返回读锁是否等于0;失败的话,则重试 if (compareAndSetState(c, nextc)) //释放读锁对读线程没有影响,可是可能会使等待的写线程解除挂起开始运行。因此,一旦没有锁了,就返回true,不然false;返回true后,那么则须要释放等待队列中的线程,这时读线程和写线程都有可能再得到锁。 return nextc == 0; } }
WriteLock的unlock方法以下:
public void unlock() { sync.release(1); }
Sync的release方法使用的AQS中的,以下:
public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0)//若是等待队列中有线程再等待 unparkSuccessor(h);//将下一个线程解除挂起。 return true; } return false; }
Sync须要实现tryRelease方法,以下:
protected final boolean tryRelease(int releases) { //若是没有线程持有写锁,可是仍要释放,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //若是没有写锁了,那么将AQS的线程置为null if (free) setExclusiveOwnerThread(null); //更新状态 setState(nextc); return free;//此处返回当且仅当free为0时返回,若是当前是写锁被占有了,只有当写锁的数据降为0时才认为释放成功;不然失败。由于只要有写锁,那么除了占有写锁的那个线程,其余线程即不能够得到读锁,也不能得到写锁 }
getOwner方法用于返回当前得到写锁的线程,若是没有线程占有写锁,那么返回null。实现以下:
protected Thread getOwner() { return sync.getOwner(); }
能够看到直接调用了Sync的getOwner方法,下面是Sync的getOwner方法:
final Thread getOwner() { // Must read state before owner to ensure memory consistency //若是独占锁的个数为0,说明没有线程占有写锁,那么返回null;不然返回占有写锁的线程。 return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread()); }
getReadLockCount()方法用于返回读锁的个数,实现以下:
public int getReadLockCount() { return sync.getReadLockCount(); }
Sync的实现以下:
final int getReadLockCount() { return sharedCount(getState()); } //要想获得读锁的个数,就是看AQS的state的高16位。这和前面讲过的同样,高16位表示读锁的个数,低16位表示写锁的个数。 static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
getReadHoldCount()方法用于返回当前线程所持有的读锁的个数,若是当前线程没有持有读锁,则返回0。直接看Sync的实现便可:
final int getReadHoldCount() { //若是没有读锁,天然每一个线程都是返回0 if (getReadLockCount() == 0) return 0; //获得当前线程 Thread current = Thread.currentThread(); //若是当前线程是第一个读线程,返回firstReaderHoldCount参数 if (firstReader == current) return firstReaderHoldCount; //若是当前线程不是第一个读线程,获得HoldCounter,返回其中的count HoldCounter rh = cachedHoldCounter; //若是缓存的HoldCounter不为null而且是当前线程的HoldCounter,直接返回count if (rh != null && rh.tid == getThreadId(current)) return rh.count; //若是缓存的HoldCounter不是当前线程的HoldCounter,那么从ThreadLocal中获得本线程的HoldCounter,返回计数 int count = readHolds.get().count; //若是本线程持有的读锁为0,从ThreadLocal中移除 if (count == 0) readHolds.remove(); return count; }
从上面的代码中,能够看到两个熟悉的变量,firstReader和HoldCounter类型。这两个变量在读锁的获取中接触过,前面没有细说,这里细说一下。HoldCounter类的实现以下:
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }
readHolds是ThreadLocalHoldCounter类,定义以下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
能够看到,readHolds存储了每个线程的HoldCounter,而HoldCounter中的count变量就是用来记录线程得到的写锁的个数。因此能够得出结论:Sync维持总的读锁的个数,在state的高16位;因为读线程能够同时存在,因此每一个线程还保存了得到的读锁的个数,这个是经过HoldCounter来保存的。 除此以外,对于第一个读线程有特殊的处理,Sync中有以下两个变量:
private transient Thread firstReader = null;//第一个获得读锁的线程 private transient int firstReaderHoldCount;//第一个线程得到的写锁
其他获取到读锁的线程的信息保存在HoldCounter中。
看完了HoldCounter和firstReader,再来看一下getReadLockCount的实现,主要有三步:
getWriteLockCount()方法返回写锁的个数,Sync的实现以下:
final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; }
能够看到若是没有线程持有写锁,那么返回0;不然返回AQS的state的低16位。
当分析ReentranctReadWriteLock时,或者说分析内部使用AQS实现的工具类时,须要明白的就是AQS的state表明的是什么。ReentrantLockReadWriteLock中的state同时表示写锁和读锁的个数。为了实现这种功能,state的高16位表示读锁的个数,低16位表示写锁的个数。AQS有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;另一点须要记住的即便,当有读锁时,写锁就不能得到;而当有写锁时,除了得到写锁的这个线程能够得到读锁外,其余线程不能得到读锁。