打通 Java 任督二脉 —— 并发数据结构的基石
每个 Java 的高级程序员在体验过多线程程序开发以后,都须要问本身一个问题,Java 内置的锁是如何实现的?最经常使用的最简单的锁要数 ReentrantLock,使用它加锁时若是没有当即加成功,就会阻塞当前的线程等待其它线程释放锁以后再从新尝试加锁,那线程是如何实现阻塞本身的?其它线程释放锁以后又是若是唤醒当前线程的?当前线程是如何得出本身没有加锁成功这一结论的?本篇内容将会从根源上回答上面提到的全部问题java
Java 的线程阻塞和唤醒是经过 Unsafe 类的 park 和 unpark 方法作到的。node
public class Unsafe { ... public native void park(boolean isAbsolute, long time); public native void unpark(Thread t); ... } 复制代码
这两个方法都是 native 方法,它们自己是由 C 语言来实现的核心功能。park 的意思是停车,让当前运行的线程 Thread.currentThread() 休眠,unpark 的意思是解除停车,唤醒指定线程。这两个方法在底层是使用操做系统提供的信号量机制来实现的。具体实现过程要深究 C 代码,这里暂时不去具体分析。park 方法的两个参数用来控制休眠多长时间,第一个参数 isAbsolute 表示第二个参数是绝对时间仍是相对时间,单位是毫秒。程序员
线程从启动开始就会一直跑,除了操做系统的任务调度策略外,它只有在调用 park 的时候才会暂停运行。锁能够暂停线程的奥秘所在正是由于锁在底层调用了 park 方法。算法
线程对象 Thread 里面有一个重要的属性 parkBlocker,它保存当前线程由于什么而 park。就比如停车场上停了不少车,这些车主都是来参加一场拍卖会的,等拍下本身想要的物品后,就把车开走。那么这里的 parkBlocker 大约就是指这场「拍卖会」。它是一系列冲突线程的管理者协调者,哪一个线程该休眠该唤醒都是由它来控制的。编程
class Thread { ... volatile Object parkBlocker; ... } 复制代码
当线程被 unpark 唤醒后,这个属性会被置为 null。Unsafe.park 和 unpark 并不会帮咱们设置 parkBlocker 属性,负责管理这个属性的工具类是 LockSupport,它对 Unsafe 这两个方法进行了简单的包装。缓存
class LockSupport { ... public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); U.park(false, 0L); setBlocker(t, null); // 醒来后置null } public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); } } ... }
复制代码
Java 的锁数据结构正是经过调用 LockSupport 来实现休眠与唤醒的。线程对象里面的 parkBlocker 字段的值就是下面咱们要讲的「排队管理器」。微信
当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一块儿。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。每一把锁内部都会有这样一个队列管理器,管理器里面会维护一个等待的线程队列。ReentrantLock 里面的队列管理器是 AbstractQueuedSynchronizer,它内部的等待队列是一个双向列表结构,列表中的每一个节点的结构以下。数据结构
class AbstractQueuedSynchronizer { volatile Node head; // 队头线程将优先得到锁 volatile Node tail; // 抢锁失败的线程追加到队尾 volatile int state; // 锁计数 } class Node { Node prev; Node next; Thread thread; // 每一个节点一个线程 // 下面这两个特殊字段能够先不去理解 Node nextWaiter; // 请求的是共享锁仍是独占锁 int waitStatus; // 精细状态描述字 }
复制代码
加锁不成功时,当前的线程就会把本身归入到等待链表的尾部,而后调用 LockSupport.park 将本身休眠。其它线程解锁时,会从链表的表头取一个节点,调用 LockSupport.unpark 唤醒它。多线程
AbstractQueuedSynchronizer 类是一个抽象类,它是全部的锁队列管理器的父类,JDK 中的各类形式的锁其内部的队列管理器都继承了这个类,它是 Java 并发世界的核心基石。好比 ReentrantLock、ReadWriteLock、CountDownLatch、Semaphore、ThreadPoolExecutor 内部的队列管理器都是它的子类。这个抽象类暴露了一些抽象方法,每一种锁都须要对这个管理器进行定制。而 JDK 内置的全部并发数据结构都是在这些锁的保护下完成的,它是JDK 多线程高楼大厦的地基。
锁管理器维护的只是一个普通的双向列表形式的队列,这个数据结构很简单,可是仔细维护起来却至关复杂,由于它须要精细考虑多线程并发问题,每一行代码都写的无比当心。
JDK 锁管理器的实现者是 Douglas S. Lea,Java 并发包几乎全是他单枪匹马写出来的,在算法的世界里越是精巧的东西越是适合一我的来作。
Douglas S. Lea是纽约州立大学奥斯威戈分校计算机科学教授和现任计算机科学系主任,专门研究并发编程和并发数据结构的设计。他是Java Community Process的执行委员会成员,主持JSR 166,它为Java编程语言添加了并发实用程序。
后面咱们将 AbstractQueuedSynchronizer 简写成 AQS。我必须提醒各位读者,AQS 太复杂了,若是在理解它的路上遇到了挫折,这很正常。目前市场上并不存在一本能够轻松理解 AQS 的书籍,可以吃透 AQS 的人太少太少,我本身也不算。
公平锁会确保请求锁和得到锁的顺序,若是在某个点锁正处于自由状态,这时有一个线程要尝试加锁,公平锁还必须查看当前有没有其它线程排在排队,而非公平锁能够直接插队。联想一下在肯德基买汉堡时的排队场景。
也许你会问,若是某个锁处于自由状态,那它怎么会有排队的线程呢?咱们假设此刻持有锁的线程刚刚释放了锁,它唤醒了等待队列中第一个节点线程,这时候被唤醒的线程刚刚从 park 方法返回,接下来它就会尝试去加锁,那么从 park 返回到加锁之间的状态就是锁的自由态,这很短暂,而这短暂的时间内还可能有其它线程也在尝试加锁。
其次还有一点须要注意,执行了 Lock.park 方法的线程自我休眠后,并非非要等到其它线程 unpark 了本身才会醒来,它可能随时会以某种未知的缘由醒来。咱们看源码注释,park 返回的缘由有四种
其它线程 unpark 了当前线程
时间到了天然醒(park 有时间参数)
其它线程 interrupt 了当前线程
其它未知缘由致使的「假醒」
文档中没有明确说明何种未知缘由会致使假醒,它却是说明了当 park 方法返回时并不意味着锁自由了,醒过来的线程在从新尝试获取锁失败后将会再次 park 本身。因此加锁的过程须要写在一个循环里,在成功拿到锁以前可能会进行屡次尝试。
计算机世界非公平锁的服务效率要高于公平锁,因此 Java 默认的锁都使用了非公平锁。不过现实世界彷佛非公平锁的效率会差一点,好比在肯德基若是能够不停插队,你能够想象现场确定一片混乱。为何计算机世界和现实世界会有差别,大概是由于在计算机世界里某个线程插队并不会致使其它线程抱怨。
public ReentrantLock() { this.sync = new NonfairSync(); } public ReentrantLock(boolean fair) { this.sync = fair ? new FairSync() : new NonfairSync(); }
复制代码
ReentrantLock 的锁是排他锁,一个线程持有,其它线程都必须等待。而 ReadWriteLock 里面的读锁不是排他锁,它容许多线程同时持有读锁,这是共享锁。共享锁和排他锁是经过 Node 类里面的 nextWaiter 字段区分的。
class AQS { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; boolean isShared() { return this.nextWaiter == SHARED; } }
复制代码
那为何这个字段没有命名成 mode 或者 type 或者干脆直接叫 shared?这是由于 nextWaiter 在其它场景还有不同的用途,它就像 C 语言联合类型的字段同样随机应变,只不过 Java 语言没有联合类型。
关于条件变量,须要提出的第一个问题是为何须要条件变量,只有锁还不够么?考虑下面的伪代码,当某个条件知足时,才去干某件事
void doSomething() { locker.lock(); while(!condition_is_true()) { // 先看能不能搞事 locker.unlock(); // 搞不了就歇会再看看能不能搞 sleep(1); locker.lock(); // 搞事须要加锁,判断能不能搞事也须要加锁 } justdoit(); // 搞事 locker.unlock(); }
复制代码
当条件不知足时,就循环重试(其它线程会经过加锁来修改条件),可是须要间隔 sleep,否则 CPU 就会由于空转而飙高。这里存在一个问题,那就是 sleep 多久很差控制。间隔过久,会拖慢总体效率,甚至会错过期机(条件瞬间知足了又当即被重置了),间隔过短,又回致使 CPU 空转。有了条件变量,这个问题就能够解决了
void doSomethingWithCondition() { cond = locker.newCondition(); locker.lock(); while(!condition_is_true()) { cond.await(); } justdoit(); locker.unlock(); }
复制代码
await() 方法会一直阻塞在 cond 条件变量上直到被另一个线程调用了 cond.signal() 或者 cond.signalAll() 方法后才会返回,await() 阻塞时会自动释放当前线程持有的锁,await() 被唤醒后会再次尝试持有锁(可能又须要排队),拿到锁成功以后 await() 方法才能成功返回。
阻塞在条件变量上的线程能够有多个,这些阻塞线程会被串联成一个条件等待队列。当 signalAll() 被调用时,会唤醒全部的阻塞线程,让全部的阻塞线程从新开始争抢锁。若是调用的是 signal() 只会唤醒队列头部的线程,这样能够避免「惊群问题」。
await() 方法必须当即释放锁,不然临界区状态就不能被其它线程修改,condition_is_true() 返回的结果也就不会改变。 这也是为何条件变量必须由锁对象来建立,条件变量须要持有锁对象的引用这样才能够释放锁以及被 signal 唤醒后从新加锁。建立条件变量的锁必须是排他锁,若是是共享锁被 await() 方法释放了并不能保证临界区的状态能够被其它线程来修改,能够修改临界区状态的只能是排他锁。这也是为何 ReadWriteLock.ReadLock 类的 newCondition 方法定义以下
public Condition newCondition() { throw new UnsupportedOperationException(); }
复制代码
有了条件变量,sleep 很差控制的问题就解决了。当条件知足时,调用 signal() 或者 signalAll() 方法,阻塞的线程能够当即被唤醒,几乎没有任何延迟。
当多个线程 await() 在同一个条件变量上时,会造成一个条件等待队列。同一个锁能够建立多个条件变量,就会存在多个条件等待队列。这个队列和 AQS 的队列结构很接近,只不过它不是双向队列,而是单向队列。队列中的节点和 AQS 等待队列的节点是同一个类,可是节点指针不是 prev 和 next,而是 nextWaiter。
class AQS { ... class ConditionObject { Node firstWaiter; // 指向第一个节点 Node lastWaiter; // 指向第二个节点 } class Node { static final int CONDITION = -2; static final int SIGNAL = -1; Thread thread; // 当前等待的线程 Node nextWaiter; // 指向下一个条件等待节点 Node prev; Node next; int waitStatus; // waitStatus = CONDITION } ... }
复制代码
ConditionObject 是 AQS 的内部类,这个对象里会有一个隐藏的指针 this$0 指向外部的 AQS 对象,ConditionObject 能够直接访问 AQS 对象的全部属性和方法(加锁解锁)。位于条件等待队列里的全部节点的 waitStatus 状态都被标记为 CONDITION,表示节点是由于条件变量而等待。
当条件变量的 signal() 方法被调用时,条件等待队列的头节点线程会被唤醒,该节点从条件等待队列中被摘走,而后被转移到 AQS 的等待队列中,准备排队尝试从新获取锁。这时节点的状态从 CONDITION 转为 SIGNAL,表示当前节点是被条件变量唤醒转移过来的。
class AQS { ... boolean transferForSignal(Node node) { // 重置节点状态 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false Node p = enq(node); // 进入 AQS 等待队列 int ws = p.waitStatus; // 再修改状态为SIGNAL if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ... }
复制代码
被转移的节点的 nextWaiter 字段的含义也发生了变动,在条件队列里它是下一个节点的指针,在 AQS 等待队列里它是共享锁仍是互斥锁的标志。
Java 并发包经常使用类库依赖结构
下面咱们精细分析加锁过程,深刻理解锁逻辑控制。我必须确定 Dough Lea 的代码写成下面这样的极简形式,阅读起来仍是挺难以理解的。
class ReentrantLock { ... public void lock() { sync.acquire(1); } ... } class Sync extends AQS { ... public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ... }
复制代码
acquire 的 if 判断语句要分为三个部分,tryAcquire 方法表示当前的线程尝试加锁,若是加锁不成功就须要排队,这时候调用 addWaiter 方法,将当前线程入队。而后再调用 acquireQueued 方法,开始了 park 、醒来重试加锁、加锁不成功继续 park 的循环重试加锁过程。直到加锁成功 acquire 方法才会返回。
若是在循环重试加锁过程当中被其它线程打断了,acquireQueued 方法就会返回 true。这时候线程就须要调用 selfInterrupt() 方法给当前线程设置一个被打断的标识位。
// 打断当前线程,其实就是设置一个标识位 static void selfInterrupt() { Thread.currentThread().interrupt(); }
复制代码
线程如何知道本身被其它线程打断了呢?在 park 醒来以后调用 Thread.interrupted() 就知道了,不过这个方法只能调用一次,由于它在调用以后就会当即 clear 打断标志位。这也是为何 acquire 方法里须要调用 selfInterrupt() ,为的就是从新设置打断标志位。这样上层的逻辑才能够经过 Thread.interrupted() 知道本身有没有被打断。
acquireQueued 和 addWaiter 方法由 AQS 类提供,tryAcquire 须要由子类本身实现。不一样的锁会有不一样的实现。下面咱们来看看 ReentrantLock 的公平锁 tryAcquire 方法的实现
这里有个 if else 分支,其中 else if 部分表示锁的重入,当前尝试加锁的线程是已经持有了这把锁的线程,也就是同一个线程重复加锁,这时只须要增长计数值就好了。锁的 state 记录的就是加锁计数,重入一次就 +1。AQS 对象里有一个 exclusiveOwnerThread 字段,记录了当前持有排他锁的线程。
if(c == 0) 意味着当前锁是自由态,计数值为零。这时就须要争抢锁,由于同一时间可能会有多个线程在调用 tryAcquire。争抢的方式是用 CAS 操做 compareAndSetState,成功将锁计数值从 0 改为 1 的线程将得到这把锁,将当前的线程记录到 exclusiveOwnerThread 中。
代码里还有一个 hasQueuedPredecessors() 判断,这个判断很是重要,它的意思是看看当前的 AQS 等待队列里有没有其它线程在排队,公平锁在加锁以前须要 check 一下,若是有排队的,本身就不能插队。而非公平锁就不须要 check,公平锁和非公平锁的所有的实现差别就在于此,就这一个 check 决定了锁的公平与否。
下面咱们再看看 addWaiter 方法的实现,参数 mode 表示是共享锁仍是排他锁,它对应 Node.nextWaiter 属性。
addWaiter 须要将新的节点添加到 AQS 等待队列的队尾。若是队尾 tail 是空的意味着队列尚未初始化,那就须要初始化一下。AQS 队列在初始化时须要一个冗余的头部节点,这个节点的 thread 字段是空的。
将新节点添加到队尾也是须要考虑多线程并发的,因此代码里再一次使用了 CAS 操做 compareAndSetTail 来竞争队尾指针。没有竞争到的线程就会继续下一轮竞争 for(;;) 继续使用 CAS 操做将新节点往队尾添加。
下面咱们再看看 acquireQueue 方法的代码实现,它会重复 park、尝试再次加锁、加锁失败继续 park 的循环过程。
acquireQueue 在尝试加锁以前会先看看本身是否是 AQS 等待队列的第一个节点,若是不是它就继续去 park。这意味着无论是公平仍是非公平锁,在这里它们都统一采起了公平的方案,看看队列中是否是轮到本身了。也就是说「一朝排队,永远排队」。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
复制代码
线程在 park 返回醒来以后要当即检测一下是否被其它线程中断了。不过即便发生中断了,它还会继续尝试获取锁,若是获取不到,还会继续睡眠,直到锁获取到了才将中断状态返回。这意味着打断线程并不会致使死锁状态(拿不到锁)退出。
同时咱们还能够注意到锁是能够取消的 cancelAcquire(),准确地说是取消处于等待加锁的状态,线程处于 AQS 的等待队列中等待加锁。那什么状况下才会抛出异常而致使取消加锁呢,惟一的可能就是 tryAcquire 方法,这个方法是由子类实现的,子类的行为不受 AQS 控制。当子类的 tryAcquire 方法抛出了异常,那 AQS 最好的处理方法就是取消加锁了。cancelAcquire 会将当前节点从等待队列中移除。
解锁的过程要简单一些,将锁计数降为零后,唤醒等待队列中的第一个有效节点。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 解铃还须系铃人 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
复制代码
考虑到可重入锁,须要判断锁计数是否降为零才能够肯定锁是否完全被释放。只有锁完全被释放了才能唤醒后继等待节点。unparkSuccessor 会跳过无效节点(已取消的节点),找到第一个有效节点调用 unpark() 唤醒相应的线程。
读写锁分为两个锁对象 ReadLock 和 WriteLock,这两个锁对象共享同一个 AQS。AQS 的锁计数变量 state 将分为两个部分,前 16bit 为共享锁 ReadLock 计数,后 16bit 为互斥锁 WriteLock 计数。互斥锁记录的是当前写锁重入的次数,共享锁记录的是全部当前持有共享读锁的线程重入总次数。
读写锁一样也须要考虑公平锁和非公平锁。共享锁和互斥锁的公平锁策略和 ReentrantLock 同样,就是看看当前还有没有其它线程在排队,本身会乖乖排到队尾。非公平锁策略不同,它会比较偏向于给写锁提供更多的机会。若是当前 AQS 队列里有任何读写请求的线程在排队,那么写锁能够直接去争抢,可是若是队头是写锁请求,那么读锁须要将机会让给写锁,去队尾排队。 毕竟读写锁适合读多写少的场合,对于偶尔出现一个写锁请求就应该获得更高的优先级去处理。
读写锁的写锁加锁在总体逻辑上和 ReentrantLock 是同样的,不一样的是 tryAcquire() 方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
复制代码
写锁也须要考虑可重入,若是当前 AQS 互斥锁的持有线程正好是当前要加锁的线程,那么就是写锁在重入,重入只须要递增锁计数值便可。当 c!=0 也就是锁计数不为零时,既多是由于当前的 AQS 有读锁也多是由于有写锁,判断 w == 0 就是判断当前的计数是否是读锁带来的。
若是计数值为零,那就开始争抢锁。取决于锁是否公平,在争抢以前调用 writerShouldBlock() 方法看看本身是否须要排队,若是不须要排队,就可使用 CAS 操做来争抢,成功将计数值从 0 设置为 1 的线程将独占写锁。
读锁加锁过程比写锁要复杂不少,它在总体流程上和写锁同样,可是细节差距很大。特别是它须要为每个线程记录读锁计数,这部分逻辑占据了很多代码。
public final void acquireShared(int arg) { // 若是尝试加锁不成功, 就去排队休眠,而后循环重试 if (tryAcquireShared(arg) < 0) // 排队、循环重试 doAcquireShared(arg); }
复制代码
若是当前线程已经持有写锁,它还能够继续加读锁,这是为了达成锁降级必须支持的逻辑。锁降级是指在持有写锁的状况下,再加读锁,再解写锁。相比于先写解锁再加读锁而言,这样能够省去加锁二次排队的过程。由于锁降级的存在,锁计数中读写计数能够同时不为零。
wlock.lock(); if(whatever) { // 降级 rlock.lock(); wlock.unlock(); doRead(); rlock.unlock(); } else { // 不降级 doWrite() wlock.unlock(); }
复制代码
为了给每个读锁线程进行锁计数,它设置了一个 ThreadLocal 变量。
private transient ThreadLocalHoldCounter readHolds; static final class HoldCounter { int count; final long tid = LockSupport.getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
复制代码
可是 ThreadLocal 变量访问起来效率不够高,因此又设置了缓存。它会存储最近一次获取读锁线程的锁计数。在线程争用不是特别频繁的状况下,直接读取缓存会比较高效。
private transient HoldCounter cachedHoldCounter;
复制代码
Dough Lea 以为使用 cachedHoldCounter 仍是不够高效,因此又加了一层缓存记录 firstReader,记录第一个将读锁计数从 0 变成 1 的线程以及锁计数。当没有线程争用时,直接读取这两个字段会更加高效。
private transient Thread firstReader; private transient int firstReaderHoldCount; final int getReadHoldCount() { // 先访问锁全局计数的读计数部分 if (getReadLockCount() == 0) return 0; // 再访问 firstReader Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; // 再访问最近的读线程锁计数 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == LockSupport.getThreadId(current)) return rh.count; // 无奈读 ThreadLocal 吧 int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; }
复制代码
因此咱们看到为了记录这个读锁计数做者煞费苦心,那这个读计数的做用是什么呢?那就是线程能够经过这个计数值知道本身有没有持有这个读写锁。
读加锁还有一个自旋的过程,所谓自旋就是第一次加锁失败,那就直接循环重试,不休眠,听起来有点像死循环重试法。
final static int SHARED_UNIT = 65536 // 读计数是高16位 final int fullTryAcquireShared(Thread current) { for(;;) { int c = getState(); // 若是有其它线程加了写锁,仍是返回睡觉去吧 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; ... // 超出计数上限 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 拿到读锁了 ... return 1 } ... // 循环重试 } }
复制代码
由于读锁须要使用 CAS 操做来修改底层锁的总读计数值,成功的才能够得到读锁,获取读锁的 CAS 操做失败只是意味着读锁之间存在 CAS 操做的竞争,并不意味着此刻锁被别人占据了本身不能得到。多试几回确定能够加锁成功,这就是自旋的缘由所在。一样在释放读锁的时候也有一个 CAS 操做的循环重试过程。
protected final boolean tryReleaseShared(int unused) { ... for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0; } } ... }