上篇文章 AQS系列一:源码分析非公平ReentrantLock 中,咱们分析了ReentrantLock的非公平实现,本篇会承接上文,继续分析ReentrantLock的公平锁实现(以及Condition的实现)。java
在此以前咱们要先弄明白,“不公平”体如今哪里。node
好吧,我也不清楚。
因而我对比了ReentrantLock的非公平和公平实现,即NonfairSync
vs FairSync
,发现差异主要体现在加锁,更确切的说是获取锁环节。segmentfault
## 非公平获取锁 final boolean nonfairTryAcquire(int acquires) { ... if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... } ## 公平获取锁 protected final boolean tryAcquire(int acquires) { ... if (!hasQueuedPredecessors() //### 差异体如今此处 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } ... }
显然,公平锁多执行了!hasQueuedPredecessors()
,看看此方法的逻辑。并发
public final boolean hasQueuedPredecessors() { Node h, s; if ((h = head) != null) { ## h头结点 if ((s = h.next) == null ... ) { ## s二号节点 ... } if (s != null && s.thread != Thread.currentThread()) ##检查2号节点绑定线程,是否当前线程 return true; } return false; }
hasQueuedPredecessors方法只有在 2号节点不为空,且绑定线程非当前线程的前提下,会返回true。
返回ture意味着!hasQueuedPredecessors() = false
,没有资格获取锁(就是没机会执行compareAndSetState——尝试修改state)函数
反过来说,没有队列(无线程正在执行),或者没有2号节点(取消或者临时状态),再或者2号节点的绑定线程就是当前线程时,才会尝试获取锁。工具
咱们分析下最后这种状况,2号节点绑定的线程是第1个等待的线程(第1个获取锁失败的线程),第1个等待的线程在hasQueuedPredecessors()的运做下,成为了第1个有资格尝试获取锁的线程。而这,就是公平!源码分析
那么没有hasQueuedPredecessors方法的非公平锁,到底“不公平”在哪儿呢?
咱们回想一下,在加 / 解锁的过程当中,nonfairTryAcquire方法被调用的位置就能获得答案了。ui
public final void acquire(int arg) { if (!tryAcquire(arg) ### 位置1,尝试获取 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; ... for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { ### 位置2,尝试获取 setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } ... }
在上述代码中,tryAcquire(非公平实现会调用nonfairTryAcquire)会在位置一、2两处触发。试想以下场景:this
线程T-3
执行完毕,调用了unlock;随着线程T-2
被唤醒,位置2处代码可能会被执行。线程T-1
的介入,位置1处的代码也有可能被执行。所以线程T-2
和T-1
谁能在并发中抢到锁,存在不肯定性。spa
原理说完了,那具体怎么构建公平的ReentrantLock呢?构造函数传参便可:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); ## 入参fair传入true,构建公平锁 }
ReentrantLock的加解锁过程已详细分析了一遍,若是你常用这个工具,确定对衍生出另外一个大咖condition
有所了解。
二话不说,先甩出demo:
static Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();; public void doSomething(){ lock.lock(); System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); try { System.out.println(String.format("%s线程,await",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); //模拟耗时业务逻辑执行 condition.await(); //await System.out.println(String.format("%s线程,await被唤醒",Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName())); lock.unlock(); } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 1; while (total>0){ Thread t = new Thread(()->{ test.doSomething(); },"T-"+total); t.start(); TimeUnit.MILLISECONDS.sleep(200L); //让子线程T-1率先获取到锁 lock.lock(); System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); test.condition.signal(); System.out.println(String.format("%s线程,signal",Thread.currentThread().getName())); lock.unlock(); total--; } }
结合已掌握的加解锁原理,分析demo执行过程:
线程T-1
先获取到锁,200ms后main线程
也会尝试获取锁,固然main线程
获取不到——因为耗时达2s的业务逻辑疯狂执行中。(sleep处,此时main线程应该构建了同步队列,main线程
做为2号节点的绑定线程被无情阻塞,下图)线程T-1
搞定了难缠的业务逻辑,却又遭遇condition.await()
的伏击线程main
发现本身神奇的不被阻塞了,又神奇的获取到了锁。因而投桃报李,condition.signal()
接unlock
二连招唤醒了线程T-1
线程T-1
觉醒于await
处,执行完剩余逻辑demo的执行结果,能初步证实上述分析:
T-1线程,获取到锁了 T-1线程,await main线程,获取到锁了 main线程,signal T-1线程,await被唤醒 T-1线程,业务执行完毕
从构造函数出发:
public Condition newCondition() { return sync.newCondition(); } ## Sync类建立ConditionObject final ConditionObject newCondition() { return new ConditionObject(); }
ConditionObject是AQS中的另外一内部类,看看它的属性:
## ConditionObject类 private transient Node firstWaiter; private transient Node lastWaiter;
感受上和AQS的设定上有些像?
## AQS类 private transient volatile Node head; private transient volatile Node tail;
先大胆猜想一下,condition中极可能会再次构建同步队列。
接下来就是验证咱们的猜想的过程:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); ## 建立等待队列,node是尾节点。 详情参看[addConditionWaiter详情] int savedState = fullyRelease(node); ## 重置state,返回重置前的state值。 详情参看[fullyRelease详情] int interruptMode = 0; while (!isOnSyncQueue(node)) { ## 是否在AQS同步队列中 LockSupport.park(this); ## 不在AQS同步队列的节点,阻塞当前线程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { if (!isHeldExclusively()) ## 当前线程是否owner线程,若是不是,抛异常——这儿决定了await必须用在lock()方法以后 throw new IllegalMonitorStateException(); Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Node.CONDITION); ## 建立新节点,原子形赋值waitStatus=CONDITION=-2,并绑定当前线程到node节点 ## node会做为尾节点,置于队列最后 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
final int fullyRelease(Node node) { try { int savedState = getState(); ## 获取当前state if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } } public final boolean release(int arg) { if (tryRelease(arg)) { ## 尝试“清0”state Node h = head; ## 此处head不为空,unpark线程main,return true if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) ## 当前线程验证,若是当前线程!=owner,抛异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { ## 若是state清0,同时清空owner线程,return true free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
有了以前分析ReentrantLock的经验,与之很是相像的condition代码应该不难拿下。
这里画出await方法中fullyRelease(node)执行先后的节点和关键属性的变化:
图右侧(await方法执行到了LockSupport.park(this)时),线程T-1
已经阻塞,线程main
则解除阻塞状态。
经过上图很容易看出,咱们以前的猜想是正确的:await
方法又构建了一个同步队列,不过此次的头、尾指针在ConditionObject
类中。
再来看看signal方法做了什么:
public final void signal() { if (!isHeldExclusively()) ## 和await()方法中的同样,先验证owner线程 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) ## condition头结点传递 && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; Node p = enq(node); ## 将ConditionObject头结点移动到AQS队列尾部。 详情参看[enq详情] int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) ## 取消或修改waitStatus失败才做unpark操做,此处unpark不会触发 LockSupport.unpark(node.thread); return true; }
private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); ## 入参node,成为了AQS队列新的尾节点 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); ## 初始化AQS队列 } } }
signal中最神奇的莫过于enq(node)方法,它完成了节点的转移,condition队列头结点 -> AQS队列尾节点。
经过下图观察整个signal方法产生的各对象结构和属性变化:
观察可知,signal执行后节点转移已经完成,线程T-1依然阻塞,此时ConditionObject已经完成了它的历史使命。
线程T-1何时解除阻塞呢?其实这部分上篇文章已经分析过了,就是咱们的老朋友unlock()。
区别在于线程T-1被唤醒后,执行的是await后续的逻辑:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { ## 2.下次循环,node已经在AQS队列中,返回true,跳出循环 LockSupport.park(this); ## 1.线程T-1觉醒于此 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) ## 3.再度获取到锁 && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
至此,咱们已经了解了ReentrantLock的主逻辑的源码实现(公平、非公平、condition),本系列的下篇文章将进入下一副本——CountDownLatch
,敬请期待!