为充分利用机器性能,人们发明了多线程。但同时带来了线程安全问题,因而人们又发明了同步锁。java
这个问题天然人人知道,但你真的了解同步锁吗?仍是说你会用其中的上锁与解锁功能?node
今天咱们就一块儿来深刻看同步锁的原理和实现吧!c#
同步锁的职责能够说就一个,限制资源的使用(线程安全从属)。安全
它通常至少会包含两个功能: 1. 给资源加锁; 2. 给资源解锁;另外,它通常还有 等待/通知 即 wait/notify 的功能;数据结构
同步锁的应用场景:多个线程同时操做一个事务必须保证正确性;一个资源只能同时由一线程访问操做;一个资源最多只能接入k的并发访问;保证访问的顺序性;多线程
同步锁的实现方式:操做系统调度实现;应用自行实现;CAS自旋;并发
同步锁的几个问题:app
为何它能保证线程安全?less
锁等待耗CPU吗?oop
使用锁后性能降低严重的缘由是啥?
其实对于应用层来讲,很是多就是 lock/unlock , 这也是锁的核心。
AQS 是java中不少锁实现的基础,由于它屏蔽了不少繁杂而底层的阻塞操做,为上层抽象出易用的接口。
咱们就以AQS做为跳板,先来看一下上锁的过程。为不至于陷入具体锁的业务逻辑中,咱们先以最简单的 CountDownLatch 看看。
// 先看看 CountDownLatch 的基础数据结构,能够说是不能再简单了,就继承了 AQS,而后简单覆写了几个必要方法。 // java.util.concurrent.CountDownLatch.Sync /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // 只有一种状况会获取锁成功,即 state == 0 的时候 return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 原始的锁数量是在初始化时指定的不可变的,每次释放一个锁标识 int nextc = c-1; if (compareAndSetState(c, nextc)) // 只有一状况会释放锁成功,即本次释放后 state == 0 return nextc == 0; } } } private final Sync sync;
public void await() throws InterruptedException { // 调用 AQS 的接口,由AQS实现了锁的骨架逻辑 sync.acquireSharedInterruptibly(1); } // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 首先尝试获取锁,若是成功就不用阻塞了 // 而从上面的逻辑咱们看到,获取锁至关之简单,因此,获取锁自己并无太多的性能消耗哟 // 若是获取锁失败,则会进行稍后尝试,这应该是复杂而精巧的 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 首先将当前线程添加排队队尾,此处会保证线程安全,稍后咱们能够看到 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取其上一节点,若是上一节点是头节点,就表明当前线程能够再次尝试获取锁了 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 先检测是否须要阻塞,而后再进行阻塞等待,阻塞由 LockSupport 底层支持 // 若是阻塞后,将不会主动唤醒,只会由 unlock 时,主动被通知 // 所以,此处便是获取锁的最终等待点 // 操做系统将不会再次调度到本线程,直到获取到锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // 如此线程安全地添加当前线程到队尾? CAS 保证 /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } // 检测是否须要进行阻塞 /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 只有前置节点是 SIGNAL 状态的节点,才须要进行 阻塞等待,固然前置节点会在下一次循环中被设置好 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // park 阻塞实现 /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { // 将当前 AQS 实例做为锁对象 blocker, 进行操做系统调用阻塞, 因此全部等待锁的线程将会在同一个锁前提下执行 LockSupport.park(this); return Thread.interrupted(); }
如上,上锁过程是比较简单明了的。加入一队列,而后由操做系统将线程调出。(那么操做系统是如何把线程调出的呢?有兴趣自行研究)
public void countDown() { // 一样直接调用 AQS 的接口,由AQS实现了锁的释放骨架逻辑 sync.releaseShared(1); } // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { // 调用业务实现的释放逻辑,若是成功,再执行底层的释放,如队列移除,线程通知等等 // 在 CountDownLatch 的实现中,只有 state == 0 时才会成功,因此它只会执行一次底层释放 // 这也是咱们认为 CountDownLatch 可以作到多线程同时执行的效果的缘由之一 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; // 队列不为空才进行释放 if (h != null && h != tail) { int ws = h.waitStatus; // 看过上面的 lock 逻辑,咱们知道只要在阻塞状态,必定是 Node.SIGNAL if (ws == Node.SIGNAL) { // 状态改变成功,才进行后续的唤醒逻辑 // 由于先改变状态成功,才算是线程安全的,再进行唤醒,不然进入下一次循环再检查 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 将头节点的下一节点唤醒,若有必要 unparkSuccessor(h); } // 这里的 propagates, 是要传播啥呢?? // 为何只唤醒了一个线程,其余线程也能够动了? else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 唤醒下一个节点 // 但若是下一节点已经取消等待了,那么就找下一个没最近的没被取消的线程进行唤醒 // 唤醒只是针对一个线程的哟 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
由于从上一节的讲解中,咱们看到,当用户调用 countDown 时,仅仅是让操做系统唤醒了 head 的下一个节点线程或者最近未取消的节点。那么,从哪里来的全部线程都获取了锁从而运行呢?
实际上是在 获取锁的过程当中,还有一点咱们未看清:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 当countDown被调用后,head节点被唤醒,执行 int r = tryAcquireShared(arg); if (r >= 0) { // 获取到锁后,设置node为下一个头节点,并把唤醒状态传播下去,而这里面确定会作一些唤醒其余线程的操做,请看下文 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 若是有必要,则作一次唤醒下一线程的操做 // 在 countDown() 不会触发此操做,因此这里只是一个内部调用传播 Node s = node.next; if (s == null || s.isShared()) // 此处锁释放逻辑如上,总之,又是另外一次的唤醒触发 doReleaseShared(); } }
到此,咱们明白了它是怎么作到一个锁释放,全部线程可通行的。也从根本上回答了咱们猜测,全部线程同时并发运行。然而并无,它只是经过唤醒传播性来依次唤醒各个等待线程的。从绝对时间性上来说,都是有前后关系的。之后可别再浅显说是同时执行了哟。
上面看出,针对一个lock/unlock 的过程仍是很简单的,由操做系统负责大头,实现代码也并很少。
可是针对稍微有点要求的场景,就会进行条件式的操做。好比:持有某个锁运行一段代码,可是,运行时发现某条件不知足,须要进行等待而不能直接结束,直到条件成立。即所谓的 wait 操做。
乍一看,wait/notify 与 lock/unlock 很像,其实否则。区分主要是 lock/unlock 是针对整个代码段的,而 wait/notify 则是针对某个条件的,即获取了锁不表明条件成立了,可是条件成立了必定要在锁的前提下才能进行安全操做。
那么,是否 wait/notify 也同样的实现简单呢?好比java的最基础类 Object 类就提供了 wait/notify 功能。
咱们既然想一探究竟,仍是以并发包下的实现做为基础吧,毕竟 java 才是咱们的强项。
本次,我们以 ArrayBlockingQueue#put/take 做为基础看下这种场景的使用先。
ArrayBlockingQueue 的put/take 特性就是,put当队列满时,一直阻塞,直到有可用位置才继续运行下一步。而take当队列为空时同样阻塞,直到队列里有数据才运行下一步。这种场景使用锁主很差搞了,由于这是一个条件判断。put/take 以下:
// java.util.concurrent.ArrayBlockingQueue#put /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 当队列满时,一直等待 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } // java.util.concurrent.ArrayBlockingQueue#take public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 当队列为空时一直等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
看起来至关简单,彻底符合人类思惟。只是,这里使用的两个变量进行控制流程 notFull,notEmpty. 这两个变量是如何进行关联的呢?
在这以前,咱们还须要补充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,什么时候才能运行呢?如上代码在各自的入队和出队完成以后进行通知就能够了。
// 与 put 对应,入队完成后,队列天然就不为空了,通知下 notEmpty 就行了 /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 我已放入一个元素,不为空了 notEmpty.signal(); } // 与 take 对应,出队完成后,天然就不多是满的了,至少一个空余空间。 /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 我已移除一个元素,确定没有满了,大家继续放入吧 notFull.signal(); return x; }
是否是超级好理解。是的。不过,咱们不是想看 ArrayBlockingQueue 是如何实现的,咱们是要论清 wait/notify 是如何实现的。由于毕竟,他们不是一个锁那么简单。
// 三个锁的关系,即 notEmpty, notFull 都是 ReentrantLock 的条件锁,至关因而其子集吧 /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // lock.newCondition() 是什么鬼?它是 AQS 中实现的 ConditionObject // java.util.concurrent.locks.ReentrantLock#newCondition public Condition newCondition() { return sync.newCondition(); } // java.util.concurrent.locks.ReentrantLock.Sync#newCondition final ConditionObject newCondition() { // AQS 中定义 return new ConditionObject(); }
接下来,咱们要带着几个疑问来看这个 Condition 的对象:
1. 它的 wait/notify 是如何实现的?
2. 它是如何与互相进行联系的?
3. 为何 wait/notify 必需要在外面的lock获取以后才能执行?
4. 它与Object的wait/notify 有什么相同和不一样点?
可以回答了上面的问题,基本上对其原理与实现也就理解得差很少了。
咱们从上面能够看到,它是经过调用 await()/signal() 实现的,到底作事如何,且看下面。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await() /** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加当前线程到 等待线程队列中,有 lastWaiter/firstWaiter 维护 Node node = addConditionWaiter(); // 释放当前lock中持有的锁,详情且看下文 int savedState = fullyRelease(node); // 从如下开始,将再也不保证线程安全性,由于当前的锁已经释放,其余线程将会从新竞争锁使用 int interruptMode = 0; // 循环断定,若是当前节点不在 sync 同步队列中,那么就反复阻塞本身 // 因此判断是否在 同步队列上,是很重要的 while (!isOnSyncQueue(node)) { // 没有在同步队列,阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 当条件被知足后,须要从新竞争锁,详情看下文 // 竞争到锁后,原样返回到 wait 的原点,继续执行业务逻辑 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 下面是异常处理,忽略 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 预期的,都是释放锁成功,若是失败,说明当前线程并并未获取到锁,引起异常 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { // tryRelease 由客户端自定义实现 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 如何断定当前线程是否在同步队列中或者能够进行同步队列? /** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { // 若是上一节点尚未被移除,当前节点就不能被加入到同步队列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是当前节点的下游节点已经存在,则它自身一定已经被移到同步队列中 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 最终直接从同步队列中查找,若是找到,则自身已经在同步队列中 return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } // 当条件被知足后,须要从新竞争锁,以保证外部的锁语义,由于以前本身已经将锁主动释放 // 这个锁与 lock/unlock 时的一毛同样,没啥可讲的 // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
总结一下 wait 的逻辑:
1. 前提:自身已获取到外部锁;
2. 将当前线程添加到 ConditionQueue 等待队列中;
3. 释放已获取到的锁;
4. 反复检查进入等待,直到当前节点被移动到同步队列中;
5. 条件知足被唤醒,从新竞争外部锁,成功则返回,不然继续阻塞;(外部锁是同一个,这也是要求两个对象必须存在依赖关系的缘由)
6. wait前线程持有锁,wait后线程持有锁,没有一点外部锁变化;
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { // 只有获取锁的实例,才能够进行signal,不然你拿什么去保证线程安全呢 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 通知 firstWaiter if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { // 最多只转移一个 节点 do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 将一个节点从 等待队列 移动到 同步队列中,便可参与下一轮竞争 // 只有确实移动成功才会返回 true // 说明:当前线程是持有锁的线程 // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 同步队列由 head/tail 指针维护 Node p = enq(node); int ws = p.waitStatus; // 注意,此处正常状况下并不会唤醒等待线程,仅是将队列转移。 // 由于当前线程的锁保护区域并未完成,完成后天然会唤醒其余等待线程 // 不然将会存在当前线程任务还未执行完成,却被其余线程抢了先去,那接下来的任务当如何?? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
总结一下,notify 的功能原理以下:
因此,实现 wait/notify, 最关键的就是维护两个队列,等待队列与同步队列,并且都要求是在有外部锁保证的状况下执行。
到此,咱们也能回答一个问题:为何wait/notify必定要在锁模式下才能运行?
由于wait是等待条件成立,此时一定存在竞争须要作保护,而它自身又必须释放锁以使外部条件可成立,且后续须要作恢复动做;而notify以后可能还有后续工做必须保障安全,notify只是锁的一个子集。。。
有时条件成立后,能够容许全部线程通行,这时就能够进行 notifyAll, 那么若是达到通知全部的目的呢?是一块儿通知仍是??
如下是 AQS 中的实现:
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
能够看到,它是经过遍历全部节点,依次转移等待队列到同步队列(通知)的,本来就没有人能同时干几件事的!
本文从java实现的角度去解析同步锁的原理与实现,但并不局限于java。道理老是相通的,只是像操做系统这样的大佬,能干的活更纯粹:好比让cpu根本不用调度一个线程。