ReentantReadWriteLock读写锁,在读线程多余写线程的并发环境中能体现出优异的性能,相比于synchronized与ReentrantLock这种独占式锁的模型,ReentantReadWriteLock采用独占式写锁与共享式读锁的方式,大大提升了针对于读多写少的多线程环境的系统性能。html
在分析ReentantReadWriteLock源码前,咱们须要先熟悉下独占锁与共享锁的基本模型。java
共享式获取与独占式获取最主要的区别在于同一时刻可否有多个线程同时获取到同步状态。以文件的读写为例,若是一个程序在对文件进行读操做,那么这一时刻对于该文件的写操做均被阻塞,而读操做能够同时进行。写操做要求对资源的独占式访问,而读操做能够是共享式访问,两种不一样的访问模式在同一时刻对文件或资源的访问状况。node
在图中,左半部分,共享式访问资源时,其余共享式的访问均被允许,而独占式访问被阻塞,右半部分是独占式访问资源时,同一时刻其余访问均被阻塞。缓存
下面咱们来消息分析下ReentantReadWriteLock的实现过程:多线程
ReentrantReadWriteLock并无继承ReentrantLock,也并无实现Lock接口,而是实现了ReadWriteLock接口,该接口提供readLock()方法获取读锁,writeLock()获取写锁。并发
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } } public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
默认构造方法为非公平模式 ,开发者也能够经过指定fair为true设置为 公平模式。app
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public static class ReadLock implements Lock, java.io.Serializable {} public static class WriteLock implements Lock, java.io.Serializable {}
而公平模式和非公平模式分别由内部类FairSync和NonfairSync实现,这两个类继承自另外一个内部类Sync,该Sync继承自AbstractQueuedSynchronizer(之后简称 AQS ),这里基本同ReentrantLock的内部实现一致。性能
abstract static class Sync extends AbstractQueuedSynchronizer { } static final class FairSync extends Sync { } static final class NonfairSync extends Sync { }
而ReentrantReadWriteLock针对FairSync于NonfairSync也有着本身的内部类实现,与ReentrantLock同样。优化
在ReentrantLock的分析中得知,其独占性和重入性都是经过CAS操做维护AQS内部的state变量实现的。而对于ReentantReadWriteLock由于要维护两个锁(读/写),可是同步状态state只有一个,因此ReentantReadWriteLock采用“按位切割”的方式,所谓“按位切割”就是将这个32位的int型state变量分为高16位和低16位来使用,高16位表明读状态,低16位表明写状态。ui
高16位 | 低16位 |
读状态 | 写状态 |
0000 0000 0000 0011 | 0000 0000 0000 0000 |
上面的一个32位的int表示有3个线程获取了读锁,0个线程获取了写锁
读/写锁如何肯定和改变状态的呢?答案是位运算 !
假设当前同步状态为state
//读状态:无符号右移16位 state >>> 16 //写状态:高16位都和0按位与运算,抹去高16位 state & Ox0000FFFF //读状态加1 state + (1 << 16) //写状态加1 state + 1 //判断写状态大于0,也就是写锁是已经获取 state & Ox0000FFFF > 0 //判断读状态大于0,也就是读锁是已经获取 state != 0 && (state & Ox0000FFFF == 0)
在ReentrantReadWriteLock的源码中咱们能看到关于锁标记位state的变量
//定义一个偏移量 static final int SHARED_SHIFT = 16; //1个读锁读锁单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //Ox00010000 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //Ox0000FFFF读锁上限 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Ox0000FFFF用于抹去高16位 //返回读状态 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //返回写状态 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
首先,咱们先来分析写锁的获取与释放
首先,写锁WriteLock的lock方法以下
public void lock() { sync.acquire(1); }
acquire方法实现以下,经过远吗咱们能够看出,acquire方法首先会调用tryAcquire方法尝试获取写锁,若是获取成功则返回,失败则会调用addWaiter方法将线程添加到CLH队列末尾,并调用acquireQueued方法阻塞当前线程,咱们能够看到addWaiter依旧是一独占的模式添加的节点,此处与ReentrantLock实现同样具体addWaiter方法与acquireQueued方法独占式的实现可详见个人一篇博客Synchronized与Lock的底层实现解析
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这里主要讲解tryAcquire方法,代码以下:
protected final boolean tryAcquire(int acquires) { //获取当前线程 Thread current = Thread.currentThread(); //获取当前同步状态 int c = getState(); //获取写锁状态(w>0表示已经有线程获取写锁) int w = exclusiveCount(c); //若是同步状态不为0,说明有线程已经获取到了同步状态,多是读锁,多是写锁 if (c != 0) { //写锁状态0(表示有线程已经获取读锁(共享锁获取时阻塞独占锁))或者当前线程不是已经获取写锁的线程(独占锁只容许本身持有锁) //返回false //此处的处理逻辑也间接验证了获取了读锁的线程不能同时获取写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; //大于最大线程数则抛出错误 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //若是写锁状态>0而且当前线程为尺有所线程,则表示写锁重入,以重入锁的方式设置同步状态(写状态直接加) //返回true setState(c + acquires); return true; } //若是同步状态等于0 //在尝试获取同步状态以前先调用writerShouldBlock()写等待策略 //ReentrantReadWriteLock中经过FairSync(公平锁)和NonfairSync(非公品锁)重写writerShouldBlock()方法来达到公平与非公平的实现 //NonfairSync(非公品锁)中直接返回false表示不进行阻塞直接获取 //FairSync(公平锁)中需调用hasQueuedPredecessors()方法判断当前线程节点是否为等待队列的head结点的后置节点,是才能够获取锁 //获取成功则将当前线程设置为持有锁线程,并返回true if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
其实写锁的实现与ReentrantLock差异不大,主要区别在于state的判断,在获取写锁的时候须要判断读锁的状态,而且
if (w == 0 || current != getExclusiveOwnerThread()) return false;
这段代码的逻辑也验证了读锁不能升级写锁的缘由,当写锁个数不为0,而且持有写锁的线程不是当前线程,直接返回false,阻塞了其余线程抢占写锁(固然包括持有读锁的线程)
写锁的释放
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
这里写锁的释放逻辑与ReentrantLock同样,先tryRelease释放锁,直到写锁state等于0(全部重入锁都释放),唤醒后续节点线程
protected final boolean tryRelease(int releases) { //当前线程不是获取了同步状态的线程则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //释放一次锁,则将状态-1 int nextc = getState() - releases; //判断写状态是否为0,当写锁的全部重入都释放完锁后,状态归为0 boolean free = exclusiveCount(nextc) == 0; //若是释放完成,将持有锁的线程设置为null if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
写锁的读取和释放仍是很好理解的,尤为针对比较熟悉ReentrantLock逻辑的人更好理解,可是读锁的实现相对来讲就比较复杂了
读锁ReadLock的lock方法
public void lock() { sync.acquireShared(1); }
acquireShared方法实现以下,能够看到读锁的lock方法是基于AQS共享锁实现的
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } }
acquireShared方法的逻辑为首先调用tryAcquireShared尝试获取写锁,若是获取失败(返回结果<0)则调用doAcquireShared阻塞该线程,tryAcquireShared方法与doAcquireShared方法咱们逐个分析
首先咱们须要先了解ReentrantReadWriteLock中的一些变量,方便咱们后续理解代码
读锁的获取相对比较复杂,由于读锁还有相似于获取当前线程得到的锁的个数等方法。
//用于记录每一个线程获取到的锁的数量 //使用id和count记录 static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } //这里使用了ThreadLocal为每一个线程都单独维护了一个HoldCounter来记录获取的锁的数量 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //获取到的读锁的数量 private transient ThreadLocalHoldCounter readHolds; //最后一次成功获取到读锁的线程的HoldCounter对象 private transient HoldCounter cachedHoldCounter; //第一个获取到读锁的线程 private transient Thread firstReader = null; //第一个获取到读锁的线程拥有的读锁数量 private transient int firstReaderHoldCount;
上面的4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上 下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽可能减小直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在若是当前线程屡次获取读锁时,减小从readHolds中获取HoldCounter的次数。
tryAcquireShared方法
protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); //若是已经有写锁被获取而且获取写锁的线程不是当前线程则获取失败 //此处判断逻辑隐含了一个条件,就是当有写锁获取而且是获取写锁的是当前线程,那么不返回-1,容许此写锁获取读锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读状态(读锁被获取的数量) int r = sharedCount(c); //根据是不是公平锁来判断是否须要进入阻塞,同时判断当前读锁数量是否小于读锁容许最大数量(0xFFFF个),并进行一次CAS获取 //一个线程获取也好,多个线程争抢也好,if中的一次CAS获取可以保证只有一个线程获取到读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //若是是第一个获取读状态的线程 if (r == 0) { //设置firstReader和firstReaderHoldCount firstReader = current; firstReaderHoldCount = 1; //若是当前线程和第一个获取读锁的线程是同一个线程那么它的获取的读锁数量加1(可见读锁也是一个重入锁) } else if (firstReader == current) { firstReaderHoldCount++; //是别的线程 } else { //获取最后一次获取到读状态的线程 HoldCounter rh = cachedHoldCounter; //rh == null(当前线程是第二个获取的),或者当前线程和rh不是同一个 //那么获取到当前线程的HoldCounter并设置到cachedHoldCounter if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //若是rh就是当前线程的HoldCounter而且当前线程获取到的读状态位0那么给当前线程的HoldCounter设置为rh else if (rh.count == 0) readHolds.set(rh); //获取到的读锁数加1 rh.count++; } return 1; } //获取读锁失败调用该方法进行CAS循环获取 return fullTryAcquireShared(current); }
fullTryAcquireShared方法的解读
//第一次获取读锁失败,有两种状况: //1)没有写锁被占用时,尝试经过一次CAS去获取锁时,更新失败(说明有其余读锁在申请) //2)当前线程占有读锁,而且有其余写锁在当前线程的下一个节点等待获取写锁,最后在fullTryAcquireShared中获取到读锁 //3)当前线程占有写锁,而且有其余写锁在当前线程的下一个节点等待获取写锁,除非当前线程的下一个节点被取消,不然fullTryAcquireShared也获取不到读锁fullTryAcquireShared也获取不到读锁 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //自旋 for (;;) { int c = getState(); //若是已经有写锁被获取 if (exclusiveCount(c) != 0) { //若是获取写锁的线程不是当前线程则获取失败 if (getExclusiveOwnerThread() != current) return -1; //若是获取写锁的线程是当前线程则继续保持这个写锁,而且这个写锁能够在获取读锁 //若是此时应该进入阻塞 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //第一次循环 if (rh == null) { //获取最后一次获取到读状态的线程 rh = cachedHoldCounter; //rh == null(当前线程是第二个获取的),或者当前线程和rh不是同一个 //那么获取到当前线程的HoldCounter并设置到cachedHoldCounter if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); //若是当前线程的读锁为0就remove,由于后面会set if (rh.count == 0) readHolds.remove(); } } //不是第一次循环 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //尝试CAS设置同步状态 //后续操做和tryAquireShared基本一致 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
readerShouldBlock阻塞判断方法的解读
/** * 非公平锁的读锁获取策略 */ final boolean readerShouldBlock() { //若是当前线程的后续节点为独占式写线程,则返回true(表示当前线程在tryAcquireShared方法中不能马上获取读锁,须要后续经过fullTryAcquireShared方法取判断是否须要阻塞线程) //在fullTryAcquireShared方法中会经过判断当前获取读锁线程的读锁数量来判断当前尝试获取读锁的线程是否持有写锁,若是持有写锁辨明所降级,须要将当前锁降级的线程添加到阻塞队列中从新获取读锁 //这么作是为了让后续的写线程有抢占写锁的机会,不会由于一直有读线程或者锁降级状况的存在而形成后续写线程的饥饿等待 return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } /** * 公平锁的读锁获取策略 */ final boolean readerShouldBlock() { //若是当前线程不是同步队列头结点的next节点(head.next) (判断是否有前驱节点,若是有则返回false,不然返回true。遵循FIFO) //则阻塞当前线程 return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
doAcquireShared方法
/** * 跟独占锁很像,只不过共享锁初始化时有传入一个count,count为1 */ private void doAcquireShared(int arg) { //把当前线程封装到一个SHARE类型Node中,添加到SyncQueue尾巴上 final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {//前继节点是head节点,下一个就到本身了 int r = tryAcquireShared(arg);//非公平锁实现,再尝试获取锁 //state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。state为0说明共享次数已经到了,能够获取锁了 //注意上面说的, 等于0表示不用唤醒后继节点,大于0须要 if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取 setHeadAndPropagate(node, r);//这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,而后唤醒node.thread //唤醒head节点线程后,从这里开始继续往下走 p.next = null; //head已经指向node节点,oldHead.next索引置空,方便p节点对象回收 if (interrupted) selfInterrupt(); return; } } //前继节点非head节点,将前继节点状态设置为SIGNAL,经过park挂起node节点的线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }
setHeadAndPropagate方法
/** * 把node节点设置成head节点,且node.waitStatus->Node.PROPAGATE */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head;//h用来保存旧的head节点 setHead(node);//head引用指向node节点 /* 这里意思有两种状况是须要执行唤醒操做 * 1.propagate > 0 表示调用方指明了后继节点须要被唤醒 * 2.头节点后面的节点须要被唤醒(waitStatus<0),不管是老的头结点仍是新的头结点*/ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点 /* 若是head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0 * head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态须要向后继节点传播 */ doReleaseShared();//对于这个方法,其实就是把node节点设置成Node.PROPAGATE状态 } }
doReleaseShared方法
/** * 把当前结点设置为SIGNAL或者PROPAGATE * 唤醒head.next(B节点),B节点唤醒后能够竞争锁,成功后head->B,而后又会唤醒B.next,一直重复直到共享节点都唤醒 * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁 * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示须要将状态向后继节点传播 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//head是SIGNAL状态 /* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE, * 是由于unparkSuccessor(h)中,若是ws < 0会设置为0,因此ws先设置为0,再设置为PROPAGATE * 这里须要控制并发,由于入口有setHeadAndPropagate跟release两个,避免两次unpark */ if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue;//设置失败,从新循环 /* head状态为SIGNAL,且成功设置为0以后,唤醒head.next节点线程 * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点, * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会从新循环,继续唤醒head的下一个节点 */ unparkSuccessor(h); /* * 若是自己头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。 * 意味着须要将状态向后一个节点传播 */ } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; } if (h == head)//若是head变了,从新循环 break; } }
读锁的释放
releaseShared方法
public final boolean releaseShared(int arg) { //只有共享锁彻底释放,才能调用下面的doReleaseShared方法唤醒head节点的后继节点 //加入多个读锁同时在获取到了读锁,即便不按顺序释放锁,也不会影响head后继节点的唤醒 //由于共享锁能够有多个线程组成,可是释放锁的条件只有一个,就是读锁标记为0, //即便最后释放锁的节点不是head,可是也能保证head后继节点正常被唤醒 if (tryReleaseShared(arg)) { //此处的doReleaseShared方法与setHeadAndPropagate方法中锁唤醒的节点有所差异 //setHeadAndPropagate方法只唤醒head后继的共享锁节点 //doReleaseShared方法则会唤醒head后继的独占锁或共享锁 doReleaseShared(); return true; } return false; }
releaseShared方法首先会调用tryReleaseShared方法尝试释放共享锁,成功的话会唤醒head节点的后继节点
tryReleaseShared方法
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //若是当前线程是第一个获取读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; //若是第一个获取读锁的线程只获取了一个锁那么firstReader=null //不然firstReaderHoldCount-- if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { //若是当前线程不是第一个获取读锁的线程 HoldCounter rh = cachedHoldCounter; //获取当前线程的HoldCounter if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //当前线程获取的读锁小于等于1那么就将remove当前线程的HoldCounter readHolds.remove(); //当前线程获取的读锁小于等于0抛出异常 if (count <= 0) throw unmatchedUnlockException(); } //当前线程拥有的读锁数量减1 --rh.count; } //自旋 for (;;) { int c = getState(); //释放后的同步状态 int nextc = c - SHARED_UNIT; //CAS设置同步状态,成功则返回是否同步状态为0 if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared方法的逻辑在于将读锁的标记位-1,而且同时将firstReader和非firstReader的获取锁的次数-1,直到读锁标记位为0时表示读锁释放完毕
锁降级
锁降级指的是先获取到写锁,而后获取到读锁,而后释放了写锁的过程。
由于在获取读锁的时候的判断条件是:
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
因此当前线程是能够在获取了写锁的状况下再去获取读锁的。
那么在写锁释放了以后应该还能继续持有读锁。
最后:锁是不支持锁升级的(先获取写锁,再获取读锁而后释放读锁),
由于第一步获取读锁的时候可能有多个线程获取了读锁,这样若是锁升级的话将会致使写操做对其余已经获取了读锁的线程不可见。
参考:
https://blog.csdn.net/LightOfMiracle/article/details/73184755
https://www.cnblogs.com/zaizhoumo/p/7782941.html
https://blog.csdn.net/u010577768/article/details/79995811