前面已经讲解了AQS源码的独享模式,今天来讲一下AQS的共享模式
下面以CountDownLatch去讲解AQS的共享模式
首先讲下什么是CountDownLatch,CountDownLatch所描述的是”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。在API中是这么说的:
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
先看CountDownLatch的例子
public
static
void
main(String[] args) {
final
CountDownLatch latch =
new
CountDownLatch(
2
);
new
Thread(){
public
void
run() {
try
{
System.out.println(
"线程1执行"
);
Thread.sleep(
5000
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new
Thread(){
public
void
run() {
try
{
System.out.println(
"线程2执行"
);
Thread.sleep(
3000
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new
Thread(){
public
void
run() {
try
{
System.out.println(
"线程3阻塞"
);
latch.await();
System.out.println(
"线程3继续执行"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try
{
Thread.sleep(
1000
);
System.out.println(
"主线程线程阻塞"
);
latch.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"主线程继续执行"
);
}
|
线程3 和主线程会加入到队列中
node1会判断前序节点是否是头结点,如果是前序节点是头节点 但是计数器不为0 则阻塞自己 并将waitstatus状态改为-1 即SIGNAL
node2 会判断当前节点是否为头结点,前序节点不是头结点 直接阻塞自己 并将waitstatus状态改为-1
如果计数器为零,就会把node1给唤醒,唤醒后 node1将自己的节点设置为头结点 并将节点waitstatus状态设置为 -3 PROPAGATE
然后继续执行for循环 这时候node2的前序节点是头结点,然后继续将节点node2设置为头结点,并将节点waitstatus状态设置为-3 即PROPAGATE
接着看CountDownLatch的源码
public
class
CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private
static
final
class
Sync
extends
AbstractQueuedSynchronizer {
private
static
final
long
serialVersionUID = 4982264981922014374L;
Sync(
int
count) {
setState(count);
}
int
getCount() {
return
getState();
}
protected
int
tryAcquireShared(
int
acquires) {
return
(getState() ==
0
) ?
1
: -
1
;
}
protected
boolean
tryReleaseShared(
int
releases) {
// Decrement count; signal when transition to zero
for
(;;) {
int
c = getState();
if
(c ==
0
)
return
false
;
int
nextc = c-
1
;
if
(compareAndSetState(c, nextc))
return
nextc ==
0
;
}
}
}
private
final
Sync sync;
//构造一个用给定计数初始化的 CountDownLatch
public
CountDownLatch(
int
count) {
if
(count <
0
)
throw
new
IllegalArgumentException(
"count < 0"
);
this
.sync =
new
Sync(count);
}
}
public
void
await()
throws
InterruptedException {
sync.acquireSharedInterruptibly(
1
);
}
public
boolean
await(
long
timeout, TimeUnit unit)
throws
InterruptedException {
return
sync.tryAcquireSharedNanos(
1
, unit.toNanos(timeout));
}
public
void
countDown() {
sync.releaseShared(
1
);
}
}
|
可以看出CountDownLatch内部依赖Sync实现,
Sync继承AQS。CountDownLatch仅提供了一个构造方法:
CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch 设置count
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } }
Sync(int count) {
setState(count);
}
设置state是count
看countDown方法
public
void
countDown() {
sync.releaseShared(
1
);
}
public
final
boolean
releaseShared(
int
arg) {
if
(tryReleaseShared(arg)) {
//如果此线程是被等待线程里最后一个被释放的线程 就去通知同步等待队列里的节点
doReleaseShared();
return
true
;
}
return
false
;
}
|
再看tryReleaseShared方法
protected
boolean
tryReleaseShared(
int
releases) {
// Decrement count; signal when transition to zero
for
(;;) {
int
c = getState();
//获取计数器的值
if
(c ==
0
)
return
false
;
int
nextc = c-
1
;
//每个被等待的线程执行完计数器减1
if
(compareAndSetState(c, nextc))
//设置计数器的新值
return
nextc ==
0
;
//如果计数器为0 返回true
}
}
}
|
再看doReleaseShared方法
private
void
doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for
(;;) {
Node h = head;
if
(h !=
null
&& h != tail) {
int
ws = h.waitStatus;
//如果头结点是-1 (可以看下面wait方法有讲解,已经把头结点设置为-1了 所以会走
//f (ws == Node.SIGNAL) 这一步
if
(ws == Node.SIGNAL) {
if
(!compareAndSetWaitStatus(h, Node.SIGNAL,
0
))
//把头结点再设置为0 不成功自旋操作,直到设置成功
continue
; i
// loop to recheck cases
unparkSuccessor(h);
//唤醒节点
}
else
if
(ws ==
0
&&
!compareAndSetWaitStatus(h,
0
, Node.PROPAGATE))
continue
;
// loop on failed CAS
}
if
(h == head)
// loop if head changed
break
;
}
}
|
再看unparkSuccessor方法
private
void
unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int
ws = node.waitStatus;
//因为头结点已经设置为0了,所以ws<0不满足
if
(ws <
0
)
compareAndSetWaitStatus(node, ws,
0
);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if
(s ==
null
|| s.waitStatus >
0
) {
//这一步也不满足,可以看下面wait方法里有讲解 头结点的后续节点的status都是-1
//所以这一步不满足 直接走LockSupport.unpark(s.thread);唤醒头结点的下一个节点
s =
null
;
//如果waitstatus>0说明 节点取消了 就找下一个waitstatus是-1的节点 并唤醒
for
(Node t = tail; t !=
null
&& t != node; t = t.prev)
if
(t.waitStatus <=
0
)
s = t;
}
if
(s !=
null
)
LockSupport.unpark(s.thread);
}
|
再看wait方法
public
final
void
acquireSharedInterruptibly(
int
arg)
throws
InterruptedException {
if
(Thread.interrupted())
throw
new
InterruptedException();
if
(tryAcquireShared(arg) <
0
)
//尝试获取锁,获取失败就执行下面的方法
doAcquireSharedInterruptibly(arg);
}
|
看tryAcquireShared方法
protected
int
tryAcquireShared(
int
acquires) {
return
(getState() ==
0
) ?
1
: -
1
;
}
|
如果state是0,说明被等待的线程全都执行完了 。return -1说明没有执行完
再看doAcquireSharedInterruptibly方法
private
void
doAcquireSharedInterruptibly(
int
arg)
throws
InterruptedException {
final
Node node = addWaiter(Node.SHARED);
//如果队列是空的,就新建一个头节点,头节点指向尾节点,
//然后再新建一个节点放在头节点后面 如果队列不为空,就在尾节点后面新建一个节点。节点是shared类型的
//队列节点的waitStatus默认是0 因为上篇AQS源码一种有讲解,就不讲那么多了
boolean
failed =
true
;
try
{
for
(;;) {
//开启自旋
final
Node p = node.predecessor();
if
(p == head) {
//如果新建节点的前序节点是头节点,而且state的值为0 就走到setHeadAndPropagate方法
int
r = tryAcquireShared(arg);
if
(r >=
0
) {
//如果被等待的线程执行完了
setHeadAndPropagate(node, r);
//把当前节点设置为头节点,而且唤醒后续挂起的节点
p.next =
null
;
// help GC
failed =
false
;
return
p.next =
null
;
// help GC
failed =
false
;
return
;
}
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果当前节点的前序节点不是头节点或者计数器不等于0,就阻塞当前节点
throw
throw
new
InterruptedException();
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
再看shouldParkAfterFailedAcquire方法
}
finally
|