上一篇文章咱们谈到了Java并发的基石:
AQS的实现原理 juejin.im/post/5e6f93…。
这两天就有朋友说ReentrantLock中的Condition条件等待以及ReentrantReadWriteLock读写锁的实现有点繁琐,问我能不能讲下其关键实现原理。这篇文章咱们就来谈谈这个主题。html
这篇文章咱们来Java中的读写锁以及ReentrantLock中Condition条件等待的实现。阅读完本篇文章,你将了解到:node
读写锁的使用场景和优缺点算法
读写锁的实现原理数据库
如何使用读写锁编程
ReentrantLock的Condition底层是如何实现的后端
若是你不知道ReentrantLock的Condition是干吗的,能够阅读下个人这篇文章:
ReentrantLock介绍与使用:
https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html缓存
有这样一种场景:bash
若是对一个共享资源的写操做没有读操做那么频繁,这个时候能够容许多个线程同时读取共享资源;网络
可是若是有一个线程想去写这些共享资源,那么其余线程此刻就不该该对这些资源进行读和写操做了。数据结构
Java中的ReentrantReadWriteLock
正是为这种场景提供的锁。该类里面包括了读锁和写锁。
没有其余线程正在持有写锁;
尝试获取读锁的线程同时持有写锁。
没有其余线程正在持有读锁;
没有其余线程正在持有写锁。
容许并发读:只要没有线程正在更新数据,那么多个线程就能够同时读取数据;
只能独占写:只要有一个线程正在写数据,那么就会致使其余线程的读或者写均被阻塞;但写的线程能够获取读锁,并经过释放写锁,让锁降级为读锁;(不能由读锁升级为写锁)
只要有一个线程正在读数据,那么其余线程的写入就会阻塞,直到读锁被释放;
公平性:支持非公平锁和公平锁,非公平锁吞吐量较高;
可重入:不管是读锁仍是写锁都是支持可重入的。
读写锁能够增长更新不频繁而读取频繁的共享数据结构的吞吐量。
ReentrantReadWriteLock是可重入读写锁的实现。咱们先来看看涉及到的类:
咱们能够看到,ReentrantReadWriteLock中也具备非公平锁NonfairSync和公平锁FairSync的实现。同时ReentrantReadWriteLock组合了两把锁:写锁WriteLock和读锁ReadLock。
咱们来看看具体的构造函数:
1public ReentrantReadWriteLock(boolean fair) {2 sync = fair ? new FairSync() : new NonfairSync();3 readerLock = new ReadLock(this);4 writerLock = new WriteLock(this);5}复制代码
能够发现,经过参数fair控制是建立非公平锁仍是公平锁。同时ReentrantReadWriteLock持有了写锁和读锁。
而本质上,读锁和写锁都是经过持有ReentrantReadWriteLock.sync来进行加锁和释放锁的,用的是同一个AQS,Sync类提供类对ReentrantReadWriteLock的支持:
1protected ReadLock(ReentrantReadWriteLock lock) {2 sync = lock.sync; // 引用的是ReentrantReadWriteLock的sync实例3}复制代码
1protected WriteLock(ReentrantReadWriteLock lock) {2 sync = lock.sync; // 引用的是ReentrantReadWriteLock的sync实例3}复制代码
基于对AQS原理的理解,咱们知道sync是读写锁实现的关键,而aqs中核心是state字段和双端等待队列。下面咱们来看看具体的实现。
在查看ReentrantReadWriteLock以前,您须要了解如下内容:
读锁计数器类,为每一个获取读锁的线程进行计数。Sync类中有一个cachedHoldCounter字段,该字段主要是缓存上一个线程的读锁计数器,节省ThreadLocal查找次数。
1static final class HoldCounter {2 // 某个读线程的重入次数3 int count = 0;4 // 某个线程的tid字段5 // Use id, not reference, to avoid garbage retention6 final long tid = getThreadId(Thread.currentThread());7}复制代码
当前线程持有的可重入读锁的数量,当数量降低到0的时候进行删除。
1static final class ThreadLocalHoldCounter2 extends ThreadLocal<HoldCounter> {3 public HoldCounter initialValue() {4 return new HoldCounter();5 }6}复制代码
AQS中的state为了可以同时记录读锁和写锁的状态,把32位变量分为了两部分:
如上图,高16位存储读状态,读锁是共享锁,这里记录持有读锁的线程数;低16位是写状态,写锁是排他锁,这里0表示没有线程持有,大于0表示持有线程对锁的重入次数。
虽然读写锁看起来有两把锁,可是底层用的都是同一个state,同一个等待队列。只不过是经过ReadLock和WriteLock分别提供了读锁和写锁的API,底层仍是用同一个AQS。以下图:
因为读写锁是互斥的,因此线程1获取写锁,线程2获取读锁,并发执行的时候,必定有一个会失败;
若是是已经获取了读锁的线程尝试获取写锁,则会获取成功;
公平模式下,先进入等待队列的线程先被处理;非公平模式下,若是尝试获取写锁的线程节点在头节点后面,尝试获取读锁的线程要让步,进入等待队列;
线程节点获取到读锁以后,会判断下一个节点是否处于共享模式,若是是则会一直传播并唤醒后续共享模式节点;
若是有其余线程获取了写锁,那么获取写锁就会被阻塞。
公平和非公平是针对等待队列中的线程节点的处理来讲的:
公平模式通常都是从队列头开始处理,而且若是等待队列还有待处理节点,新的线程所有都入等待队列;
非公平模式通常无论等待队列里面有没有待处理节点,都会先尝试竞争获取锁;特殊状况:若是等待队列中有写锁线程,那么新来的读锁线程必须排队让写锁线程先进行处理。
其实关于读写锁的原理就差很少是这么多了。
如下是详细的代码分析,可能会比较枯燥,为了不让你们一头陷入源码中,因而在上面先把源码作的事情都给讲出来了。建议感兴趣的同窗打开电脑跟踪源码一块儿来阅读。
查看ReadLock的lock相关方法,调用的是AQS的acquireShared方法,该方法会以共享模式获取锁:
1public final void acquireShared(int arg) {2 // 尝试获取锁3 if (tryAcquireShared(arg) < 0)4 // 若是获取锁失败了,那么会进入ASQ的等待队列,等待被唤醒后从新尝试获取锁5 doAcquireShared(arg);6}复制代码
下面看看关键获取锁的tryAcquireShared方法,该方法主要处理逻辑:
由于读写是互斥的,若是另外一个线程持有写锁,则失败;
不然,此线程具有锁定write状态的条件,所以判断是否应该进入阻塞。若是不是,请尝试CAS获取读锁许可并更新读锁计数。请注意,该步骤不检查重入,这将推迟到最后fullTryAcquireShared方法;
若是第2步失败,或者因为线程不符合锁定条件或者CAS失败或读锁计数饱和,将会使用fullTryAcquireShared进一步重试。
下面是详细的说明:
1protected final int tryAcquireShared(int unused) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 // 若是存在写锁,而且写锁不是当前线程,则直接失败让线程进入等待队列 5 if (exclusiveCount(c) != 0 && 6 getExclusiveOwnerThread() != current) 7 return -1; 8 int r = sharedCount(c); 9 // 判断读锁是否应该被阻塞,公平模式下,先进入等待队列则先被处理;非公平模式下写锁优先级比较高,若是头节点的下一个节点不是共享模式,便是尝试获取写锁的线程,读锁须要让步10 if (!readerShouldBlock() &&11 // 读锁是否已到达获取上线12 r < MAX_COUNT &&13 // CAS修改读锁状态,+114 compareAndSetState(c, c + SHARED_UNIT)) {15 // 获取读锁成功16 if (r == 0) {17 // 若是是第一个获取读锁的线程,也就是把读锁状态从0变到1的那个线程,那么存入firstReader中18 firstReader = current;19 // firstReader持有锁=120 firstReaderHoldCount = 1;21 } else if (firstReader == current) {22 // firstReader已是当前线程,则firstReaderHoldCount++23 firstReaderHoldCount++;24 } else { // 读锁数量不为0,而且第一个读线程不为当前线程25 // 获取缓存读锁计数器26 HoldCounter rh = cachedHoldCounter;27 if (rh == null || rh.tid != getThreadId(current))28 // 缓存读锁计数器为空或者计数器不是当前线程的,则尝试经过ThreadLocal获取当前线程对应的计数器29 cachedHoldCounter = rh = readHolds.get();30 else if (rh.count == 0)31 readHolds.set(rh);32 rh.count++;33 }34 return 1;35 }36 // 以上执行失败,则进入该逻辑37 return fullTryAcquireShared(current);38}复制代码
让咱们接着看fullTryAcquireShared方法,这个方法可知,只有其余线程持有写锁,或者使用的是公平锁而且头节点后面还有其余等待的线程,或者头节点后面的节点不是共享模式,或者读锁计数器达到了上限,则阻塞,不然一直会循环尝试获取锁:
1final int fullTryAcquireShared(Thread current) { 2 HoldCounter rh = null; 3 for (;;) { 4 int c = getState(); 5 // 若是存在写锁,而且写锁不是当前线程,则返回false 6 if (exclusiveCount(c) != 0) { 7 if (getExclusiveOwnerThread() != current) 8 return -1; 9 // else we hold the exclusive lock; blocking here10 // would cause deadlock.11 // 不存在写锁,继续判断是否应该阻塞:若是是公平锁而且头节点后有其余等待的线程,则阻塞,若是是非公平锁,判断头节点后面的节点是否共享模式,若是不是则阻塞12 } else if (readerShouldBlock()) {13 // Make sure we're not acquiring read lock reentrantly14 // 若是当前线程是firstReader,说明是重入15 if (firstReader == current) {16 // assert firstReaderHoldCount > 0;17 } else {18 // 进入该分支,说明没有读写锁冲突,而且不是重入,当前线程也不是firstReader19 if (rh == null) {20 rh = cachedHoldCounter;21 // 判断上一个获取到锁的线程是否当前线程,不是则进入AQS等待队列22 if (rh == null || rh.tid != getThreadId(current)) {23 rh = readHolds.get();24 if (rh.count == 0)25 readHolds.remove();26 }27 }28 // rh.count == 0 表示rh是刚新获取到的,直接返回,进入等待队列29 if (rh.count == 0)30 return -1;31 }32 }33 // 共享锁达到上限了34 if (sharedCount(c) == MAX_COUNT)35 throw new Error("Maximum lock count exceeded");36 // 读锁自增,如下代码与上一个方法中的相似37 if (compareAndSetState(c, c + SHARED_UNIT)) {38 if (sharedCount(c) == 0) {39 firstReader = current;40 firstReaderHoldCount = 1;41 } else if (firstReader == current) {42 firstReaderHoldCount++;43 } else {44 if (rh == null)45 rh = cachedHoldCounter;46 if (rh == null || rh.tid != getThreadId(current))47 rh = readHolds.get();48 else if (rh.count == 0)49 readHolds.set(rh);50 rh.count++;51 cachedHoldCounter = rh; // cache for release52 }53 return 1;54 }55 }56}复制代码
最后咱们来看看doAcquireShared方法:
1private void doAcquireShared(int arg) { 2 // 添加一个共享等待节点 3 final Node node = addWaiter(Node.SHARED); 4 boolean failed = true; 5 try { 6 boolean interrupted = false; 7 for (;;) { 8 // 判断新增的节点的前一个节点是否头节点 9 final Node p = node.predecessor();10 if (p == head) { // 是头节点,那么在此尝试获取共享锁11 int r = tryAcquireShared(arg);12 if (r >= 0) { 13 // 获取成功,把当前节点变为新的head节点,而且检查后续节点是否能够在共享模式下等待,而且容许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁14 setHeadAndPropagate(node, r);15 p.next = null; // help GC16 if (interrupted)17 selfInterrupt();18 failed = false;19 return;20 }21 }22 // 阻塞节点23 if (shouldParkAfterFailedAcquire(p, node) &&24 parkAndCheckInterrupt())25 interrupted = true;26 }27 } finally {28 if (failed)29 // 取消获取锁30 cancelAcquire(node);31 }32}复制代码
接下来咱们看看释放锁的代码。
1public void unlock() {2 sync.releaseShared(1);3}复制代码
AbstractQueuedSynchronizer.releaseShared()
1public final boolean releaseShared(int arg) {2 if (tryReleaseShared(arg)) {3 doReleaseShared();4 return true;5 }6 return false;7}复制代码
主要处理方法是tryReleaseShared,该方法主要是清理ThreadLocal中的锁计数器,而后CAS修改读锁个数减1:
1protected final boolean tryReleaseShared(int unused) { 2 Thread current = Thread.currentThread(); 3 if (firstReader == current) { 4 // assert firstReaderHoldCount > 0; 5 if (firstReaderHoldCount == 1) 6 firstReader = null; 7 else 8 firstReaderHoldCount--; 9 } else {10 HoldCounter rh = cachedHoldCounter;11 if (rh == null || rh.tid != getThreadId(current))12 rh = readHolds.get();13 int count = rh.count;14 if (count <= 1) {15 readHolds.remove();16 if (count <= 0)17 throw unmatchedUnlockException();18 }19 --rh.count;20 }21 for (;;) {22 int c = getState();23 int nextc = c - SHARED_UNIT;24 if (compareAndSetState(c, nextc))25 // Releasing the read lock has no effect on readers,26 // but it may allow waiting writers to proceed if27 // both read and write locks are now free.28 return nextc == 0;29 }30}复制代码
查看WriteLock的lock锁相关方法,调用的是sync.acquire方法,该方法直接继承了ASQ的acquire()方法的实现:
1public void lock() {2 sync.acquire(1);3}复制代码
与ReentrantLock的实现区别在具体的tryAcquire()方法的实现,咱们来看看ReentrantReadWriteLock.Sync中该方法的实现,主要作了如下事情:
若是读锁数量>0,或者写锁数量>0,而且不是重入的,那么直接失败了;
若是锁数量为0,那么该线程有资格获取到写锁,进而尝试获取。
1protected final boolean tryAcquire(int acquires) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 int w = exclusiveCount(c); 5 if (c != 0) { // 存在读锁或者写锁 6 // 不存在写锁,或者当前线程不是写锁持有的线程,那么直接失败 7 if (w == 0 || current != getExclusiveOwnerThread()) 8 return false; 9 // 写锁超多最大数量限制,也直接失败10 if (w + exclusiveCount(acquires) > MAX_COUNT)11 throw new Error("Maximum lock count exceeded");12 // Reentrant acquire13 // 写锁持有的线程重入,直接修改state便可14 setState(c + acquires);15 return true;16 }17 // 判断是否应该阻塞:非公平模式,无需阻塞,公平模式若是前面有其余节点则须要排队阻塞18 if (writerShouldBlock() ||19 // 尝试获取写锁20 !compareAndSetState(c, c + acquires))21 return false;22 setExclusiveOwnerThread(current);23 return true;24}复制代码
查看WriteLock的unlock相关方法,调用的是sync.release方法,该方法直接继承了AQS的release实现:
1public void unlock() {2 sync.release(1);3}复制代码
如下是release方法:
1public final boolean release(int arg) { 2 // 尝试释放锁 3 if (tryRelease(arg)) { 4 // 释放锁成功,则唤醒队列中头节点后的一个线程 5 Node h = head; 6 if (h != null && h.waitStatus != 0) 7 unparkSuccessor(h); 8 return true; 9 }10 return false;11}复制代码
释放锁的逻辑主要在tryRelease方法,下面是详细代码:
1protected final boolean tryRelease(int releases) { 2 // 若是当前线程没有获取写锁,则释放直接抛异常 3 if (!isHeldExclusively()) 4 throw new IllegalMonitorStateException(); 5 int nextc = getState() - releases; 6 boolean free = exclusiveCount(nextc) == 0; 7 // 若是当前线程彻底释放了写锁,则去除独占标识 8 if (free) 9 setExclusiveOwnerThread(null);10 // 修改state11 setState(nextc);12 return free;13}复制代码
下面是读写锁的使用例子,该例子实现了一个支持并发访问的ArrayList。
由于读写锁是互斥的,保证了不会由于写致使读取出现的不一致。
代码以下:
1public class ReentrantReadWriteLockTest { 2 3 static final int READER_SIZE = 10; 4 static final int WRITER_SIZE = 2; 5 6 public static void main(String[] args) { 7 Integer[] initialElements = {33, 28, 86, 99}; 8 9 ReadWriteList<Integer> sharedList = new ReadWriteList<>(initialElements); 10 11 for (int i = 0; i < WRITER_SIZE; i++) { 12 new Writer(sharedList).start(); 13 } 14 15 for (int i = 0; i < READER_SIZE; i++) { 16 new Reader(sharedList).start(); 17 } 18 19 } 20 21} 22 23class Reader extends Thread { 24 private ReadWriteList<Integer> sharedList; 25 26 public Reader(ReadWriteList<Integer> sharedList) { 27 this.sharedList = sharedList; 28 } 29 30 public void run() { 31 Random random = new Random(); 32 int index = random.nextInt(sharedList.size()); 33 Integer number = sharedList.get(index); 34 35 System.out.println(getName() + " -> get: " + number); 36 37 try { 38 Thread.sleep(100); 39 } catch (InterruptedException ie ) { ie.printStackTrace(); } 40 41 } 42} 43 44class Writer extends Thread { 45 private ReadWriteList<Integer> sharedList; 46 47 public Writer(ReadWriteList<Integer> sharedList) { 48 this.sharedList = sharedList; 49 } 50 51 public void run() { 52 Random random = new Random(); 53 int number = random.nextInt(100); 54 sharedList.add(number); 55 56 try { 57 Thread.sleep(100); 58 System.out.println(getName() + " -> put: " + number); 59 } catch (InterruptedException ie ) { ie.printStackTrace(); } 60 } 61} 62 63/** 64 * 支持并发读写的ArrayList 65 */ 66class ReadWriteList<E> { 67 private List<E> list = new ArrayList<>(); 68 private ReadWriteLock rwLock = new ReentrantReadWriteLock(); 69 70 public ReadWriteList(E... initialElements) { 71 list.addAll(Arrays.asList(initialElements)); 72 } 73 74 public void add(E element) { 75 Lock writeLock = rwLock.writeLock(); 76 writeLock.lock(); 77 78 try { 79 list.add(element); 80 } finally { 81 writeLock.unlock(); 82 } 83 } 84 85 public E get(int index) { 86 Lock readLock = rwLock.readLock(); 87 readLock.lock(); 88 89 try { 90 return list.get(index); 91 } finally { 92 readLock.unlock(); 93 } 94 } 95 96 public int size() { 97 Lock readLock = rwLock.readLock(); 98 readLock.lock(); 99100 try {101 return list.size();102 } finally {103 readLock.unlock();104 }105 }106107}复制代码
接下来咱们来ReentrantLock中的Condition实现原理。
有以下的ReentrantLock和Condition:
1// 锁和条件变量2private final Lock lock = new ReentrantLock();3// 条件4private final Condition condition1 = lock.newCondition();复制代码
下面来看看执行await和signal的流程。
通常地,只有线程获取到lock以后,才可使用condition的await方法。假设此时线程1获取到了ReentrantLock锁,在执行代码逻辑的时候,发现某些条件不符合,因而调用了如下代码:
1while(xxx条件不知足) {2 condition1.await();3}复制代码
此时AQS主要执行如下动做:
线程1把本身包装成节点,waitStatus设为CONDITION(-2),追加到ConditionObject中的条件队列(每一个ConditionObject有一个本身的条件队列);
线程1释放锁,把state设置为0;
而后唤醒等待队列中head节点的下一个节点;
以下:
接下来进入一个循环,若是判断到当前线程的节点不在等待队列,那么会一直让当前线程阻塞,代码以下:
1while (!isOnSyncQueue(node)) {2 LockSupport.park(this);3 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)4 break;5}复制代码
这个时候已经唤醒其余线程继续处理了,只有其余线程执行了condition1.signal或者condition1.signalAll以后,才会唤醒线程1进行处理后续的流程。
当另外一个线程执行了 condition1.signal以后,主要是作了如下事情:
把条件队列中的第一个节点追加到等待队列中;
把等待队列原来尾节点的waitStatus设置为SIGNAL。
而后继续处理本身的事情,本身的事情处理完成以后,会释放锁,唤醒等待队列中head节点的下一个节点线程进行工做。
被唤醒的若是是以前执行了await方法的线程,那么该线程会接着就像往await方法里面阻塞处的下面继续执行,下面是源码:
1// 若是当前节点不在等待队列,会一直进行阻塞 2while (!isOnSyncQueue(node)) { 3 LockSupport.park(this); 4 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 5 break; 6} 7// 该方法主要作如下事情: 8// 1.尝试获取ReentrantLock锁 9// 2.获取成功,把如今线程节点变为新的head节点10// 3. 不然根据继续休眠等待11if (acquireQueued(node, savedState) && interruptMode != THROW_IE)12 interruptMode = REINTERRUPT;13if (node.nextWaiter != null) // 若是等待节点被取消了,那么从条件队列中移除14 unlinkCancelledWaiters();15if (interruptMode != 0)16 reportInterruptAfterWait(interruptMode);复制代码
能够发现,这里主要是判断到当前线程节点已经放入等待队列了,那么会尝试获取锁,获取成功则继续往下执行代码。
第一节咱们知道只有线程获取到ReentrantLock的锁以后才能够继续往下执行,中间可能会由于执行await而进入条件队列并释放锁,最后又会被唤醒从新获取锁,继续往下执行。最后按照书写规范,咱们必定会在代码中执行ReentrantLock.unlock()释放锁,而后继续唤醒等待队列后续线程继续执行。
这篇文章的内容就差很少介绍到这里了,可以阅读到这里的朋友真的是颇有耐心,为你点个赞。
本文为arthinking基于相关技术资料和官方文档撰写而成,确保内容的准确性,若是你发现了有何错漏之处,烦请高抬贵手帮忙指正,万分感激。
你们能够关注个人博客:itzhai.com
获取更多文章,我将持续更新后端相关技术,涉及JVM、Java基础、架构设计、网络编程、数据结构、数据库、算法、并发编程、分布式系统等相关内容。
若是您以为读完本文有所收获的话,能够关注个人帐号,或者点个赞吧,码字不易,您的支持就是我写做的最大动力,再次感谢!
关注个人公众号,及时获取最新的文章。
更多文章
JVM系列专题:公众号发送 JVM
·END·
访问IT宅(itzhai.com)查看个人博客更多文章
扫码关注及时获取新内容↓↓↓
Java后端技术架构 · 技术专题 · 经验分享
码字不易,若有收获,点个「赞」哦~
本文做者:arthinking
博客连接:
ReentrantLock的Conditiion原理解析 https://www.itzhai.com/cpj/analysis-of-reentrantlocks-condition-principle.html
ReentrantReadWriteLock介绍与使用 https://www.itzhai.com/cpj/introduction-and-use-of-reentrantreadwritelock.html版权声明:BY-NC-SA许可协议:创做不易,如需转载,请务必附加上博客连接,谢谢!