在Lock接口出现以前,Java程序依靠 synchronized 关键字实现锁的功能。锁提供了相似的同步功能,只是在使用时须要显式获取和释放锁,同时还拥有了锁的获取释放的操做性、可中断的获取锁以及超时获取锁等多种 synchronized 不具有的同步特性。java
Lock接口提供的 synchronized 接口不具有的特性:node
Lock的经常使用方法:编程
void lock()
获取锁,获取锁后返回void lockInterruptibly throws InterruptedException
可中断的获取锁,在锁的获取中能够中断当前线程boolean tryLock()
尝试非阻塞的获取锁,调用后当即返回,获取了返回true,不然返回falseboolean tryLock(long time,TimeUnit unit) throws InterruptedException
超时获取锁。当前线程在时间内得到了锁,返回true;当前线程在时间内被中断,抛出异常;超出时间,返回falsevoid unlock()
释放锁Condition newCondition()
获取等待通知组件,该组件与当前的锁绑定。当前线程获取了锁,才能调用该组件的wait()方法,调用后,当前线程将释放锁队列同步器(AbstractQueuedSynchronizer)是用来构建锁或者其余同步组件的基础框架,它使用了一个int成员变量表示同步状态,经过内置的FIFO队列完成资源获取线程的排队工做。安全
同步器的时机是基于模板方法模式,使用者须要继承同步器并重写特定方法。重写时,使用如下三个方法访问或修改同步状态:并发
getState()
获取当前同步状态setState(int newState)
设置当前同步状态compareAndSetState(int expect,int update)
使用CAS设置当前状态,该方法能保证状态设置的原子性除此以外,同步器提供了模板方法,分为三类:独占式获取与释放同步状态,共享式获取与释放同步状态,查询同步队列中的等待线程状况。框架
AQS 内部依赖一个同步的 FIFO 双向队列来完成同步状态的管理。当前线程获取同步状态失败时,将当前线程以及等待状态等信息构形成一个节点并将其加入同步队列,同时阻塞该线程;当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。ui
同步队列中,一个几点表明一个线程,保存着线程的引用、等待状态、前驱和后继节点。this
等待状态包含以下:spa
同步器提供了一个基于CAS的设置尾节点的方法compareAndSetTail(Node expect,Node update)
,传递的两个参数是当前线程认为的尾节点和当前的节点,只有设置成功后,当前节点才正式与以前的尾节点创建关联。线程
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,会唤醒后继节点,后继节点将会在获取同步状态成功时将本身设置为首节点。
独占式也就是同一时刻仅有一个线程持有同步状态。
独占式同步状态获取采用acquire(int arg)
方法。该方法对中断不敏感,因为线程获取同步状态失败加入到同步队列中,后序对线程进行中断操做时,线程不会从队列中移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//获取同步状态
//首先生成节点加入队列
//而后等待前驱节点成为头节点并获取同步状态
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
addWaiter(Node mode)
用来构造节点以及加入同步队列。经过使用compareAndSetTail(Node expect,Node update)
来确保节点能被线程安全添加。在enq(final Node node)
方法中,使用死循环保证节点的正确添加。在死循环中,只有经过CAS将节点设置为尾节点以后,当前线程才能从该方法返回。
此时,节点进入同步队列以后,进入了一个自选的过程。当条件知足,得到了锁,退出队列。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//当前驱是头结点,尝试获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
在该方法中,只有前驱是头节点才能尝试获取同步状态。缘由有两个:1. 头节点是成功获取到同步状态的节点,头节点线程释放锁,唤醒其后继节点;2. 维护同步队列的FIFO原则。
总体流程以下图所示:
当前线程获取同步状态并执行了相应逻辑后,须要释放同步状态,使得后续节点能继续获取同步状态。调用release(int arg)
方法释放同步状态,在释放同步状态以后,会唤醒其后继节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒头节点的后继节点
return true;
}
return false;
}
复制代码
总结:获取同步状态时,维护一个同步队列,获取状态失败的线程加入队列并进行自旋;移出队列的条件是前驱为头节点且成功获取同步状态。释放时,先释放同步状态,而后唤醒头节点的后继节点。
共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻能够有多个线程获取同步状态。例如读操做能够有多个线程同时进行,而写操做同一时刻只能有一个线程进行写操做,其余操做都会被阻塞。
AQS提供了acquireShared(int arg)
共享式获取同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//前驱是头结点,获取同步状态
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
acquireShared(int arg)
方法中,首先调用tryAcquireShared(int arg)
尝试获取同步状态,返回一个int,当返回值大于等于0,表示能获取到同步状态。不然,获取失败调用doAcquireShared(int arg)
自旋获取同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
该方法释放同步状态后,将会唤醒后续处于等待状态的节点。由于可能会存在多个线程同时进行释放同步状态资源,因此须要确保同步状态安全地成功释放,通常都是经过CAS和循环来完成的。
AQS提供了tryAcquireNanos(int arg,long nanos)
方法,是acquireInterruptibly(int arg)
的加强。除了响应中断以外,还有超时控制。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
复制代码
其中,doAcquireNanos(arg, nanosTimeout)
用来超时获取同步状态。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//获取失败
//从新计算须要的休眠时间
nanosTimeout = deadline - System.nanoTime();
//超时返回
if (nanosTimeout <= 0L)
return false;
//未超时,等待
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//中断处理
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
与独占式获取同步状态的区别在于未获取到同步状态时的处理逻辑。独占式在未获取到同步状态时,会使当前线程一致处于等待状态,而超时获取会使当前线程等待nanosTimeout纳秒,若是没有获取到,将返回。
可重入锁也就是支持从新进入的锁,它表示该锁能支持一个线程对资源的重复加锁。它能够等同于 synchronized 的使用(synchronized 隐式支持重入),可是提供了比 synchronized 更强大更灵活的锁机制,能够减小死锁发生的几率。
ReentrantLock 还提供了公平锁和非公平锁的选择,构造方法中接受一个可选的公平参数,默认是非公平的。公平锁也就是等待时间最长的线程最优先获取锁。可是公平锁的效率每每没有非公平的几率高。
ReentrantLock 有一个内部类 Sync,Sync 继承AQS,有两个子类:公平锁 FairSync 和非公平锁 NonfairSync。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//锁处于空闲状态
if (c == 0) {
//获取锁成功,设置为当前线程全部
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断锁持有的线程是不是当前线程
//若是是持有锁的线程再次请求,将同步状态值进行增长并返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
首先判断同步状态,若是为0说明尚未被线程持有,经过CAS获取同步状态,若是成功返回true。不然,判断当前线程是否为获取锁的线程,若是是则获取锁,成功返回true。成功过去锁的线程再次获取锁,并将同步状态值增长。
protected final boolean tryRelease(int releases) {
//减掉releases
int c = getState() - releases;
//若是释放的不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已经释放彻底了,其余线程能够获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
将同步状态是否为0做为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
若是一个锁是公平的,那么按照请求的绝对时间顺序也就是FIFO进行锁的获取。公平锁的获取方法以下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
与非公平锁获取锁的方法不一样,二者区别在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()
。该方法用来判断当前线程是否位于CLH同步队列中的第一个,是则返回true,不然返回false。
公平锁保证了锁的获取按照FIFO原则,代价是进行大量的线程切换。非公平锁虽然可能形成线程“饥饿”,可是极少的线程切换,保证了其更大的吞吐量。
ReentrantLock 是排他锁,同一时刻只容许一个线程进行访问,而读写锁在同一时刻能够容许多个读线程访问,写线程访问时,其余的线程均被阻塞。经过读锁和写锁分离,使得并发性相比通常的排他锁有了很大提高。
读写锁的主要特性:
ReentrantReadWriteLock 实现了接口 ReadWriteLock,维护了一对相关的锁。
public interface ReadWriteLock {
Lock readLock();//返回读锁
Lock writeLock();//返回写锁
}
复制代码
ReentrantReadWriteLock 使用了三个内部类
/** 内部类 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 同步器 */
final Sync sync;
复制代码
构造方法以下:
/** 使用默认(非公平)的排序属性建立一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用给定的公平策略建立一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
复制代码
读写锁一样依赖AQS实现同步功能,读写状态就是其同步器的同步状态。所以其同步器须要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
在 ReentrantLock 中使用一个 int 类型的 state 表示同步状态,表示锁被一个线程重复获取的次数。读写锁须要用一个变量维护多种状态,因此采用了“按位切割使用”的方式维护这个变量。读写锁将变量切分为两个部分,高16位表示读,低16位表示写。分割以后,读写锁经过位运算肯定读和写各自的状态。假设当前状态为S,写状态等于 S&0X0000FFFF (抹去高16位),读状态等于 S>>>16 (无符号补0右移16位)。写状态增长1,S = S+1,读状态加1,S = S + ( 1 << 16 )。
写锁是一个支持重进入的排它锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//当前同步状态
int c = getState();
//写锁状态
int w = exclusiveCount(c);
if (c != 0) {
//c != 0 && w == 0 表示存在读锁
//当前线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
//是否须要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
复制代码
首先获取同步状态和写锁的同步状态。若是存在读锁,或者当前线程不是持有写锁的线程,不能得到写锁。若是能获取写锁,且未超出最大范围,则更新同步状态并返回true。
写锁的释放与 ReentrantLock 释放相似,每次释放减小写状态,当写状态为0表示写锁已经被释放。
// WriteLock类提供了unlock()释放写锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//调用AQS方法释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
//这是定义在Sync类中的方法
protected final boolean tryRelease(int releases) {
//释放的线程不为锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//若写锁的新线程数为0,则将锁的持有者设置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
复制代码
读锁是一个支持重进入的共享锁,能被多个线程同时获取。
在写状态为0时,读锁总会被成功获取,而后增长读状态。若是当前线程已经获取了读锁,则增长读状态;若是获取读锁时,写锁已经被其余线程获取,则进入等待状态。读状态时全部线程获取读锁次数的总和,而每一个线程各自获取读锁的次数只能保存在 ThreadLocal 中。
读锁的每次释放均减小读状态。
锁降级指的是写锁降级为读锁。锁降级是指把持有的写锁把持住,再获取到读锁,而后释放拥有的写锁的过程。也就是须要遵循先获取写锁、获取读锁再释放写锁的次序才能够称为锁降级。
锁降级中读锁的获取是必要的,主要是为了保证数据的可见性。若是当前线程A不获取读锁而是直接释放写锁,此刻另外一线程B获取了写锁并修改了数据,那么当前线程A没法感知线程B的数据更新。若是当前线程A遵循锁降级的规则,则线程B会被阻塞,直到当前线程A使用数据并释放读锁以后,线程B才能获取写锁进行数据更新。
在 synchronized 控制同步时,配合 Object 的 wait(), notify(), notifyAll() 等方法实现等待/通知模式。Lock 提供了条件 Condition 接口,二者配合实现了等待/通知模式。
Condition 定义了等待/通知两种类型的方法,线程调用这些方法时,须要提早获取 Condition 关联的锁。Condition 对象是由 Lock 对象建立出来的。
public class ConditionCase {
Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal(){
lock.lock();
try{
condition.signal();
}finally {
lock.unlock();
}
}
}
复制代码
将 Condition 做为成员变量,调用 await()
方法形成当前线程在接到信号或被中断以前一直处于等待状态。调用signal()
或者signalAll()
方法会唤醒一个或全部的等待线程,可以从等待方法返回的线程必须获取与 Condition 相关的锁。
经过 Lock 的newCondition()
方法获取 Condition。Condition 是一个接口,其为一个的实现类是 ConditionObject,且是同步器AQS的内部类。
每一个 ConditionObject 包含一个FIFO的队列,在队列中的每一个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程。当一个线程调用 await()
方法,那么该线程将释放锁,构形成节点加入等待队列并进入等待状态。此处的节点依然是AQS的内部类 Node。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//头节点
private transient Node firstWaiter;
//尾节点
private transient Node lastWaiter;
public ConditionObject() {
}
/** 省略方法 **/
}
复制代码
Condition 拥有头节点和尾节点的引用。当一个线程调用await()
方法,将该线程构形成一个几点,加入队列尾部。
在 Object 的监视器模型上,一个对象拥有一个同步队列和一个等待队列;并罚保中的同步器拥有一个同步队列和多个等待队列,每一个 Condition 对应一个等待队列。
调用 Condition 的await()
系列方法将使当前线程释放锁并进入等待状态。当从该方法返回时,当前线程必定获取了 Condition 相关的锁。从队列的角度看,当调用该方法时,至关于同步队列的首节点(也就是获取了锁的节点)移动到 Condition 的等待队列中。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//检查该节点的线程是否在同步队列上,若是不在,还不具有竞争锁的资格,继续等待
while (!isOnSyncQueue(node)) {
//挂起线程
LockSupport.park(this);
//线程已经中断则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
调用该方法的线程是同步队列中的首节点(获取锁的线程)。该方法将当前线程构形成节点加入等待队列,释放同步状态,唤醒后继节点,而后当前线程进入等待状态。而后不断监测该节点表明的线程是否出如今同步队列中(也就是收到了signal信号),若是不是,则挂起;不然开始竞争同步状态。
调用 Condition 的 signal()
方法,将会唤醒在等待队列的首节点。在唤醒节点以前,将节点移入同步队列中。
public final void signal() {
//检测当前线程是否为拥有锁的独
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}
复制代码
该方法首先会判断当前线程是否已经获取了锁,而后唤醒等待队列中的头节点,具体来讲,先将条件队列中的头节点移出,而后调用AQS的enq(Node node)
方法将其安全地移到CLH同步队列中。当节点移动到同步队列中,当前线程再唤醒该节点的线程。
一个线程获取锁后,经过调用Condition的await()
方法,会将当前线程先加入到条件队列中,而后释放锁,最后经过isOnSyncQueue(Node node)
方法不断自检看节点是否已经在CLH同步队列了,若是是则尝试获取锁,不然一直挂起。当线程调用signal()
方法后,程序首先检查当前线程是否获取了锁,而后经过doSignal(Node first)
方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()
方法中的while循环中退出来,而后调用acquireQueued()
方法竞争同步状态。
//生产者-消费者问题
public class ConditionCase {
private LinkedList<String> buffer;
private int maxSize;
private Lock lock;
private Condition fullCondition;
private Condition notFullCondition;
public ConditionCase(int maxSize){
this.maxSize=maxSize;
buffer=new LinkedList<>();
lock=new ReentrantLock();
fullCondition=lock.newCondition();
notFullCondition=lock.newCondition();
}
public void set(String string) throws InterruptedException {
lock.lock();
try {
while(maxSize==buffer.size()){
notFullCondition.await();
}
buffer.add(string);
fullCondition.signal();
} finally {
lock.unlock();
}
}
public String set() throws InterruptedException {
String string;
try {
lock.lock();
while(buffer.size()==0){
fullCondition.await();
}
string=buffer.poll();
notFullCondition.signal();
} finally {
lock.unlock();
}
return string;
}
}
复制代码
参考资料