ReentrantReadWriteLock便可重入读写锁,一样也依赖于AQS来实现。在介绍ReentrantLock咱们知道其依托AQS的同步状态来判断锁是否占有,而ReentrantReadWriteLock既有读锁又有写锁,是如何依靠一个状态来维持的?
编程
ReentrantReadWriteLock读写锁,与ReentrantLock同样默认非公平,内部定义了读锁ReadLock()和写锁WriteLock(),在同一时间容许被多个读线程访问,但在写线程访问时,全部读线程和写线程都会被阻塞。读写锁主要特性:公平性、可重入性、锁降级安全
写锁是一个支持重进入的排它锁,其获取的核心方法:并发
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取ReentrantReadWriteLock锁总体同步状态
int c = getState();
// 获取写锁同步状态
int w = exclusiveCount(c);
// 存在读锁或写锁
if (c != 0) {
// c != 0 && w == 0 即若存在读锁或写锁持有线程不是当前线程,获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 最多65535次重入,若超过报错
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可重入,设置同步状态
setState(c + acquires);
return true;
}
// 公平与非公平,同步队列是否有节点,同时cas设置状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
复制代码
从源码中咱们能够发现getState()获取的是读锁与写锁总同步状态,再经过exclusiveCount()方法单独获取写锁同步状态post
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
复制代码
ReentrantReadWriteLock经过按位切割state变量,同步状态的低16位表示写锁获取次数,高16位表示读锁获取次数,如图示意 ui
因此这解释了为何写锁获取次数最多65535次
spa
写锁获取总体思路:当读锁已经被读线程获取或者写锁已经被其余写线程获取,则写锁获取失败;不然,获取成功并可重入,增长写锁同步状态线程
protected final boolean tryRelease(int releases) {
// 若释放的线程不为锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 从新设置同步状态
int nextc = getState() - releases;
// 若新的写锁持有线程数为0,则将锁的持有线程置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 更新同步状态
setState(nextc);
return free;
}
复制代码
写锁的释放与ReentrantLock的释放过程基本相似,每次释放均减小写状态,当写状态为0 时表示写锁已被释放,从而等待的读写线程可以继续访问读写锁,同时前次写线程的修改对后续读写线程可见
读锁相对于写锁(独占锁或排他锁),读锁是一个支持重进入的共享锁,它可以被多个线程同时获取,在没有其余写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所作的也只是(线程安全的)增长读状态
3d
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 若写锁已被占有,且写锁占有线程不是当前线程,读锁获取失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁状态
int r = sharedCount(c);
// 判断读锁是否须要公平,读锁持有线程数是否小于极值,CAS设置读锁状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 若是获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 若写锁已被占有,且写锁占有线程不是当前线程
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 公平性
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 读锁占有线程达到极值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// cas设置成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 若是获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
复制代码
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 若当前线程为第一个获取读锁的线程
if (firstReader == current) {
// 若只有获取一次,将firstReader置为null
if (firstReaderHoldCount == 1)
firstReader = null;
// 若屡次,firstReaderHoldCount-1
else
firstReaderHoldCount--;
} else {
// 更新当前线程获取锁次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋CAS更新读锁同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
code
static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }复制代码static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient HoldCounter cachedHoldCounter; private transient int firstReaderHoldCount; private transient Thread firstReader = null; 复制代码复制代码static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient HoldCounter cachedHoldCounter; private transient int firstReaderHoldCount; private transient Thread firstReader = null; 复制代码
锁降级指的是写锁降级成为读锁,即先获取写锁、获取读锁在释放写锁的过程,目的为了保证数据的可见性。假设有两个线程A、B,若线程A获取到写锁,不获取读锁而是直接释放写锁,这时线程B获取了写锁并修改了数据,那么线程A没法知道线程B的数据更新。若是线程A获取读锁,即遵循锁降级的步骤,则线程B将会被阻塞,直到线程A使用数据并释放读锁以后,线程B才能获取写锁进行数据更新。cdn
当有线程获取读锁时,不容许再有线程得到写锁
当有线程得到写锁时,不容许其余线程得到读锁和写锁
写锁能降级为读锁,读锁没法升级成写锁
《Java并发编程的艺术》 http://cmsblogs.com/?p=2213