在 Java5
以前,只能使用 synchronized
关键字来实现锁。它使用起来比较简单,可是有一些局限性:java
而在 Java5
中,并发包中增长了 Lock
接口及其实现类,它的功能与 synchronized
相似,须要进行显示地获取和释放锁,可是却提供了不少 synchronized
不具备的特性。举一个例子:node
Lock lock = new ReentrantLock();
lock.lock();
try {
//
} finally {
lock.unlock();
}
复制代码
注意的是获取锁的 lock
方法应该写在 try
块以外,由于若是写在 try
块中,获取锁时发生了异常,抛出异常的同时也会致使锁无端释放,而不是等到执行 finally
语句时才释放锁。编程
在 Lock
接口中,定义了锁获取和释放的基本操做,包括可中断的获取锁、超时获取锁等特性:安全
public interface Lock {
// 获取锁
void lock();
// 可中断地获取锁,即获取锁时,其余线程能够中断当前线程
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,调用后会当即返回,能获取就返回 true,不然返回 false
boolean tryLock();
// 在给定时间内可中断地尝试获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 返回一个绑定到该 Lock 实例上的 Condition
// 只有当前线程持有了锁,才能调用 await 方法,await 方法的调用将会自动释放锁
Condition newCondition();
}
复制代码
Lock
接口的主要实现就是 ReentrantLock
。而 Lock
接口的实现基本都是经过内部实现了一个同步器 AQS
的子类来实现线程访问控制的。并发
同步器 AbstractQueuedSynchronizer
,是用来构建锁或其余同步组件的基础框架。它使用一个 int
成员变量表示同步状态,经过内置的 FIFO
同步队列来完成线程获取资源时的排队等待工做。框架
在自定义同步组件时,推荐定义一个静态内部类,使其继承自同步器 AQS
并实现它的抽象方法来管理同步状态,在实现抽象方法时,对同步状态的管理可使用同步器提供的三个方法。oop
private volatile int state;
// 获取当前同步状态
protected final int getState() {
return state;
}
// 设置当前同步状态
protected final void setState(int newState) {
state = newState;
}
// 使用 CAS 设置当前状态,保证原子性
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
同步器是实现同步组件的关,它们两者的关系以下:ui
同步器是基于模板方法模式的。使用者须要继承同步器并重写指定的方法。而可重写的方法主要有:this
方法名 | 描述 |
---|---|
tryAcquire | 独占式获取同步状态 |
tryRelease | 独占式释放同步状态 |
tryAcquireShared | 共享式获取同步状态 |
tryReleaseShared | 共享式释放同步状态 |
isHeldExclusively | 判断同步器是否被线程独占 |
随后将同步器组合到自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。spa
可调用的模板方法主要有三类:独占式获取与释放同步状态、共享式获取与释放状态、以及查询同步队列中的等待线程状况。下文会介绍它们,并简单分析其实现原理。
同步器内部使用一个 FIFO
同步队列来管理同步状态,在线程获取同步状态失败时,同步器会将当前线程与等待状态等信息构形成一个节点,将其加入到同步队列中,同时会阻塞当前线程。当释放同步状态时,则会唤醒队列中首节点的线程,使其再次尝试获取同步状态。
同步队列中的节点的主要属性有:
static final class Node {
// 等待状态
volatile int waitStatus;
// 前驱节点,在入队时被赋值
volatile Node prev;
// 后继节点,
volatile Node next;
// 加入节点的线程,该线程获取到同步状态
volatile Thread thread;
}
复制代码
等待状态 waitStatus
的取值主要有:
// 同步队列中等待的线程等待超时或被中断,须要取消等待,以后节点的状态将不会再改变
static final int CANCELLED = 1;
// 后继节点的线程处于等待状态
// 当前节点的线程释放或取消同步状态时,会唤醒它的后继节点
static final int SIGNAL = -1;
// 节点目前在等待队列中
// 当节点被唤醒时,从等待队列转移到同步队列中,尝试获取同步状态
static final int CONDITION = -2;
// 共享式同步状态被传播给其余节点
static final int PROPAGATE = -3;
//初始化 waitStatus 值为 0
复制代码
同步器中包含两个引用,分别指向同步队列的首节点和尾节点:
// 头节点,惰性初始化
private transient volatile Node head;
// 尾节点,惰性初始化
private transient volatile Node tail;
复制代码
当线程没法获取同步状态,会将该线程构形成一个节点加入同步队列中,使用 addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
若是快速尝试添加尾节点失败,则调用 enq
方法经过死循环来保证节点的正确添加:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 若是未初始化,则会先初始化,再继续尝试
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
而这个过程可能会有多个线程同时执行,因此必需要保证线程安全,提供了基于 CAS
的设置尾节点的方法:
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
复制代码
同步队列中,首节点是获取同步状态成功的节点,线程在释放同步状态时,会唤醒后继节点,后继节点成功获取同步状态时将本身设置为首节点,因为只有一个线程能获取到同步状态,因此设置头节点的方法不须要 CAS
方法保证:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
复制代码
独占式获取与释放同步状态主要有四个模板方法,分别是:
方法名 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态 |
void acquireInterruptibly(int arg) | 可响应中断的独占式获取同步状态 |
boolean tryAcquireNanos(int arg, long nanos) | 可响应中断的独占式超时获取同步状态 |
boolean release(int arg) | 独占式释放同步状态 |
acquire
方法能够获取同步状态,该方法为独占式获取,不可中断,也就是若是线程获取同步状态失败,加入到同步队列中,后续对线程进行中断操做,线程并不会被移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
acquire
方法中,首先调用 tryAcquire
方法尝试获取同步状态,该方法由自定义组件本身实现。若是获取失败,调用 addWaiter
方法将当前线程加入到同步队列末尾。最后调用 acquiredQueued
方法经过死循环的方式来获取同步状态:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
该方法中,经过死循环的方式来获取同步状态,而且只有前驱节点是头节点时,才可以尝试获取同步状态,这样作就是为了保持 FIFO
同步队列原则,即先加入到同步队列中的线程先尝试获取同步状态。
另外,在自旋时首先会调用 shouldParkAfterFailedAcquire
方法判断是否应该被阻塞:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱节点状态为 SIGNAL ,则当前节点能够被阻塞
return true;
if (ws > 0) {
// 前驱节点处于取消状态,也就是超时或被中断,须要从同步队列中删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 不然,将当前节点设置为 SIGNAL,不会阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
该方法主要是根据前驱节点的 waitStatus
来判断当前节点的线程,若是当前节点应该被阻塞,则会调用 parkAndCheckInterrupt
方法阻塞:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
该方法调用 LockSupport.park()
方法阻塞当前线程,并返回当前线程的中断状态。
acquireInterruptibly
方法以可响应中断的方式获取同步状态,其中调用 tryAcquire
方法失败后,会调用 doAcquireInterruptibly
方法自旋式获取。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
复制代码
doAcquireInterruptibly
方法与普通地独占式获取同步状态很是相似,只是再也不使用 interrupt
标志,而是直接抛出 InterruptedException
异常。
tryAcquireNanos
方法能够超时获取同步状态,即在指定时间内可中断地获取同步状态。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
复制代码
该方法首先调用 tryAcquire
方法尝试获取同步状态,若是获取失败,则会调用 doAcquireNanos
方法:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算总的超时时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 剩余的超时时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 若是超时时间大于 临界值,则会阻塞线程,不然快速自旋
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
该方法中,首先计算出超时的最终时间,而后将当前节点加入到同步队列中。
而后自旋进行判断,若是当前节点为头节点,则会调用 tryAcquire
方法尝试获取同步状态;不然从新计算超时时间,若是 nanosTimeout
小于 0
,则获取失败。不然继续判断超时时间是否大于 spinForTimeoutThreshold
临界值,若是大于表示时间较长,调用 LockSupport.parkNanos
使线程阻塞。
若是时间较短,则直接进入自旋过程,继续判断。另外,还会判断线程是否被中断。
release
方法用来释放同步状态,该方法释放了同步状态后,会唤醒后继节点,使其从新尝试获取同步状态。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
该方法中,首先调用 tryRelease
方法尝试释放同步状态,该方法由自定义同步组件本身实现。而后调用 unparkSuccessor
方法来唤醒后继节点:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 节点状态设置为 0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 若是后继节点超时或者被中断
if (s == null || s.waitStatus > 0) {
s = null;
// 从 tail 向前,找最靠近 head 的可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
该方法首先找到一个可用的 waitStatus
值大于 0
的节点,而后调用 LockSupport.unpark
方法唤醒该线程。
共享式与独占式最大的区别就是同一时刻有多个线程同时获取到同步状态。
共享式获取与释放同步状态主要有四个模板方法,分别是:
方法名 | 描述 |
---|---|
acquireShared(int arg) | 共享式获取同步状态 |
acquireSharedInterruptibly(int arg) | 可响应中断的共享式获取同步状态 |
tryAcquireSharedNanos(int arg, long anos) | 可响应中断的共享式超时获取同步状态 |
releaseShared(int arg) | 共享式释放同步状态 |
acquireShared
方法能够共享式地获取同步状态:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
该方法中,首先调用 tryAcquireShared
方法尝试获取同步状态,若是返回值大于等于 0
,则表示获取成功。不然获取失败,则会调用 doAcquireShared
方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 大于等于 0,表示获取成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
首先以共享节点加入到等待队列中,而后以死循环的方式进行判断,若是当前节点的前驱节点为头节点,则调用 doAcquireShared
方法尝试获取同步状态,直到其返回值大于等于 0
。
可响应中断、超时获取的共享式获取同步状态与以前相似,这里也就很少介绍。
releaseShared
方法用于共享式释放同步状态,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
该方法首先调用 tryReleaseShared
尝试释放同步状态,若是释放失败,则会调用 doReleaseShared
方法;
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
该方法中在释放同步状态时,因为有多个线程,须要保证线程安全。首先,若是后继节点的线程须要唤醒,则将当前节点的状态设置为 0
,而后调用 unparkSuccessor
方法唤醒后继节点。