解决线程安全问题使用ReentrantLock就能够了,可是ReentrantLock是独占锁,某一时刻只有一个线程能够获取该锁,而实际中会有写少读多的场景,显然ReentrantLock知足不了这个需求,因此ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,容许多个线程能够同时获取读锁。html
由类图可知,读写锁内部维护了一个ReadLock和一个WriteLock,他们依赖Sync实现具体功能,而Sync继承自AQS,而且提供了公平和非公平的实现。java
咱们先看下ReentrantReadWriteLock类的总体结构编程
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock;//读锁🔒 private final ReentrantReadWriteLock.WriteLock writerLock;//写锁🔒 final Sync sync; public ReentrantReadWriteLock() {//使用默认(非公平)的排序属性建立一个新的ReentrantReadWriteLock this(false); } public ReentrantReadWriteLock(boolean fair) {//使用给定的公平策略建立一个新的ReentrantReadWriteLock sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }//返回用于写入操做的锁🔒 public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }//返回用于读取操做的锁🔒 abstract static class Sync extends AbstractQueuedSynchronizer { .... static final class NonfairSync extends Sync {....}//非公平策略 static final class FairSync extends Sync {.... }//公平策略 public static class ReadLock implements Lock, java.io.Serializable {}//读锁🔒 .... public static class WriteLock implements Lock, java.io.Serializable {}//写锁🔒 .... }
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}缓存
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
说明:能够看到ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口规范了读写锁方法,具体操做由子类去实现,同时还实现了Serializable接口,表示能够进行序列化操做。安全
abstract static class Sync extends AbstractQueuedSynchronizer {}
说明:Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持。并发
Sync类的内部类存在两个,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,HoldCounter源码以下函数
static final class HoldCounter {//计数器 int count = 0;//计数 // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread());//线程id }
说明:HoldCounter有两个属性,count和tid,其中count表示某个读线程重入次数,tid表示该线程的tid字段的值,该字段能够用来惟一标识一个线程。源码分析
ThreadLocalHoldCounter源码以下:ui
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {//本地线程计数器 public HoldCounter initialValue() {//重写初始化方法,在没有进行set的状况下,获取的都是该HoldCounter的值 return new HoldCounter(); } }
说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类能够将线程与对象相关联。在没用进行set的状况下,get到的均是initialValue方法里面生成的那个HoldCounter对象。this
abstract static class Sync extends AbstractQueuedSynchronizer { //版本序号 private static final long serialVersionUID = 6317671515068378041L; //高16位为读锁🔒,低16位为写锁🔒 static final int SHARED_SHIFT = 16; //读锁🔒单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁🔒的最大数量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //写锁🔒的最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //本地线程计数器 private transient ThreadLocalHoldCounter readHolds; //缓存计数器 private transient HoldCounter cachedHoldCounter; //第一个读线程 private transient Thread firstReader = null; //第一个读线程的计数 private transient int firstReaderHoldCount;
说明:该属性中包括了读锁,写锁线程的最大量。本地线程计数器等。
//构造函数 Sync() { //本地线程计数器 readHolds = new ThreadLocalHoldCounter(); //设置AQS的状态 setState(getState()); // ensures visibility of readHolds }
说明:在Sync的构造函数中设置了本地线程计数器和AQS的状态state。
读写锁须要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上经过“按位切割使用”:将变量切割成两部分,高16位表示读状态,也就是获取到读锁的次数,低16位表示获取到写线程的可重入次数。
假设当前同步状态值为S,get和set的操做以下:
(1)获取写状态:
S&0x0000FFFF:将高16位所有抹去
(2)获取读状态:
S>>>16:无符号补0,右移16位
(3)写状态加1:
S+1
(4)读状态加1:
S+(1<<16)即S + 0x00010000
在代码层的判断中,若是S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
看下WriteLock类中的lock和unlock方法:
public void lock() {
sync.acquire(1);
}public void unlock() {
sync.release(1);
}
说明:能够看到就是调用独占式同步状态的获取与释放,所以真实的实现就是Sync的tryAcquire和tryRelease。
protected final boolean tryAcquire(int acquires) { //当前线程 Thread current = Thread.currentThread(); //获取状态 int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //c!=0说明读锁或者写锁已经被某线程获取 if (c != 0) { //w=0说明已经有线程获取了读锁返回false,w!=0而且当前线程不是写锁的拥有者,则返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //说明当前线程获取了写锁,判断可重入次数(最大次数65535) if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //次数当前线程已经持有写锁,设置可重入次数 setState(c + acquires); return true; } //到这里说明此时c=0,读锁和写锁都没有被获取,writerShouldBlock表示是否阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置锁位当前线程所持有 setExclusiveOwnerThread(current); return true; }
其中exclusiveCount方法表示占有写锁的线程数量,源码以下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static final int SHARED_SHIFT = 16;
说明:直接将状态state和(2^16 - 1)作与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
writerShouldBlock()判断是否须要阻塞,公平和非公平方式实现不一样。
非公平实现方式:
在非公平策略方式下老是不会被阻塞
final boolean writerShouldBlock() { return false; // writers can always barge }
公平实现方式,在公平策略下会进行判断,判断同步队列中是否有等待时间更长的线程,若存在,则须要被阻塞,不然无需阻塞。
final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
说明:若是不须要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明当前锁🔒被别的线程抢去了,返回false,若是须要阻塞也返回false。
成功获取写锁🔒后,将当前线程设置位占有写锁的线程,返回true。
方法流程图以下:
1.4.2 写锁的释放,tryRelease方法:
protected final boolean tryRelease(int releases) { //若锁的持有则不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //若是独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有则设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数,无论独占模式是否被释放,更新独占重入数 setState(nextc); return free; }
说明: 写锁的释放过程仍是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,若是不是抛出异常。而后检查释放后写锁的线程数是否为0,若是为0则表示写锁空闲了,
释放锁资源将锁的持有线程设置为null,不然释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。 此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,
则抛出异常,不然,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,不然,表示资源还被占用。其方法流程图以下。
相似于写锁,读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取状态 int c = getState(); //若是写锁线程数!=0,且独占锁不是当前线程则返回失败,由于存在锁🔒降级 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁数量 int r = sharedCount(c); if (!readerShouldBlock() &&//读锁是否须要等待(公平锁原则) r < MAX_COUNT &&//持有线程小于最大数(65535) compareAndSetState(c, c + SHARED_UNIT)) {//设置读取锁状态 //r=0,表示第一个读锁线程,第一个读锁firstReader是不会加入到readHolds中 if (r == 0) { firstReader = current;//设置第一个读线程 firstReaderHoldCount = 1;//读线程占用的资源数为1 } else if (firstReader == current) {//当前线程为第一个读线程,表示第一个读锁线程重入 firstReaderHoldCount++;//占用资源数+1 } else {//读锁数量不为0,而且不为当前线程 HoldCounter rh = cachedHoldCounter;//获取计数器 if (rh == null || rh.tid != getThreadId(current))//计数器为空,或者计数器的tid不为当前正在运行的线程的tid cachedHoldCounter = rh = readHolds.get();//获取当前线程的计数器 else if (rh.count == 0)//计数器为0 readHolds.set(rh);//加入到readHolds中 rh.count++;//计数器+1 } return 1; } return fullTryAcquireShared(current); }
其中sharedCount方法表示占有读锁的线程数量,源码以下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
说明:直接将state右移16位,就能够获得读锁的线程数量,由于state的高16位表示读锁,对应的第十六位表示写锁数量。
读锁获取锁的过程比写锁稍微复杂些,首先判断写锁是否为0而且当前线程不占有独占锁,直接返回;不然,判断读线程是否须要被阻塞而且读锁数量是否小于最大值而且比较设置状态成功,
若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;若当前线程线程为第一个读线程,则增长firstReaderHoldCount;不然,将设置当前线程对应的HoldCounter对象的值。流程图以下。
注意:更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)本地线程副本中记录当前线程的重入数,若是当前只有一个线程的话,还不须要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,
当第二个线程来的时候,就要动用ThreadLocal变量的readHolds了,每一个线程拥有本身的副本,用来保存本身的重入数。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 无限循环
// 获取状态
int c = getState();
if (exclusiveCount(c) != 0) { // 写线程数量不为0
if (getExclusiveOwnerThread() != current) // 不为当前线程
return -1;
} else if (readerShouldBlock()) { // 写线程数量为0而且读线程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 当前线程为第一个读线程
// assert firstReaderHoldCount > 0;
} else { // 当前线程不为第一个读线程
if (rh == null) { // 计数器不为空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较而且设置成功
if (sharedCount(c) == 0) { // 读线程数量为0
// 设置第一个读线程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
说明:在tryAcquireShared函数中,若是下列三个条件不知足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操做能够成功。其逻辑与tryAcquireShared逻辑相似,再也不累赘。
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
if (firstReader == current) { // 当前线程为第一个读线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
firstReader = null;
else // 减小占用的资源
firstReaderHoldCount--;
} else { // 当前线程不为第一个读线程
// 获取缓存的计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
// 获取当前线程对应的计数器
rh = readHolds.get();
// 获取计数
int count = rh.count;
if (count <= 1) { // 计数小于等于1
// 移除
readHolds.remove();
if (count <= 0) // 计数小于等于0,抛出异常
throw unmatchedUnlockException();
}
// 减小计数
--rh.count;
}
for (;;) { // 无限循环
// 获取状态
int c = getState();
// 获取状态
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比较并进行设置
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
说明:此方法表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader,如果,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,如果,则设置第一个读线程firstReader为空,不然,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,若是计数器的计数count小于等于1,则移除当前线程对应的计数器,若是计数器的计数count小于等于0,则抛出异常,以后再减小计数便可。不管何种状况,都会进入无限循环,该循环能够确保成功设置状态state。其流程图以下。
在读锁的获取、释放过程当中,老是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实咱们能够稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。
一次共享锁操做就至关于一次计数器的操做,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操做。因此HoldCounter的做用就是当前线程持有共享锁的数量,这个数量必需要与线程绑定在一块儿,不然操做其余线程锁就会抛出异常。
先看读锁获取锁的部分:
if (r == 0) {//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一个读锁线程重入
firstReaderHoldCount++;
} else { //非firstReader计数
HoldCounter rh = cachedHoldCounter;//readHoldCounter缓存
//rh == null 或者 rh.tid != current.getId(),须要获取rh
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh); //加入到readHolds中
rh.count++; //计数+1
}
这里为何要搞一个firstRead、firstReaderHoldCount呢?而不是直接使用else那段代码?这是为了一个效率问题,firstReader是不会放入到readHolds中的,
若是读锁仅有一个的状况下就会避免查找readHolds。可能就看这个代码还不是很理解HoldCounter。咱们先看firstReader、firstReaderHoldCount的定义:
private transient Thread firstReader = null; private transient int firstReaderHoldCount;
这两个变量比较简单,一个表示线程,固然该线程是一个特殊的线程,一个是firstReader的重入计数。
HoldCounter的定义:
static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); }
在HoldCounter中仅有count和tid两个变量,其中count表明着计数器,tid是线程的id。可是若是要将一个对象和线程绑定起来仅记录tid确定不够的,并且HoldCounter根本不能起到绑定对象的做用,只是记录线程tid而已。
诚然,在java中,咱们知道若是要将一个线程和对象绑定在一块儿只有ThreadLocal才能实现。因此以下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
ThreadLocalHoldCounter继承ThreadLocal,而且重写了initialValue方法。
故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面咱们能够看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是不是当前线程。这样作的好处是能够减小ThreadLocal.get()的次数,由于这也是一个耗时操做。须要说明的是这样HoldCounter绑定线程id而不绑定线程对象的缘由是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC可以智能的发现这种引用而回收它们,可是这须要必定的代价),因此其实这样作只是为了帮助GC快速回收对象而已。
经过上面的源码分析,咱们能够发现一个现象:
在线程持有读锁的状况下,该线程不能取得写锁(由于获取写锁的时候,若是发现当前的读锁被占用,就立刻获取失败,无论读锁是否是被当前线程持有)。
在线程持有写锁的状况下,该线程能够继续获取读锁(获取读锁时若是发现写锁被占用,只有写锁没有被当前线程占用的状况才会获取失败)。
仔细想一想,这个设计是合理的:由于当线程获取读锁的时候,可能有其余线程同时也在持有读锁,所以不能把获取读锁的线程“升级”为写锁;而对于得到写锁的线程,它必定独占了读写锁,所以能够继续让它获取读锁,当它同时获取了写锁和读锁后,还能够先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁.
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁能够“降级”为读锁;读锁不能“升级”为写锁。
Java并发编程之美