前几篇文章分析了线程池的原理,接下来研究锁的方面。显式锁ReentrantLock和同步工具类的实现基础都是AQS,因此合起来一齐研究。node
AQS便是AbstractQueuedSynchronizer,一个用来构建锁和同步工具的框架,包括经常使用的ReentrantLock、CountDownLatch、Semaphore等。并发
AQS没有锁之类的概念,它有个state变量,是个int类型,在不一样场合有着不一样含义。本文研究的是锁,为了好理解,姑且先把state当成锁。框架
AQS围绕state提供两种基本操做“获取”和“释放”,有条双向队列存放阻塞的等待线程,并提供一系列判断和处理方法,简单说几点:工具
至于线程是否能够得到state,如何释放state,就不是AQS关心的了,要由子类具体实现。性能
直接分析AQS的代码会比较难明白,因此结合子类ReentrantLock来分析。AQS的功能能够分为独占和共享,ReentrantLock实现了独占功能,是本文分析的目标。ui
Lock lock = new ReentranLock(); lock.lock(); try{ //do something }finally{ lock.unlock(); }
ReentrantLock实现了Lock接口,加锁和解锁都须要显式写出,注意必定要在适当时候unlock。this
和synchronized相比,ReentrantLock用起来会复杂一些。在基本的加锁和解锁上,二者是同样的,因此无特殊状况下,推荐使用synchronized。ReentrantLock的优点在于它更灵活、更强大,除了常规的lock()、unlock()以外,还有lockInterruptibly()、tryLock()方法,支持中断、超时。spa
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock的内部类Sync继承了AQS,分为公平锁FairSync和非公平锁NonfairSync。线程
ReentrantLock默认使用非公平锁是基于性能考虑,公平锁为了保证线程规规矩矩地排队,须要增长阻塞和唤醒的时间开销。若是直接插队获取非公平锁,跳过了对队列的处理,速度会更快。指针
final void lock() { acquire(1);} public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先来看公平锁的实现,lock方法很简单的一句话调用AQS的acquire方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
噢,AQS的tryAcquire不能直接调用,由于是否获取锁成功是由子类决定的,直接看ReentrantLock的tryAcquire的实现。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
获取锁成功分为两种状况,第一个if判断AQS的state是否等于0,表示锁没有人占有。接着,hasQueuedPredecessors判断队列是否有排在前面的线程在等待锁,没有的话调用compareAndSetState使用cas的方式修改state,传入的acquires写死是1。最后线程获取锁成功,setExclusiveOwnerThread将线程记录为独占锁的线程。
第二个if判断当前线程是否为独占锁的线程,由于ReentrantLock是可重入的,线程能够不停地lock来增长state的值,对应地须要unlock来解锁,直到state为零。
若是最后获取锁失败,下一步须要将线程加入到等待队列。
AQS内部有一条双向的队列存放等待线程,节点是Node对象。每一个Node维护了线程、先后Node的指针和等待状态等参数。
线程在加入队列以前,须要包装进Node,调用方法是addWaiter:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
每一个Node须要标记是独占的仍是共享的,由传入的mode决定,ReentrantLock天然是使用独占模式Node.EXCLUSIVE。
建立好Node后,若是队列不为空,使用cas的方式将Node加入到队列尾。注意,这里只执行了一次修改操做,而且可能由于并发的缘由失败。所以修改失败的状况和队列为空的状况,须要进入enq。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq是个死循环,保证Node必定能插入队列。注意到,当队列为空时,会先为头节点建立一个空的Node,由于头节点表明获取了锁的线程,如今尚未,因此先空着。
线程加入队列后,下一步是调用acquireQueued阻塞线程。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //1 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
标记1是线程唤醒后尝试获取锁的过程。若是前一个节点正好是head,表示本身排在第一位,能够立刻调用tryAcquire尝试。若是获取成功就简单了,直接修改本身为head。这步是实现公平锁的核心,保证释放锁时,由下个排队线程获取锁。(看到线程解锁时,再看回这里啦)
标记2是线程获取锁失败的处理。这个时候,线程可能等着下一次获取,也可能不想要了,Node变量waitState描述了线程的等待状态,一共四种状况:
static final int CANCELLED = 1; //取消 static final int SIGNAL = -1; //下个节点须要被唤醒 static final int CONDITION = -2; //线程在等待条件触发 static final int PROPAGATE = -3; //(共享锁)状态须要向后传播
shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否须要阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是线程须要阻塞,由parkAndCheckInterrupt方法进行操做。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt使用了LockSupport,和cas同样,最终使用UNSAFE调用Native方法实现线程阻塞(之后有机会就分析下LockSupport的原理,park和unpark方法做用相似于wait和notify)。最后返回线程唤醒后的中断状态,关于中断,后文会分析。
到这里总结一下获取锁的过程:线程去竞争一个锁,可能成功也可能失败。成功就直接持有资源,不须要进入队列;失败的话进入队列阻塞,等待唤醒后再尝试竞争锁。
经过上面详细的获取锁过程分析,释放锁过程大概能够猜到:头节点是获取锁的线程,先移出队列,再通知后面的节点获取锁。
public void unlock() { sync.release(1); }
ReentrantLock的unlock方法很简单地调用了AQS的release:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
和lock的tryAcquire同样,unlock的tryRelease一样由ReentrantLock实现:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
由于锁是能够重入的,因此每次lock会让state加1,对应地每次unlock要让state减1,直到为0时将独占线程变量设置为空,返回标记是否完全释放锁。
最后,调用unparkSuccessor将头节点的下个节点唤醒:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
寻找下个待唤醒的线程是从队列尾向前查询的,找到线程后调用LockSupport的unpark方法唤醒线程。被唤醒的线程从新执行acquireQueued里的循环,就是上文关于acquireQueued标记1部分,线程从新尝试获取锁。
static void selfInterrupt() { Thread.currentThread().interrupt(); }
在acquire里还有最后一句代码调用了selfInterrupt,功能很简单,对当前线程产生一个中断请求。
为何要这样操做呢?由于LockSupport.park阻塞线程后,有两种可能被唤醒。
第一种状况,前节点是头节点,释放锁后,会调用LockSupport.unpark唤醒当前线程。整个过程没有涉及到中断,最终acquireQueued返回false时,不须要调用selfInterrupt。
第二种状况,LockSupport.park支持响应中断请求,可以被其余线程经过interrupt()唤醒。但这种唤醒并无用,由于线程前面可能还有等待线程,在acquireQueued的循环里,线程会再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted(),不只返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终acquireQueued返回true时,真正的中断状态已经被清除,须要调用selfInterrupt维持中断状态。
所以普通的lock方法并不能被其余线程中断,ReentrantLock是能够支持中断,须要使用lockInterruptibly。
二者的逻辑基本同样,不一样之处是parkAndCheckInterrupt返回true时,lockInterruptibly直接throw new InterruptedException()。
分析完公平锁的实现,还剩下非公平锁,主要区别是获取锁的过程不一样。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在NonfairSync的lock方法里,第一步直接尝试将state修改成1,很明显,这是抢先获取锁的过程。若是修改state失败,则和公平锁同样,调用acquire。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
nonfairTryAcquire和tryAcquire乍一看几乎同样,差别只是缺乏调用hasQueuedPredecessors。这点体验出公平锁和非公平锁的不一样,公平锁会关注队列里排队的状况,老老实实按照FIFO的次序;非公平锁只要有机会就抢占,才无论排队的事。
从ReentrantLock的实现完整分析了AQS的独占功能,总的来说并不复杂。别忘了AQS还有共享功能,下一篇是--分析CountDownLatch的实现原理。
做者:展翅而飞 连接:https://www.jianshu.com/p/fe027772e156 来源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。