AbstractQueuedSynchronizer
先分析下同步器AbstractQueuedSynchronizer
,这个是用于锁实现的类,ReentrantLock就用到了它java
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock
的lock()
方法就是调用acquire(int arg)
去作事情的。
其中protected boolean tryAcquire(int arg)
是留给子类去实现的,因此这里是采用了模板设计模式。这个方法简单来讲就是去获取锁。node
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter
方法简单来讲是构造节点,而后用CAS
的方式把这个节点加入同步器中节点链表的尾巴。设计模式
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //若是前驱结点是头节点的话,那么尝试获取锁(也就是同步状态) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是失败的话 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued()
方法中,当前线程在“死循环”中尝试获取同步状态,而且只有前驱节点是头节点才可以尝试获取同步状态。安全
/* * 查询是否有线程比当前线程更早地请求获取锁 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //若是h==t,返回false,表示没有。h==t的时候要么没有等待者,要么只有一个。当只有一个的时候, //那么这一个节点确定是首节点,而首节点中的线程一定是获取了锁的。 // h!=t&&((s = h.next) == null || s.thread != Thread.currentThread()) //若是h!=t,则进入后面的判断 //当头节点的后继节点为null,可是这个时候tail为null(head和tail节点都是懒初始 //化),或者头节点的后继节点不为null,可是头节点的后继节点中的线程不是当前线程,则返回true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
若是等待队列为空,或者只有首节点,或者首节点的后继节点中的线程是当前线程,那么当前线程就能够去获取同步状态less
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ /* *若是这个节点的waitStatus已经被标记为SIGNAL(-1)了的话,那么 *这个节点就须要park,因此方法返回true */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ /* * 该节点的前驱节点若是被取消了(waitStatus为1时表示取消状态,目前只有这个状态的值大于 * 0),那么跳过前驱节点(经过死循环的方式把前驱结点的前驱结点设置为本身的前驱结点)。然 * 后退出if条件语句,调到末尾,返回false,表示本身还能够抢救一下,能够进行获取的锁的尝 * 试,而不是park。 * */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 若是以上状况都不成立的话,那么就把本身的waitStatus标记为SIGNAL,返回true,表示本身要 * park,战略性投降。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable { ..... abstract static class Sync extends AbstractQueuedSynchronizer /** public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ..... }
ConditionObject
await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //线程将会在调用park方法后阻塞,直到被从新唤醒,从condition队列加入同步队列,从await()方 //法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中), //进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); //成功获取同步状态(或者说锁)以后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已 //经成功地获取了锁。 } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { //若是释放同步状态失败,就throws exception,在finally那里还会Cancels node throw new IllegalMonitorStateException(); } } finally { //若是释放同步状态失败,就Cancels node if (failed) node.waitStatus = Node.CANCELLED; } }
signal()
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
Condition
的signal()
方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点以前,会将节点移到同步队列中。signal()
方法进行了isHeldExclusively()
检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport
唤醒节点中的线程。/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { //当没有waiters时进行的一些优化,以便编译器进行方法内联,transferForSignal方法才是重点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) 把节点从condition队列“传输”到同步队列去。”传输“成功或者在唤醒前节点已经取消,就返回true。 */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). * 经过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。 */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。 LockSupport.unpark(node.thread); return true; }