有了以上两篇文章的铺垫,来理解本文要介绍的既有独占式,又有共享式获取同步状态的 ReadWriteLock
,就很是轻松了html
ReadWriteLock
直译过来为【读写锁】。现实中,读多写少的业务场景是很是广泛的,好比应用缓存java
一个线程将数据写入缓存,其余线程能够直接读取缓存中的数据,提升数据查询效率
以前提到的互斥锁都是排他锁,也就是说同一时刻只容许一个线程进行访问,当面对可共享读的业务场景,互斥锁显然是比较低效的一种处理方式。为了提升效率,读写锁模型就诞生了编程
效率提高是一方面,但并发编程更重要的是在保证准确性的前提下提升效率api
一个写线程改变了缓存中的值,其余读线程必定是能够 “感知” 到的,不然可能致使查询到的值不许确
因此关于读写锁模型就了下面这 3 条规定:缓存
ReadWriteLock
是一个接口,其内部只有两个方法:安全
public interface ReadWriteLock { // 返回用于读的锁 Lock readLock(); // 返回用于写的锁 Lock writeLock(); }
因此要了解整个读/写锁的整个应用过程,须要从它的实现类 ReentrantReadWriteLock
提及多线程
直接对比ReentrantReadWriteLock 与 ReentrantLock的类结构并发
他们又很类似吧,根据类名称以及类结构,按照我们前序文章的分析,你也就能看出 ReentrantReadWriteLock 的基本特性:oracle
其中黄颜色标记的的 锁降级 是看不出来的, 这里先有个印象,下面会单独说明app
另外,不知道你是否还记得,Java AQS队列同步器以及ReentrantLock的应用 说过,Lock 和 AQS 同步器是一种组合形式的存在,既然这里是读/写两种锁,他们的组合模式也就分红了两种:
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
这里只是提醒你们,模式没有变,不要被读/写两种锁迷惑
说了这么多,若是你忘了前序知识,总体理解感受应该是有断档的,因此先来看个示例(模拟使用缓存)让你们对 ReentrantReadWriteLock 有个直观的使用印象
public class ReentrantReadWriteLockCache { // 定义一个非线程安全的 HashMap 用于缓存对象 static Map<String, Object> map = new HashMap<String, Object>(); // 建立读写锁对象 static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 构建读锁 static Lock rl = readWriteLock.readLock(); // 构建写锁 static Lock wl = readWriteLock.writeLock(); public static final Object get(String key) { rl.lock(); try{ return map.get(key); }finally { rl.unlock(); } } public static final Object put(String key, Object value){ wl.lock(); try{ return map.put(key, value); }finally { wl.unlock(); } } }
你瞧,使用就是这么简单。可是你知道的,AQS 的核心是锁的实现,即控制同步状态 state 的值,ReentrantReadWriteLock 也是应用AQS的 state 来控制同步状态的,那么问题来了:
一个 int 类型的 state 怎么既控制读的同步状态,又能够控制写的同步状态呢?
显然须要一点设计了
若是要在一个 int 类型变量上维护多个状态,那确定就须要拆分了。咱们知道 int 类型数据占32位,因此咱们就有机会按位切割使用state了。咱们将其切割成两部分:
因此,要想准确的计算读/写各自的状态值,确定就要应用位运算了,下面代码是 JDK1.8,ReentrantReadWriteLock 自定义同步器 Sync 的位操做
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
乍一看真是有些复杂的可怕,别慌,我们经过几道小小数学题就能够搞定整个位运算过程
整个 ReentrantReadWriteLock 中 读/写状态的计算就是反复应用这几道数学题,因此,在阅读下面内容以前,但愿你搞懂这简单的运算
基础铺垫足够了,咱们进入源码分析吧
因为写锁是排他的,因此确定是要重写 AQS 中 tryAcquire
方法
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 获取 state 总体的值 int c = getState(); // 获取写状态的值 int w = exclusiveCount(c); if (c != 0) { // w=0: 根据推理二,总体状态不等于零,写状态等于零,因此,读状态大于0,即存在读锁 // 或者当前线程不是已获取写锁的线程 // 两者之一条件成真,则获取写状态失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 根据推理一第 1 条,更新写状态值 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
上述代码 第 19 行 writerShouldBlock 也并无什么神秘的,只不过是公平/非公平获取锁方式的判断(是否有前驱节点来判断)
你瞧,写锁获取方式就是这么简单
因为读锁是共享式的,因此确定是要重写 AQS 中 tryAcquireShared
方法
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 写状态不等于0,而且锁的持有者不是当前线程,根据约定 3,则获取读锁失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取读状态值 int r = sharedCount(c); // 这个地方有点不同,咱们单独说明 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 若是获取读锁失败则进入自旋获取 return fullTryAcquireShared(current); }
readerShouldBlock
和 writerShouldBlock
在公平锁的实现上都是判断是否有前驱节点,可是在非公平锁的实现上,前者是这样的:
final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && // 等待队列头节点的下一个节点 (s = h.next) != null && // 若是是排他式的节点 !s.isShared() && s.thread != null; }
简单来讲,若是请求读锁的当前线程发现同步队列的 head 节点的下一个节点为排他式节点,那么就说明有一个线程在等待获取写锁(争抢写锁失败,被放入到同步队列中),那么请求读锁的线程就要阻塞,毕竟读多写少,若是尚未这点判断机制,写锁可能会发生【饥饿】
上述条件都知足了,也就会进入
tryAcquireShared
代码的第 14 行到第 25 行,这段代码主要是为了记录线程持有锁的次数。读锁是共享式的,还想记录每一个线程持有读锁的次数,就要用到 ThreadLocal 了,由于这不影响同步状态 state 的值,因此就不分析了, 只把关系放在这吧
到这里读锁的获取也就结束了,比写锁稍稍复杂那么一丢丢,接下来就说明一下那个可能让你迷惑的锁升级/降级问题吧
我的理解:读锁是能够被多线程共享的,写锁是单线程独占的,也就是说写锁的并发限制比读锁高,因此
在真正了解读写锁的升级与降级以前,咱们须要完善一下本文开头 ReentrantReadWriteLock 的例子
public static final Object get(String key) { Object obj = null; rl.lock(); try{ // 获取缓存中的值 obj = map.get(key); }finally { rl.unlock(); } // 缓存中值不为空,直接返回 if (obj!= null) { return obj; } // 缓存中值为空,则经过写锁查询DB,并将其写入到缓存中 wl.lock(); try{ // 再次尝试获取缓存中的值 obj = map.get(key); // 再次获取缓存中值仍是为空 if (obj == null) { // 查询DB obj = getDataFromDB(key); // 伪代码:getDataFromDB // 将其放入到缓存中 map.put(key, obj); } }finally { wl.unlock(); } return obj; }
有童鞋可能会有疑问
在写锁里面,为何代码第19行还要再次获取缓存中的值呢?不是画蛇添足吗?
其实这里再次尝试获取缓存中的值是颇有必要的,由于可能存在多个线程同时执行 get 方法,而且参数 key 也是相同的,执行到代码第 16 行 wl.lock()
,好比这样:
线程 A,B,C 同时执行到临界区 wl.lock(), 只有线程 A 获取写锁成功,线程B,C只能阻塞,直到线程A 释放写锁。这时,当线程B 或者 C 再次进入临界区时,线程 A 已经将值更新到缓存中了,因此线程B,C不必再查询一次DB,而是再次尝试查询缓存中的值
既然再次获取缓存颇有必要,我可否在读锁里直接判断,若是缓存中没有值,那就再次获取写锁来查询DB不就能够了嘛,就像这样:
public static final Object getLockUpgrade(String key) { Object obj = null; rl.lock(); try{ obj = map.get(key); if (obj == null){ wl.lock(); try{ obj = map.get(key); if (obj == null) { obj = getDataFromDB(key); // 伪代码:getDataFromDB map.put(key, obj); } }finally { wl.unlock(); } } }finally { rl.unlock(); } return obj; }
这还真是不能够的,由于获取一个写入锁须要先释放全部的读取锁,若是有两个读取锁试图获取写入锁,且都不释放读取锁时,就会发生死锁,因此在这里,锁的升级是不被容许的
读写锁的升级是不能够的,那么锁的降级是能够的嘛?这个是 Oracle 官网关于锁降级的示例 ,我将代码粘贴在此处,你们有兴趣能够点进去链接看更多内容
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必须在获取写锁以前释放读锁,由于锁的升级是不被容许的 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // 再次检查,缘由多是其余线程已经更新过缓存 if (!cacheValid) { data = ... cacheValid = true; } //在释放写锁前,降级为读锁 rwl.readLock().lock(); } finally { //释放写锁,此时持有读锁 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
代码中声明了一个 volatile 类型的 cacheValid 变量,保证其可见性。
这个过程就是一个完整的锁降级的过程,目的是保证数据可见性,听起来颇有道理的样子,那么问题来了:
上述代码为何在释放写锁以前要获取读锁呢?
若是当前的线程A在修改完cache中的数据后,没有获取读锁而是直接释放了写锁;假设此时另外一个线程B 获取了写锁并修改了数据,那么线程A没法感知到数据已被修改,但线程A还应用了缓存数据,因此就可能出现数据错误
若是遵循锁降级的步骤,线程A 在释放写锁以前获取读锁,那么线程B在获取写锁时将被阻塞,直到线程A完成数据处理过程,释放读锁,从而保证数据的可见性
那问题又来了:
使用写锁必定要降级吗?
若是你理解了上面的问题,相信这个问题已经有了答案。假如线程A修改完数据以后, 通过耗时操做后想要再使用数据时,但愿使用的是本身修改后的数据,而不是其余线程修改后的数据,这样的话确实是须要锁降级;若是只是但愿最后使用数据的时候,拿到的是最新的数据,而不必定是本身刚修改过的数据,那么先释放写锁,再获取读锁,而后使用数据也无妨
在这里我要额外说明一下你可能存在的误解:
相信你到这里也理解了锁的升级与降级过程,以及他们被容许或被禁止的缘由了
本文主要说明了 ReentrantReadWriteLock 是如何应用 state 作位拆分实现读/写两种同步状态的,另外也经过源码分析了读/写锁获取同步状态的过程,最后又了解了读写锁的升级/降级机制,相信到这里你对读写锁已经有了必定的理解。若是你对文中的哪些地方以为理解有些困难,强烈建议你回看本文开头的两篇文章,那里铺垫了很是多的内容。接下来咱们就看看在应用AQS的最后一个并发工具类 CountDownLatch 吧
// WriteLock public Condition newCondition() { return sync.newCondition(); } // ReadLock public Condition newCondition() { throw new UnsupportedOperationException(); }
日拱一兵 | 原创