愿我所遇之人,所历之事,哪怕由于我有一点点变好,我就心满意足了。html
AQS:AbstractQueuedSynchronizerjava
贯穿全文的图(核心):node
模板方法设计模式:定义一个操做中算法的骨架,而将一些步骤的实现延迟到子类中。git
等待队列是CLH(Craig, Landin, and Hagersten)锁队列。github
经过节点中的“状态”字段来判断一个线程是否应该阻塞。当该节点的前一个节点释放锁的时候,该节点会被唤醒。算法
private transient volatile Node head;
private transient volatile Node tail;
//The synchronization state.
//在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。
private volatile int state;
复制代码
AQS维护了一个volatile int state(表明共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。segmentfault
state的访问方式有三种:设计模式
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。安全
不一样的自定义同步器争用共享资源的方式也不一样。自定义同步器在实现时只须要实现共享资源state的获取与释放方式便可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。bash
自定义同步器实现时主要实现如下几种方法:
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其余线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。固然,释放锁以前,A线程本身是能够重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每一个子线程执行完后countDown()一次,state会CAS减1。等到全部子线程都执行完后(即state=0),会unpark()主调用线程,而后主调用线程就会从await()函数返回,继续后续动做。
通常来讲,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现: tryAcquire-tryRelease tryAcquireShared-tryReleaseShared 中的一种便可。
固然AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
如下部分来自源码注释:
每次进入CLH队列时,须要对尾节点进入队列过程,是一个原子性操做。在出队列时,咱们只须要更新head节点便可。在节点肯定它的后继节点时, 须要花一些功夫,用于处理那些,因为等待超时时间结束或中断等缘由, 而取消等待锁的线程。
节点的前驱指针,主要用于处理,取消等待锁的线程。若是一个节点取消等待锁,则此节点的前驱节点的后继指针,要指向,此节点后继节点中,非取消等待锁的线程(有效等待锁的线程节点)。
咱们用next指针链接实现阻塞机制。每一个节点均持有本身线程,节点经过节点的后继链接唤醒其后继节点。
CLH队列须要一个傀儡结点做为开始节点。咱们不会再构造函数中建立它,由于若是没有线程竞争锁,那么,努力就白费了。取而代之的方案是,当有第一个竞争者时,咱们才构造头指针和尾指针。
线程经过同一节点等待条件,可是用另一个链接。条件只须要放在一个非并发的链接队列与节点关联,由于只有当线程独占持有锁的时候,才会去访问条件。当一个线程等待条件的时候,节点将会插入到条件队列中。当条件触发时,节点将会转移到主队列中。用一个状态值,描述节点在哪个队列上。
static final class Node {
//该等待节点处于共享模式
static final Node SHARED = new Node();
//该等待节点处于独占模式
static final Node EXCLUSIVE = null;
//表示节点的线程是已被取消的
static final int CANCELLED = 1;
//表示当前节点的后继节点的线程须要被唤醒
static final int SIGNAL = -1;
//表示线程正在等待某个条件
static final int CONDITION = -2;
//表示下一个共享模式的节点应该无条件的传播下去
static final int PROPAGATE = -3;
//状态位 ,分别可使CANCELLED、SINGNAL、CONDITION、PROPAGATE、0
volatile int waitStatus;
volatile Node prev;//前驱节点
volatile Node next;//后继节点
volatile Thread thread;//等待锁的线程
//ConditionObject链表的后继节点或者表明共享模式的节点。
//由于Condition队列只能在独占模式下被能被访问,咱们只须要简单的使用链表队列来连接正在等待条件的节点。
//而后它们会被转移到同步队列(AQS队列)再次从新获取。
//因为条件队列只能在独占模式下使用,因此咱们要表示共享模式的节点的话只要使用特殊值SHARED来标明便可。
Node nextWaiter;
//Returns true if node is waiting in shared mode
final boolean isShared() {
return nextWaiter == SHARED;
}
.......
}
复制代码
waitStatus不一样值含义:
该状态值为了简便使用,因此使用了数值类型。非负数值意味着该节点不须要被唤醒。因此,大多数代码中不须要检查该状态值的肯定值。
一个正常的Node,它的waitStatus初始化值是0。若是想要修改这个值,可使用AQS提供CAS进行修改。
在锁的获取时,并不必定只有一个线程才能持有这个锁(或者称为同步状态),因此此时有了独占模式和共享模式的区别,也就是在Node节点中由nextWaiter来标识。好比ReentrantLock就是一个独占锁,只能有一个线程得到锁,而WriteAndReadLock的读锁则能由多个线程同时获取,但它的写锁则只能由一个线程持有。
//忽略中断的(即不手动抛出InterruptedException异常)独占模式下的获取方法。
//该方法在成功返回前至少会调用一次tryAcquire()方法(该方法是子类重写的方法,若是返回true则表明能成功获取).
//不然当前线程会进入队列排队,重复的阻塞和唤醒等待再次成功获取后返回,
//该方法能够用来实现Lock.lock
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
该方法首先尝试获取锁( tryAcquire(arg)的具体实现定义在了子类中),若是获取到,则执行完毕,不然经过addWaiter(Node.EXCLUSIVE), arg)方法把当前节点添加到等待队列末尾,并设置为独占模式。
private Node addWaiter(Node mode) {
//把当前线程包装为node,设为独占模式
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队,即无竞争条件下确定成功。若是失败,则进入enq自旋重试入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS替换当前尾部。成功则返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//插入节点到队列中,若是队列未初始化则初始化,而后再插入。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
若是tail节点为空,执行enq(node);从新尝试,最终把node插入.在把node插入队列末尾后,它并不当即挂起该节点中线程,由于在插入它的过程当中,前面的线程可能已经执行完成,因此它会先进行自旋操做acquireQueued(node, arg),尝试让该线程从新获取锁!当条件知足获取到了锁则能够从自旋过程当中退出,不然继续。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//若是它的前继节点为头结点,尝试获取锁,获取成功则返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断当前节点的线程是否应该被挂起,若是应该被挂起则挂起。
//等待release唤醒释放
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//在队列中取消当前节点
cancelAcquire(node);
}
}
复制代码
若是没获取到锁,则判断是否应该挂起,而这个判断则得经过它的前驱节点的waitStatus来肯定:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//该节点若是状态若是为SIGNAL。则返回true,而后park挂起线程
if (ws == Node.SIGNAL)
return true;
//代表该节点已经被取消,向前循环从新调整链表节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//执行到这里表明节点是0或者PROPAGATE,而后标记他们为SIGNAL,可是
//还不能park挂起线程。须要重试是否能获取,若是不能,则挂起。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//挂起当前线程,且返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
最后,咱们对获取独占式锁过程对作个总结:
AQS的模板方法acquire经过调用子类自定义实现的tryAcquire获取同步状态失败后->将线程构形成Node节点(addWaiter)->将Node节点添加到同步队列对尾(addWaiter)->节点以自旋的方法获取同步状态(acquirQueued)。在节点自旋获取同步状态时,只有其前驱节点是头节点的时候才会尝试获取同步状态,若是该节点的前驱不是头节点或者该节点的前驱节点是头节点单获取同步状态失败,则判断当前线程须要阻塞,若是须要阻塞则须要被唤醒事后才返回。
获取锁的过程:
既然是释放,那确定是持有锁的该线程执行释放操做,即head节点中的线程释放锁.
AQS中的release释放同步状态和acquire获取同步状态同样,都是模板方法,tryRelease释放的具体操做都有子类去实现,父类AQS只提供一个算法骨架。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//若是node的后继节点不为空且不是做废状态,则唤醒这个后继节点,
//不然从末尾开始寻找合适的节点,若是找到,则唤醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
过程:首先调用子类的tryRelease()方法释放锁,而后唤醒后继节点,在唤醒的过程当中,须要判断后继节点是否知足状况,若是后继节点不为空且不是做废状态,则唤醒这个后继节点,不然从tail节点向前寻找合适的节点,若是找到,则唤醒。
释放锁过程:
java.util.concurrent中的不少可阻塞类(好比ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。
JDK中AQS被普遍使用,基于AQS实现的同步器包括:
每个基于AQS实现的同步器都会包含两种类型的操做,以下:
基于“复合优先于继承”的原则,基于AQS实现的同步器通常都是:声明一个内部私有的继承于AQS的子类Sync,对同步器全部公有方法的调用都会委托给这个内部子类。
后面会推出如下有关AQS的文章,已加深对于AQS的理解
本文不少内容整理自网络,参考文献: segmentfault.com/a/119000001… segmentfault.com/a/119000001… zhuanlan.zhihu.com/p/27134110 blog.csdn.net/wojiaolinaa… www.cnblogs.com/waterystone…
FIFO队列:www.cnblogs.com/waterystone…
我的微信公众号: