最近打算将jdk的Lock系列详细的分析一下,解决如下几个问题:node
Lock和synchronized是对应的,Condition和Object的监控方法(wait、notify)是对应的安全
先来大体看下接口方法并发
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
lock:在获取锁以前一直阻塞,而且不接受线程中断响应app
lockInterruptibly:在获取锁以前一直阻塞,可是接受线程中断响应,即一旦该线程被中断,会退出阻塞,抛出InterruptedException异常,该方法执行结束less
tryLock:尝试获取锁,无论成不成功立马返回,返回结果表明是否成功获取了锁ide
tryLock(long time, TimeUnit unit):在一段时间内不断尝试获取锁,若是超时还未获取锁则抛出InterruptedException异常,该方法执行结束ui
unlock:释放锁,要和上面的获取锁方法成对出现this
newCondition:使用该锁获取一个Condition,Condition是和生产它的锁是息息相关,绑定在一块儿的。线程
##2.2 Condition接口code
先来大体看下接口方法
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
就await和signal方法,和Object的wait、notify是相对应的。
await:阻塞,一直等待到该对象的signal或者signalAll被调用,退出阻塞,或者该线程被中断,抛出InterruptedException异常,方法执行结束
awaitUninterruptibly:和上面差很少,可是不响应线程的中断,即线程中断对它没影响
await(long time, TimeUnit unit)和awaitUntil(Date deadline):等待一段时间,一旦超时或者线程被中断,都会抛出InterruptedException异常,方法执行结束
signal:唤醒一个处于await的线程,这里都是针对同一个Condition对象来讲的
signalAll:唤醒全部处于await的线程,这里都是针对同一个Condition对象来讲的
咱们常常会据说AQS,全称就是这个AbstractQueuedSynchronizer。它在锁中扮演什么角色呢?
Lock的接口方法是针对用户的使用而定义的,咱们在实现Lock的时候,就须要作以下事情,重点关注下这些事情的共性和异性
指明什么状况下才叫获取到锁:如独占锁,一旦有人占据了,就不能获取到锁。如共享锁,有人占据,可是没超过限制也能获取锁。这一部分应该是锁的实现的业务代码,每种锁都有本身的业务逻辑。这一部分其实就是AbstractQueuedSynchronizer对子类留出的tryAcquire方法
获取不到锁的时候该如何处理呢:咱们固然但愿它们可以继续等待,有一个队列就最好不过了,一旦获取锁失败就加入到等待队列中排队,队列中的等待者依次再去竞争获取锁。这一部分代码其实就和哪一种锁没太大关系了,因此应该是锁的共性部分,这一部分其实就是AbstractQueuedSynchronizer实现的共性部分acquireQueued
那么针对Lock接口定义的lock方法实现逻辑就通常以下了:
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
表明着先试着获取一下锁,若是获取不成功,就把以后的处理逻辑交给AbstractQueuedSynchronizer来处理。反正你只管告诉AbstractQueuedSynchronizer它怎样才叫获取到锁,其余的等待处理逻辑你就能够不用再关心了。
以上也仅仅是部份内容,下面来全面看下AbstractQueuedSynchronizer对外留出的接口和已实现的共性部分
对外留出的接口:
tryAcquire:该方法向AQS解释了怎么才叫获取一把独占锁
tryRelease:该方法向AQS解释了怎么才叫释放一把独占锁
tryAcquireShared:该方法向AQS解释了怎么才叫获取一把共享锁
tryReleaseShared:该方法向AQS解释了怎么才叫释放一把共享锁
这些内容就是各类锁自己的业务逻辑,属于异性部分。
来看看锁的共性部分相关方法:
acquire:获取一把独占锁的过程
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
就是先拿锁实现的tryAcquire方法去尝试获取独占锁,一旦获取锁失败就进入队列,交给AQS来处理,一旦成功就表示获取到了一把锁。
release:释放一把独占锁的过程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
就是先拿锁实现的tryRelease方法尝试释放独占锁,一旦释放成功,就通知队列,有人释放锁了,队列前面的能够再次去竞争锁了(这一部分下面详细说明)
acquireShared:获取一把共享锁的过程
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
就是先拿锁实现的tryAcquireShared方法尝试获取共享锁,一旦获取失败,就进入队列,交给AQS来处理
releaseShared:释放一把共享锁的过程
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
就是先拿锁实现的tryReleaseShared方法尝试释放共享锁,一旦释放成功,就通知队列,有人释放锁了
一个Lock只要实现了AQS的留出的业务部分代码,就可使用AQS提供的上述方法来实现锁的相关功能,如一个简单的独占锁实现以下(略去其余一些代码):
public class Mutex implements Lock{ @Override public void lock() { sync.acquire(1); } @Override public void unlock() { sync.release(1); } private final Sync sync=new Sync(); private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if(getState()==0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } } }
从上面咱们就看到总是方法中会有各类的int参数,其实这是AbstractQueuedSynchronizer将获取锁的过程量化对数字的操做,而state变量就是用于记录当前数字值的。以独占锁为例:
state=0 表示该锁未被其余线程获取
一旦有线程想获取锁,就能够对state进行CAS增量操做,这个增量能够是任意值,不过大多数都默认取1。也就是说一旦一个线程对state CAS操做成功就表明该线程获取到了锁,则state就变成1了。其余操做对state CAS操做失败的就表明没获取到锁,就自动进入AQS管理流程
其余线程发现当前state值不等于0表示锁已被其余线程获取,就自动进入AQS管理流程
一旦获取锁的线程想释放锁,就能够对state进行自减,即减到0,其余线程又能够去获取锁了
从上面的例子中能够看到对state的几种线程安全和非安全操做:
compareAndSetState(int expect, int update):线程安全的设置状态,由于可能多个线程并发调用该方法,因此须要CAS来保证线程安全
setState(int newState):非线程安全的设置状态,这种通常是针对只有一个线程获取锁的时候来释放锁,此时没有并发的可能性,因此就不须要上述的compareAndSetState操做
getState():获取状态值
AQS提供了上述线程安全和非安全的设置状态state的方法,供咱们在实现锁的tryAcquire、tryRelease等方法的时候合理的时候它们。
就以获取独占锁为例来详细看下该过程:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
前面简单提到了,就是先拿锁实现的tryAcquire方法去尝试获取独占锁,一旦获取锁失败就进入队列,交给AQS来处理。AQS的处理简单描述下就是将当前线程包装成Node节点而后放到队列中进程排队,等待前面的Node节点都出队了,被唤醒轮到本身再次去竞争锁。
咱们先来认识下Node节点,大体以下结构:
static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; volatile int waitStatus; } private transient volatile Node head; private transient volatile Node tail;
首先就是prev、next节点能够构成一个双向队列。AQS中含有上述的head和tail两个属性一块儿来构成FIFO队列。
Thread thread则表明的是构成此节点的线程。
Node nextWaiter:是用于condition queue,用于构成一个单向的FIFO队列,详见下面。
volatile int waitStatus:则表示该节点当前的状态,目前有以下状态:
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
CANCELLED:表示该节点所对应的线程由于获取锁的过程当中超时或者被中断而被设置成此状态
SIGNAL:表示该节点所对应的线程被阻塞,再也不被调度执行,须要等待其余线程释放锁以后来唤醒它,才能再次加入锁的竞争
CONDITION:表示该节点所对应的线程被阻塞,再也不被调度执行,在等待某一个condition的signal、signalAll方法的唤醒
PROPAGATE:只用于共享状态的HEAD节点,目前还没弄清楚,欢迎一块儿来探讨
节点建立后默认状态值是0。
接下来咱们就要分别看下这个独占锁的入队和出队过程以及共享锁的入队和出队过程
###3.3.1 独占锁的获取和释放过程
先来看看获取
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先调用锁本身的tryAcquire业务逻辑来尝试获取锁,一旦获取失败,则进入AQS的处理流程,即acquireQueued方法
第一步:就是构造一个Node并入队
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这里先尝试经过CAS将新添加的Node节点放置当前tail的后面,若是tail为空或者CAS失败,就进入下面的for循环形式的CAS流程,前面的尝试主要是为了针对大部分状况即(pred!=null的状况下)可以快速处理,而for循环则是要考虑全部状况,由于判断逻辑就比较多了。如刚初始化的时候,head和tail都指向了一个空的Node,该Node并不须要获取锁。
第二步:入队后的处理逻辑,是被阻塞呢?仍是持续不断尝试获取锁呢?
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这个就要查看新构建的Node的前一个节点是否是head,若是是head,则该新节点能够尝试下获取锁,一旦获取锁成功就会设置head指向当前节点Node。
来重点说说这个head节点:刚初始化的时候head和tail都指向的是一个空的Node,head节点并无获取到锁,见上面。若是上述尝试获取锁成功就重新设置head节点为当前Node,此时head节点又是一个获取了锁的节点。
若是当前节点的前一个节点不是head或者是head可是尝试获取锁失败,此时就须要衡量下是否须要将当前节点阻塞,即shouldParkAfterFailedAcquire方法的逻辑:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是前置节点处于Node.SIGNAL状态,则该Node则须要被阻塞,即还轮不到它。
若是前置节点状态处于Node.CANCELLED状态,则该Node须要从新更换前置节点
若是前置节点状态处于其余状态,则须要把前置节点的状态值设置成Node.SIGNAL,以便下一次循环尝试获取锁失败时,该节点被阻塞,即知足上述第一种状况。从这里看出,节点的状态大部分是应用于后续节点行为判断使用的,而对自身的逻辑并没什么影响。
基本上不管如何,该节点没有机会尝试获取锁或者有机会尝试获取锁可是又失败时,最终都会被阻塞,则里的阻塞使用的就是LockSupport.park:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
接下来看看释放
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
先调用锁的tryRelease具体业务逻辑,而后就会使用LockSupport.unpark唤醒head的下一个节点,至此在上述acquireQueued方法中被阻塞的那个Node(head的下一个Node)再也不阻塞,再次继续for循环流程,又一次开始尝试,若是获取成功,则更新了head节点,则以前的head就表示出队了,若是获取还失败,说明又被外部线程获取到了,那就再一次的被park阻塞。
共享锁的获取和释放过程就再也不说了,大部分都是相同的。
来看下AQS提供的Condition实现,简单以下
public class ConditionObject implements Condition{ private transient Node firstWaiter; private transient Node lastWaiter; }
一个ConditionObject对象自己维护了一个FIFO单向队列,这里经过Node的Node nextWaiter属性来创建关联。
咱们来简单看看Condition的await和signal:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
这里再也不一点一旦详细研究了,内容太多了,简单看下总体的逻辑:
前提:在调用Condition的await和signal的方法前必须先获取到锁
第一步:首先构造一个Node,初始化状态为Node.CONDITION,并放到对应ConditionObject的lastWaiter上
第二步:释放锁
第三步:while循环里检查该Node是否在同步队列即上述的FIFO双向队列,不在的话,则被park住,等待unpark的唤醒
第四步:收到了unpark唤醒(在唤醒的时候就顺便将该Node加入了上述的FIFO双向队列中了,所以能够跳出while循环了),跳出了while循环
第五步:该Node已经进入了上述的FIFO双向队列中了,开始了上面介绍的逻辑
再来看看唤醒操做:
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
就是unpark头节点Node,而且将该Node放入上述的FIFO双向队列中。
#4 结束语
下一篇就来详细的介绍几个具体的基于AQS的锁的实现。