公平模式ReentrantLock实现原理
前面的文章研究了AbstractQueuedSynchronizer的独占锁和共享锁,有了前两篇文章的基础,就能够乘胜追击,看一下基于AbstractQueuedSynchronizer的并发类是如何实现的。java
ReentrantLock显然是一种独占锁,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基础类,继承自AbstractQueuedSynchronizer,看一下代码实现:算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
abstract
static
class
Sync
extends
AbstractQueuedSynchronizer {
private
static
final
long
serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract
void
lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final
boolean
nonfairTryAcquire(
int
acquires) {
final
Thread current = Thread.currentThread();
int
c = getState();
if
(c ==
0
) {
if
(compareAndSetState(
0
, acquires)) {
setExclusiveOwnerThread(current);
return
true
;
}
}
else
if
(current == getExclusiveOwnerThread()) {
int
nextc = c + acquires;
if
(nextc <
0
)
// overflow
throw
new
Error(
"Maximum lock count exceeded"
);
setState(nextc);
return
true
;
}
return
false
;
}
protected
final
boolean
tryRelease(
int
releases) {
int
c = getState() - releases;
if
(Thread.currentThread() != getExclusiveOwnerThread())
throw
new
IllegalMonitorStateException();
boolean
free =
false
;
if
(c ==
0
) {
free =
true
;
setExclusiveOwnerThread(
null
);
}
setState(c);
return
free;
}
protected
final
boolean
isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return
getExclusiveOwnerThread() == Thread.currentThread();
}
final
ConditionObject newCondition() {
return
new
ConditionObject();
}
// Methods relayed from outer class
final
Thread getOwner() {
return
getState() ==
0
?
null
: getExclusiveOwnerThread();
}
final
int
getHoldCount() {
return
isHeldExclusively() ? getState() :
0
;
}
final
boolean
isLocked() {
return
getState() !=
0
;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private
void
readObject(java.io.ObjectInputStream s)
throws
java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(
0
);
// reset to unlocked state
}
}
|
Sync属于一个公共类,它是抽象的说明Sync会被继承,简单整理一下Sync主要作了哪些事(由于Sync不是ReentrantLock公平锁的关键):多线程
- 定义了一个lock方法让子类去实现,咱们平时之因此能调用ReentrantLock的lock()方法,就是由于Sync定义了它
- 实现了非公平锁tryAcquira的方法
- 实现了tryRelease方法,比较简单,状态-1,独占锁的线程置空
- 实现了isHeldExclusively方法
- 定义了newCondition方法,让开发者能够利用Condition实现通知/等待
接着,看一下公平锁的实现,FairSync类,它继承自Sync:并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
static
final
class
FairSync
extends
Sync {
private
static
final
long
serialVersionUID = -3000897897090466540L;
final
void
lock() {
acquire(
1
);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected
final
boolean
tryAcquire(
int
acquires) {
final
Thread current = Thread.currentThread();
int
c = getState();
if
(c ==
0
) {
if
(!hasQueuedPredecessors() &&
compareAndSetState(
0
, acquires)) {
setExclusiveOwnerThread(current);
return
true
;
}
}
else
if
(current == getExclusiveOwnerThread()) {
int
nextc = c + acquires;
if
(nextc <
0
)
throw
new
Error(
"Maximum lock count exceeded"
);
setState(nextc);
return
true
;
}
return
false
;
}
}
|
整理一下要点:less
1. 每次acquire的时候,state+1,若是当前线程lock()以后又lock()了,state不断+1,相应的unlock()的时候state-1,直到将state减到0为之,说明当前线程释放完全部的状态,其它线程能够竞争性能
2. state=0的时候,经过hasQueuedPredecessors方法作一次判断,hasQueuedPredecessors的实现为”h != t && ((s = h.next) == null || s.thread != Thread.currentThread());”,其中h是head、t是tail,因为代码中对结果取反,所以取反以后的判断为”h == t || ((s = h.next) != null && s.thread == Thread.currentThread());”,总结起来有两种状况能够经过!hasQueuedPredecessors()这个判断:ui
- h==t,h==t的状况为要么当前FIFO队列中没有任何数据,要么只构建出了一个head还没日后面连过任何一个Node,所以head就是tail
- (s = h.next) != null && s.thread == Thread.currentThread(),当前线程为正在等待的第一个Node中的线程
3. 若是没有线程比当前线程等待更久去执行acquire操做,那么经过CAS操做将state从0变为1的线程tryAcquire成功this
4. 没有tryAcquire成功的线程,按照tryAcquire的前后顺序,构建为一个FIFO队列,即第一个tryAcquire失败的排在head的后一位,第二个tryAcquire失败的排在head的后二位spa
5. 当tryAcquire成功的线程release完毕,第一个tryAcquire失败的线程第一个尝试tryAcquire,这就是先到先得,典型的公平锁线程
非公平模式ReentrantLock实现原理
看完了公平模式ReentrantLock,接着咱们看一下非公平模式ReentrantLock是如何实现的。NonfairSync类,一样是继承自Sync类,实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
static
final
class
NonfairSync
extends
Sync {
private
static
final
long
serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final
void
lock() {
if
(compareAndSetState(
0
,
1
))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(
1
);
}
protected
final
boolean
tryAcquire(
int
acquires) {
return
nonfairTryAcquire(acquires);
}
}
|
结合nonfairTryAcquire方法一块儿讲解,nonfairTryAcquire方法的实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
final
boolean
nonfairTryAcquire(
int
acquires) {
final
Thread current = Thread.currentThread();
int
c = getState();
if
(c ==
0
) {
if
(compareAndSetState(
0
, acquires)) {
setExclusiveOwnerThread(current);
return
true
;
}
}
else
if
(current == getExclusiveOwnerThread()) {
int
nextc = c + acquires;
if
(nextc <
0
)
// overflow
throw
new
Error(
"Maximum lock count exceeded"
);
setState(nextc);
return
true
;
}
return
false
;
}
|
看到差异就在于非公平锁lock()的时候会先尝试经过CAS看看能不能把state从0变为1(即获取锁),若是能够的话,直接获取锁而不须要排队。举个实际例子就很好理解了:
- 线程一、线程二、线程3竞争锁,线程1竞争成功获取锁,线程二、线程3依次排队
- 线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2
- 此时线程4来尝试获取锁了,因为线程2被唤醒了,所以线程2与线程4竞争锁
- 线程4成功将state从0变为1,线程2竞争锁失败,继续park
看到整个过程当中,后来的线程4反而比先来的线程2先获取锁,至关因而一种非公平的模式,
那为何非公平锁效率会比公平锁效率高?上面第(3)步若是线程2和线程4不竞争锁就是答案。为何这么说,后面的解释很重要,但愿你们能够理解:
线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。
那么若是线程1将state设置为0的时候,线程4就经过CAS算法获取到了锁,且在线程1唤醒线程2以前就已经使用完毕锁,那么至关于线程2获取锁的时间并无推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增长了系统的吞吐量,至关于单位时间处理了更多的任务。
从这段解释咱们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是由于加锁时间长,至关于线程2将state设为0并去唤醒线程2的这段时间,线程4没法完成释放锁,那么线程2被唤醒因为无法获取到锁,又被阻塞了,这种唤醒-阻塞的操做会引发线程的上下文切换,继而影响系统的性能。
Semaphore实现原理
Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1至关于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其余组件之CountDownLatch、Semaphore、Exchanger。信号量容许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一块儿写了。
首先仍是看一下Semaphore的基础设施,它和ReentrantLock同样,也有一个Sync:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
abstract
static
class
Sync
extends
AbstractQueuedSynchronizer {
private
static
final
long
serialVersionUID = 1192457210091910933L;
Sync(
int
permits) {
setState(permits);
}
final
int
getPermits() {
return
getState();
}
final
int
nonfairTryAcquireShared(
int
acquires) {
for
(;;) {
int
available = getState();
int
remaining = available - acquires;
if
(remaining <
0
||
compareAndSetState(available, remaining))
return
remaining;
}
}
protected
final
boolean
tryReleaseShared(
int
releases) {
for
(;;) {
int
current = getState();
int
next = current + releases;
if
(next < current)
// overflow
throw
new
Error(
"Maximum permit count exceeded"
);
if
(compareAndSetState(current, next))
return
true
;
}
}
final
void
reducePermits(
int
reductions) {
for
(;;) {
int
current = getState();
int
next = current - reductions;
if
(next > current)
// underflow
throw
new
Error(
"Permit count underflow"
);
if
(compareAndSetState(current, next))
return
;
}
}
final
int
drainPermits() {
for
(;;) {
int
current = getState();
if
(current ==
0
|| compareAndSetState(current,
0
))
return
current;
}
}
}
|
和ReentrantLock的Sync差很少,Semaphore的Sync定义了如下的一些主要内容:
- getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程能够同时得到信号量
- 定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared
- 定义了公平模式释放信号量的逻辑tryReleaseShared,至关于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)
再看下公平信号量的实现,一样的FairSync,继承自Sync,代码为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
static
final
class
FairSync
extends
Sync {
private
static
final
long
serialVersionUID = 2014338818796000944L;
FairSync(
int
permits) {
super
(permits);
}
protected
int
tryAcquireShared(
int
acquires) {
for
(;;) {
if
(hasQueuedPredecessors())
return
-
1
;
int
available = getState();
int
remaining = available - acquires;
if
(remaining <
0
||
compareAndSetState(available, remaining))
return
remaining;
}
}
}
|
首先第10行的hasQueuedPredecessors方法,前面已经说过了,若是已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示没法获取共享锁成功。
接着获取available,available就是state,用volatile修饰,因此线程中能够看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种状况直接返回:
- remaining减完<0
- 经过cas设置成功
以后就是和以前说过的共享锁的逻辑了,若是返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。
接着看一下非公平信号量的实现,NonfairSync继承Sync:
1
2
3
4
5
6
7
8
9
10
11
|
static
final
class
NonfairSync
extends
Sync {
private
static
final
long
serialVersionUID = -2694183684443567898L;
NonfairSync(
int
permits) {
super
(permits);
}
protected
int
tryAcquireShared(
int
acquires) {
return
nonfairTryAcquireShared(acquires);
}
}
|
nonfairTryAcquireShared在父类已经实现了,再贴一下代码:
1
2
3
4
5
6
7
8
9
|
final
int
nonfairTryAcquireShared(
int
acquires) {
for
(;;) {
int
available = getState();
int
remaining = available - acquires;
if
(remaining <
0
||
compareAndSetState(available, remaining))
return
remaining;
}
}
|
看到这里和公平Semaphore只有一点差异:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是否是自身都无所谓,对于非公平Semaphore都同样,反正线程调用Semaphore的acquire方法就将当前state-1,若是获得的remaining设置成功或者CAS操做成功就返回,这种操做没有遵循先到先得的原则,即非公平信号量。
至于非公平信号量对比公平信号量的优势,和ReentrantLock的非公平锁对比ReentrantLock的公平锁同样,就不说了。
CountDownLatch实现原理
CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其余组件之CountDownLatch、Semaphore、Exchanger。
CountDownLatch是一种共享锁,经过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:
1
2
3
|
public
void
await()
throws
InterruptedException {
sync.acquireSharedInterruptibly(
1
);
}
|
acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
static
final
class
Sync
extends
AbstractQueuedSynchronizer {
private
static
final
long
serialVersionUID = 4982264981922014374L;
Sync(
int
count) {
setState(count);
}
int
getCount() {
return
getState();
}
protected
int
tryAcquireShared(
int
acquires) {
return
(getState() ==
0
) ?
1
: -
1
;
}
protected
boolean
tryReleaseShared(
int
releases) {
// Decrement count; signal when transition to zero
for
(;;) {
int
c = getState();
if
(c ==
0
)
return
false
;
int
nextc = c-
1
;
if
(compareAndSetState(c, nextc))
return
nextc ==
0
;
}
}
}
|
其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:
- 传入一个count,state就等于count,await的时候判断是否是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只链接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程
- 每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程
为了验证(2),看一下上面Sync的tryReleaseShared方法就能够了,确实是这么实现的。
再理解独占锁与共享锁的区别
本文详细分析了ReentrantLock、Semaphore、CountDownLatch的实现原理,第一个是基于独占锁的实现,后两个是基于共享锁的实现,从这三个类咱们能够再总结一下独占锁与共享锁的区别,主要在两点上:
- 独占锁同时只有一条线程能够acquire成功,独占锁同时可能有多条线程能够acquire成功,Semaphore是典型例子;
- 独占锁每次只能唤醒一个Node,共享锁每次唤醒的时候能够将状态向后传播,便可能唤醒多个Node,CountDownLatch是典型例子。
带着这两个结论再看ReentrantLock、Semaphore、CountDownLatch,你必定会对独占锁与共享锁理解更深。