#0 系列目录#java
#1 什么是同步器# 多线程并发的执行,之间经过某种 共享 状态来同步,只有当状态知足 xxxx 条件,才能触发线程执行 xxxx 。
这个共同的语义能够称之为同步器
。能够认为全部的锁机制均可以基于同步器定制来实现的
。node
而juc(java.util.concurrent)里的思想是 将这些场景抽象出来的语义经过统一的同步框架来支持。juc 里全部的这些锁机制都是基于 AQS ( AbstractQueuedSynchronizer )框架上构建的
。下面简单介绍下 AQS( AbstractQueuedSynchronizer )。windows
咱们来看下java.util.concurrent.locks大体结构:多线程
上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上
,为什么图中没有用UML线表示呢,这是每一个Lock实现类都持有本身内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)
。为什么要实现不一样的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制
。下文会举例说明不一样同步器内的Sync与state实现。并发
#2 AQS框架如何构建同步器# ##2.1 同步器的基本功能## 一个同步器至少须要包含两个功能:框架
根据做者论文, aqs 同步机制同时考虑了以下需求:ui
独占锁和共享锁两种机制
。若是须要取消,须要支持中断
。若是有超时要求,应该支持超时后中断的机制
。##2.2 同步状态的获取与释放## AQS实现了一个同步器的基本结构,下面以独占锁与共享锁分
开讨论,来讲明AQS怎样实现获取、释放同步状态。this
###2.2.1 独占模式### 独占获取: tryAcquire 自己不会阻塞线程,若是返回 true 成功就继续,若是返回 false 那么就阻塞线程并加入阻塞队列。操作系统
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列 selfInterrupt(); }
独占且可中断模式获取:支持中断取消.net
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
独占且支持超时模式获取: 带有超时时间,若是通过超时时间则会退出。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
独占模式释放:释放成功会唤醒后续节点
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
###2.2.2 共享模式### 共享模式获取
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
可中断模式共享获取
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
共享模式带定时获取
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
共享锁释放
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
注意以上框架只定义了一个同步器的基本结构框架,基本方法里依赖的 tryAcquire 、 tryRelease 、tryAcquireShared 、 tryReleaseShared 四个方法在 AQS 里没有实现
,这四个方法不会涉及线程阻塞,而是由各自不一样的使用场景根据状况来定制:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
从以上源码能够看出AQS实现基本的功能: AQS虽然实现了acquire,和release方法是可能阻塞的,可是里面调用的tryAcquire和tryRelease是由子类来定制的且是不阻塞的
。能够认为同步状态的维护、获取、释放动做是由子类实现的功能,而动做成功与否的后续行为时有AQS框架来实现
。
##2.3 状态获取、释放成功或失败的后续行为:线程的阻塞、唤醒机制## 有别于wait和notiry。这里利用 jdk1.5 开始提供的 LockSupport.park() 和 LockSupport.unpark() 的本地方法实现,实现线程的阻塞和唤醒
。
获得锁的线程禁用(park)和唤醒(unpark),也是直接native实现(这几个native方法的实现代码在hotspot\src\share\vm\prims\unsafe.cpp文件中,可是关键代码park的最终实现是和操做系统相关的,好比windows下实现是在os_windows.cpp中,有兴趣的同窗能够下载jdk源码查看)。唤醒一个被park()线程主要手段包括如下几种:
以被park()线程为参数
的unpark(Thread thread)。中断被park()线程
,如waiters.peek().interrupt();waiters为存储线程对象的队列。park()方法返回并不会报告究竟是上诉哪一种返回,因此返回后最好检查下线程状态,如:
LockSupport.park(); // 禁用当前线程 if(Thread.interrupted){ //doSomething }
AbstractQueuedSynchronizer(AQS)对于这点实现得至关巧妙,以下所示:
private void doAcquireSharedInterruptibly(int arg)throwsInterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } //parkAndCheckInterrupt()会返回park住的线程在被unpark后的线程状态,若是线程中断,跳出循环。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // 只有线程被interrupt后才会走到这里 cancelAcquire(node); throw new InterruptedException(); } //在park()住的线程被unpark()后,第一时间返回当前线程是否被打断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
##2.4 线程阻塞队列的维护## 阻塞线程节点队列 CHL Node queue 。
根据论文里描述, AQS 里将阻塞线程封装到一个内部类 Node 里。并维护一个 CHL Node FIFO 队列
。 CHL队列是一个非阻塞的 FIFO 队列
,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是经过自旋锁和 CAS 保证节点插入和移除的原子性
。实现无锁且快速的插入。CHL队列对应代码以下:
/** * CHL头节点 */ private transient volatile Node head; /** * CHL尾节点 */ private transient volatile Node tail;
Node节点是对Thread的一个封装,结构大概以下:
static final class Node { /** 表明线程已经被取消*/ static final int CANCELLED = 1; /** 表明后续节点须要唤醒 */ static final int SIGNAL = -1; /** 表明线程在等待某一条件/ static final int CONDITION = -2; /** 标记是共享模式*/ static final Node SHARED = new Node(); /** 标记是独占模式*/ static final Node EXCLUSIVE = null; /** * 状态位 ,分别可使CANCELLED、SINGNAL、CONDITION、0 */ volatile int waitStatus; /** * 前置节点 */ volatile Node prev; /** * 后续节点 */ volatile Node next; /** * 节点表明的线程 */ volatile Thread thread; /** *链接到等待condition的下一个节点 */ Node nextWaiter; }
##2.5 小结## 从源码能够看出AQS实现基本的功能:
AQS虽然实现了acquire,和release方法,可是里面调用的tryAcquire和tryRelease是由子类来定制的。能够认为同步状态的维护、获取、释放动做是由子类实现的功能,而动做成功与否的后续行为时有AQS框架来实现,还有如下一些私有方法,用于辅助完成以上的功能:
final boolean acquireQueued(final Node node, int arg) :申请队列 private Node enq(final Node node) : 入队 private Node addWaiter(Node mode) :以mode建立建立节点,并加入到队列 private void unparkSuccessor(Node node) : 唤醒节点的后续节点,若是存在的话。 private void doReleaseShared() :释放共享锁 private void setHeadAndPropagate(Node node, int propagate):设置头,而且若是是共享模式且propagate大于0,则唤醒后续节点。 private void cancelAcquire(Node node) :取消正在获取的节点 private static void selfInterrupt() :自我中断 private final boolean parkAndCheckInterrupt() : park 并判断线程是否中断
#3 AQS在各同步器内的Sync与State实现# ##3.1 什么是state机制## 提供 volatile 变量 state; 用于同步线程之间的共享状态。经过 CAS 和 volatile 保证其原子性和可见性
。对应源码里的定义:
/** * 同步状态 */ private volatile int state; /** *cas */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
##3.2 不一样实现类的Sync与State## 基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不同而已
。
###3.2.1 ReentrantLock### 须要记录当前线程获取原子状态的次数,若是次数为零,那么就说明这个线程放弃了锁(也有可能其余线程占据着锁从而须要等待),若是次数大于1,也就是得到了重进入的效果
,而其余线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。如下为ReetranLock的FairSync的tryAcquire实现代码解析。
//公平获取锁 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //若是当前重进入数为0,说明有机会取得锁 if (c == 0) { //若是是第一个等待者,而且设置重进入数成功,那么当前线程得到锁 if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是当前线程自己就持有锁,那么叠加剧进入数,而且继续得到锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //以上条件都不知足,那么线程进入等待队列。 return false; }
###3.2.2 Semaphore### 则是要记录当前还有多少次许可可使用,到0,就须要等待
,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。如下为Semaphore的FairSync实现:
protected int tryAcquireShared(int acquires) { Thread current = Thread.currentThread(); for (;;) { Thread first = getFirstQueuedThread(); //若是当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程须要等待 if (first != null && first != current) return -1; //若是当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,而且减去当前线程须要的许可证获得剩下的值 int available = getState(); int remaining = available - acquires; //若是remining<0,那么反馈给AQS当前线程须要等待,若是remaining>0,而且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,能够继续执行。 if (remaining < 0 ||compareAndSetState(available, remaining)) return remaining; } }
###3.2.3 CountDownLatch### 闭锁则要保持其状态,在这个状态到达终止态以前,全部线程都会被park住
,闭锁能够设定初始值,这个值的含义就是这个闭锁须要被countDown()几回,由于每次CountDown是sync.releaseShared(1)
,而一开始初始值为10的话,那么这个闭锁须要被countDown()十次,才可以将这个初始值减到0,从而释放原子状态,让等待的全部线程经过。
//await时候执行,只查看当前须要countDown数量减为0了,若是为0,说明能够继续执行,不然须要park住,等待countDown次数足够,而且unpark全部等待线程 public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; } //countDown 时候执行,若是当前countDown数量为0,说明没有线程await,直接返回false而不须要唤醒park住线程,若是不为0,获得剩下须要 countDown的数量而且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS断定是否释放全部await线程。 public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
###3.2.4 FutureTask### 须要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法
,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续仍是park
,而FutureTask的tryAcquireShared方法所作的惟一事情就是检查状态,若是是RUNNING状态那么让当前线程park
。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,而且经过unpark唤醒正在等待的线程,返回结果
。
//get时待用,只检查当前任务是否完成或者被Cancel,若是未完成而且没有被cancel,那么告诉AQS当前线程须要进入等待队列而且park住 protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } //断定任务是否完成或者被Cancel boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } //get时调用,对于CANCEL与其余异常进行抛错 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0,nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } //任务的执行线程执行完毕调用(set(V v)) void innerSet(V v) { for (;;) { int s = getState(); //若是线程任务已经执行完毕,那么直接返回(多线程执行任务?) if (s == RAN) return; //若是被CANCEL了,那么释放等待线程,而且会抛错 if (s == CANCELLED) { releaseShared(0); return; } //若是成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工做(通常由FutrueTask的子类实现) if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } }
以上4个AQS的使用是比较典型,然而有个问题就是这些状态存在哪里呢?而且是能够计数的。从以上4个example,咱们能够很快获得答案,AQS提供给了子类一个int state属性
。而且暴露给子类getState()和setState()两个方法(protected)。这样就为上述状态解决了存储问题,RetrantLock能够将这个state用于存储当前线程的重进入次数
,Semaphore能够用这个state存储许可数
,CountDownLatch则能够存储须要被countDown的次数
,而Future则能够存储当前任务的执行状态(RUNING,RAN,CANCELL)
。其余的Synchronizer存储他们的一些状态。
AQS留给实现者的方法主要有5个方法,其中tryAcquire,tryRelease和isHeldExclusively三个方法为须要独占形式获取的synchronizer实现的
,好比线程独占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared为须要共享形式获取的synchronizer实现
。
ReentrantLock内部Sync类实现的是tryAcquire,tryRelease, isHeldExclusively三个方法
(由于获取锁的公平性问题,tryAcquire由继承该Sync类的内部类FairSync和NonfairSync实现
);Semaphore内部类Sync则实现了tryAcquireShared和tryReleasedShared
(与CountDownLatch类似,由于公平性问题,tryAcquireShared由其内部类FairSync和NonfairSync实现
)。CountDownLatch内部类Sync实现了tryAcquireShared和tryReleasedShared
。FutureTask内部类Sync也实现了tryAcquireShared和tryReleasedShared
。