我在Java并发包源码学习系列:AbstractQueuedSynchronizer#同步队列与Node节点已经粗略地介绍了一下CLH的结构,本篇主要解析该同步队列的相关操做,所以在这边再回顾一下:html
AQS经过内置的FIFO同步双向队列来完成资源获取线程的排队工做,内部经过节点head【其实是虚拟节点,真正的第一个线程在head.next的位置】和tail记录队首和队尾元素,队列元素类型为Node。java
接下来将要经过AQS以独占式的获取和释放资源的具体案例来详解内置CLH阻塞队列的工做流程,接着往下看吧。node
public final void acquire(int arg) { if (!tryAcquire(arg) && // tryAcquire由子类实现,表示获取锁,若是成功,这个方法直接返回了 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是获取失败,执行 selfInterrupt(); }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
的逻辑,咱们能够将其进行拆分,分为两步:
selfInterrupt()
的逻辑,进行阻塞。接下来咱们分别来看看addWaiter
和acquireQueued
两个方法。编程
根据传入的mode参数决定独占或共享模式,为当前线程建立节点,并入队。多线程
// 其实就是把当前线程包装一下,设置模式,造成节点,加入队列 private Node addWaiter(Node mode) { // 根据mode和thread建立节点 Node node = new Node(Thread.currentThread(), mode); // 记录一下原尾节点 Node pred = tail; // 尾节点不为null,队列不为空,快速尝试加入队尾。 if (pred != null) { // 让node的prev指向尾节点 node.prev = pred; // CAS操做设置node为新的尾节点,tail = node if (compareAndSetTail(pred, node)) { // 设置成功,让原尾节点的next指向新的node,实现双向连接 pred.next = node; // 入队成功,返回 return node; } } // 快速入队失败,进行不断尝试 enq(node); return node; }
几个注意点:并发
private Node enq(final Node node) { // 自旋,俗称死循环,直到设置成功为止 for (;;) { // 记录原尾节点 Node t = tail; // 第一种状况:队列为空,原先head和tail都为null, // 经过CAS设置head为哨兵节点,若是设置成功,tail也指向哨兵节点 if (t == null) { // Must initialize // 初始化head节点 if (compareAndSetHead(new Node())) // tail指向head,下个线程来的时候,tail就不为null了,就走到了else分支 tail = head; // 第二种状况:CAS设置尾节点失败的状况,和addWaiter同样,只不过它在for(;;)中 } else { // 入队,将新节点的prev指向tail node.prev = t; // CAS设置node为尾部节点 if (compareAndSetTail(t, node)) { //原来的tail的next指向node t.next = node; return t; } } } }
enq的过程是自选设置队尾的过程,若是设置成功,就返回。若是设置失败,则一直尝试设置,理念就是,我总能等待设置成功那一天。app
咱们还能够发现,head是延迟初始化的,在第一个节点尝试入队的时候,head为null,这时使用了new Node()
建立了一个不表明任何线程的节点,做为虚拟头节点,且咱们须要注意它的waitStatus初始化为0,这一点对咱们以后分析有指导意义。ide
若是是CAS失败致使重复尝试,那就仍是让他继续CAS好了。oop
// 这个方法若是返回true,代码将进入selfInterrupt() final boolean acquireQueued(final Node node, int arg) { // 注意默认为true boolean failed = true; try { // 是否中断 boolean interrupted = false; // 自旋,即死循环 for (;;) { // 获得node的前驱节点 final Node p = node.predecessor(); // 咱们知道head是虚拟的头节点,p==head表示若是node为阻塞队列的第一个真实节点 // 就执行tryAcquire逻辑,这里tryAcquire也须要由子类实现 if (p == head && tryAcquire(arg)) { // tryAcquire获取成功走到这,执行setHead出队操做 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 走到这有两种状况 1.node不是第一个节点 2.tryAcquire争夺锁失败了 // 这里就判断 若是当前线程争锁失败,是否须要挂起当前这个线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 死循环退出,只有tryAcquire获取锁失败的时候failed才为true if (failed) cancelAcquire(node); } }
CLU同步队列遵循FIFO,首节点的线程释放同步状态后,唤醒下一个节点。将队首节点出队的操做实际上就是,将head指针指向将要出队的节点就能够了。源码分析
private void setHead(Node node) { // head指针指向node head = node; // 释放资源 node.thread = null; node.prev = null; }
/** * 走到这有两种状况 1.node不是第一个节点 2.tryAcquire争夺锁失败了 * 这里就判断 若是当前线程争锁失败,是否须要挂起当前这个线程 * * 这里pred是前驱节点, node就是当前节点 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的waitStatus int ws = pred.waitStatus; // 前驱节点为SIGNAL【-1】直接返回true,表示当前节点能够被直接挂起 if (ws == Node.SIGNAL) return true; // ws>0 CANCEL 说明前驱节点取消了排队 if (ws > 0) { // 下面这段循环其实就是跳过全部取消的节点,找到第一个正常的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 将该节点的后继指向node,创建双向链接 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * 官方说明:走到这waitStatus只能是0或propagate,默认状况下,当有新节点入队时,waitStatus老是为0 * 下面用CAS操做将前驱节点的waitStatus值设置为signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回false,接着会再进入循环,此时前驱节点为signal,返回true return false; }
针对前驱节点的waitStatus有三种状况:
等待状态不会为
Node.CONDITION
,由于它用在 ConditonObject 中
PROPAGATE表示共享模式下,前驱节点不只会唤醒后继节点,同时也可能会唤醒后继的后继。
咱们能够发现,这个方法在第一次走进来的时候是不会返回true的。缘由在于,返回true的条件时前驱节点的状态为SIGNAL,而第一次的时候尚未给前驱节点设置SIGNAL呢,只有在CAS设置了状态以后,第二次进来才会返回true。
那SIGNAL的意义究竟是什么呢?
这里引用:并发编程——详解 AQS CLH 锁 # 为何 AQS 须要一个虚拟 head 节点
waitStatus这里用ws简称,每一个节点都有ws变量,用于表示该节点的状态。初始化的时候为0,若是被取消为1,signal为-1。
若是某个节点的状态是signal的,那么在该节点释放锁的时候,它须要唤醒下一个节点。
所以,每一个节点在休眠以前,若是没有将前驱节点的ws设置为signal,那么它将永远没法被唤醒。
所以咱们会发现上面当前驱节点的ws为0或propagate的时候,采用cas操做将ws设置为signal,目的就是让上一个节点释放锁的时候可以通知本身。
private final boolean parkAndCheckInterrupt() { // 挂起当前线程 LockSupport.park(this); return Thread.interrupted(); }
shouldParkAfterFailedAcquire方法返回true以后,就会调用该方法,挂起当前线程。
LockSupport.park(this)
方法挂起的线程有两种途径被唤醒:1.被unpark() 2.被interrupt()。
须要注意这里的Thread.interrupted()会清除中断标记位。
上面tryAcquire获取锁失败的时候,会走到这个方法。
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 将节点的线程置空 node.thread = null; // 跳过全部的取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 这里在没有并发的状况下,preNext和node是一致的 Node predNext = pred.next; // Can use unconditional write instead of CAS here. 能够直接写而不是用CAS // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 设置node节点为取消状态 node.waitStatus = Node.CANCELLED; // 若是node为尾节点就CAS将pred设置为新尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 设置成功以后,CAS将pred的下一个节点置为空 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && // pred不是首节点 ((ws = pred.waitStatus) == Node.SIGNAL || // pred的ws为SIGNAL 或 能够被CAS设置为SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // pred线程非空 // 保存node 的下一个节点 Node next = node.next; // node的下一个节点不是cancelled,就cas设置pred的下一个节点为next if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 上面的状况除外,则走到这个分支,唤醒node的下一个可唤醒节点线程 unparkSuccessor(node); } node.next = node; // help GC } }
public final boolean release(int arg) { if (tryRelease(arg)) { // 子类实现tryRelease方法 // 得到当前head Node h = head; // head不为null而且head的等待状态不为0 if (h != null && h.waitStatus != 0) // 唤醒下一个能够被唤醒的线程,不必定是next哦 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 若是node的waitStatus<0为signal,CAS修改成0 // 将 head 节点的 ws 改为 0,清除信号。表示,他已经释放过了。不能重复释放。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 唤醒后继节点,可是有可能后继节点取消了等待 即 waitStatus == 1 Node s = node.next; // 若是后继节点为空或者它已经放弃锁了 if (s == null || s.waitStatus > 0) { s = null; // 从队尾往前找,找到没有没取消的全部节点排在最前面的【直到t为null或t==node才退出循环嘛】 for (Node t = tail; t != null && t != node; t = t.prev) // 若是>0表示节点被取消了,就一直向前找呗,找到以后不会return,还会一直向前 if (t.waitStatus <= 0) s = t; } // 若是后继节点存在且没有被取消,会走到这,直接唤醒后继节点便可 if (s != null) LockSupport.unpark(s.thread); }