重入锁 ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程能够进行访问,但在大多数场景下,大部分时间都是提供度服务,而写服务占有的时间较少。然而读服务不存在数据竞争问题,若是一个线程在读时禁止其余线程势必会致使性能下降。因此就提供了读写锁。java
读写锁维护者一对锁,一个读锁和一个写锁。经过分离读锁和写锁,使得并发性比通常的排他锁有了较大的提高:在同一时间能够容许多个读线程同时访问,可是在写线程访问时,全部读线程和写线程都会被阻塞。并发
读写锁的主要特性:性能
读写锁ReentrantReadWriteLock实现接口ReadWriteLock,该接口维护了一对相关的锁,一个用于只读操做,另外一个用于写入操做。只要没有 writer,读取锁能够由多个 reader 线程同时保持。写入锁是独占的。ui
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
ReadWriteLock定义了两个方法。readLock()返回用于读操做的锁,writeLock()返回用于写操做的锁。ReentrantReadWriteLock定义以下:this
/** 内部类 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 内部类 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** 使用默认(非公平)的排序属性建立一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** 使用给定的公平策略建立一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** 返回用于写入操做的锁 */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** 返回用于读取操做的锁 */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { /** * 省略其他源代码 */ } public static class WriteLock implements Lock, java.io.Serializable{ /** * 省略其他源代码 */ } public static class ReadLock implements Lock, java.io.Serializable { /** * 省略其他源代码 */ }
ReentrantReadWriteLock与ReentrantLock同样,其锁主体依然是Sync,它的读锁、写锁都是依靠Sync来实现的。因此ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁和写入锁的方式上不同而已,它的读写锁其实就是两个类:ReadLock、writeLock,这两个类都是lock实现。线程
高16位表示读,表示持有读锁的线程数(sharedCount)设计
低16位表示写。写锁的重入次数 (exclusiveCount)code
& 运算:1 1 为1 ,其他为0.对象
在ReentrantLock中使用一个int类型的state来表示同步状态,该值表示锁被一个线程重复获取的次数。可是读写锁ReentrantReadWriteLock内部维护着两个一对锁,须要用一个变量维护多种状态。因此读写锁采用“按位切割使用”的方式来维护这个变量,将其切分为两部分,高16为表示读,低16为表示写。分割以后,读写锁是如何迅速肯定读锁和写锁的状态呢?经过为运算。假如当前同步状态为S,那么写状态等于 S & 0x0000FFFF(将高16位所有抹去),读状态等于S >>> 16(无符号补0右移16位)。代码以下:排序
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
写锁就是一个支持可重入的排他锁。
写锁的获取最终会调用tryAcquire(int arg),该方法在内部类Sync中实现:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //当前锁个数 int c = getState(); //写锁 int w = exclusiveCount(c); if (c != 0) { //c != 0 && w == 0 表示存在读锁 //当前线程不是已经获取写锁的线程 if (w == 0 || current != getExclusiveOwnerThread()) return false; //超出最大范围 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } //是否须要阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置获取锁的线程为当前线程 setExclusiveOwnerThread(current); return true; }
写锁的获取过程
该方法和ReentrantLock的tryAcquire(int arg)大体同样,在判断重入时增长了一项条件:读锁是否存在。由于要确保写锁的操做对读锁是可见的,若是在存在读锁的状况下容许获取写锁,那么那些已经获取读锁的其余线程可能就没法感知当前写线程的操做。所以只有等读锁彻底释放后,写锁才可以被当前线程所获取,一旦写锁获取了,全部其余读、写线程均会被阻塞。
获取了写锁用完了则须要释放,WriteLock提供了unlock()方法释放写锁:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
写锁的释放最终仍是会调用AQS的模板方法release(int arg)方法,该方法首先调用tryRelease(int arg)方法尝试释放锁,tryRelease(int arg)方法为读写锁内部类Sync中定义了,以下:
protected final boolean tryRelease(int releases) { //释放的线程不为锁的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //若写锁的新线程数为0,则将锁的持有者设置为null boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
写锁释放锁的整个过程和独占锁ReentrantLock类似,每次释放均是减小写状态,当写状态为0时表示 写锁已经彻底释放了,从而等待的其余线程能够继续访问读写锁,获取同步状态,同时这次写线程的修改对后续的线程可见。
读锁为一个可重入的共享锁,它可以被多个线程同时持有,在没有其余写线程访问时,读锁老是或获取成功。
读锁的获取能够经过ReadLock的lock()方法:
public void lock() { sync.acquireShared(1); }
Sync的acquireShared(int arg)定义在AQS中:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcqurireShared(int arg)尝试获取读同步状态,该方法主要用于获取共享式同步状态,获取成功返回 >= 0的返回结果,不然返回 < 0 的返回结果。
protected final int tryAcquireShared(int unused) { //当前线程 Thread current = Thread.currentThread(); int c = getState(); //exclusiveCount(c)计算写锁 //若是存在写锁,且锁的持有者不是当前线程,直接返回-1 //存在锁降级问题,后续阐述 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁 int r = sharedCount(c); /* * readerShouldBlock():读锁是否须要等待(公平锁原则) * r < MAX_COUNT:持有线程小于最大数(65535) * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { /* * holdCount部分后面讲解 */ if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
说明: readHolds 保存的是当前线程的重入次数。 firstReader :第一个获取读锁的线程是单独存放的,提升效率避免查找 readHolds。
读锁的获取过程。 1.持有写锁的当前线程能够获取读锁,其他的都不能够,返回-1. 2.尝试获取读锁,设置读锁的获取次数。
读锁获取的过程相对于独占锁而言会稍微复杂下,整个过程以下:
由于存在锁降级状况,若是存在写锁且锁的持有者不是当前线程则直接返回失败,不然继续
依据公平性原则,判断读锁是否须要阻塞,读锁持有线程数小于最大值(65535),且设置锁状态成功,执行如下代码(对于HoldCounter下面再阐述),并返回1。若是不知足改条件,执行fullTryAcquireShared()。
fullTryAcquireShared: // 获取读锁失败,放到循环里重试。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //锁降级 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //读锁须要阻塞 else if (readerShouldBlock()) { //列头为当前线程 if (firstReader == current) { } //HoldCounter后面讲解 else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //读锁超出最大范围 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS设置读锁成功 if (compareAndSetState(c, c + SHARED_UNIT)) { //若是是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } //若是想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,则将firstReaderHoldCount+1 else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //更新线程的获取“读取锁”的共享计数 rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
循环里重试的过程:
fullTryAcquireShared(Thread current)会根据“是否须要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。若是不须要阻塞等待,而且锁的共享计数没有超过限制,则经过CAS尝试获取锁,并返回1
与写锁相同,读锁也提供了unlock()释放读锁:
public void unlock() { sync.releaseShared(1); }
unlcok()方法内部使用Sync的releaseShared(int arg)方法,该方法定义在AQS中:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
调用tryReleaseShared(int arg)尝试释放读锁,该方法定义在读写锁的Sync内部类中:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //若是想要释放锁的线程为第一个获取锁的线程 if (firstReader == current) { //仅获取了一次,则须要将firstReader 设置null,不然 firstReaderHoldCount - 1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //获取rh对象,并更新“当前线程获取锁的信息” else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //CAS更新同步状态 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
在读锁获取锁和释放锁的过程当中,咱们一直均可以看到一个变量rh (HoldCounter ),该变量在读锁中扮演着很是重要的做用。 咱们了解读锁的内在机制其实就是一个共享锁,为了更好理解HoldCounter ,咱们暂且认为它不是一个锁的几率,而至关于一个计数器。一次共享锁的操做就至关于在该计数器的操做。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操做。因此HoldCounter的做用就是当前线程持有共享锁的数量,这个数量必需要与线程绑定在一块儿,不然操做其余线程锁就会抛出异常。咱们先看HoldCounter的定义:
static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }