目录java
ReentrantLock是基于同步器AbstractQueuedSynchronizer(AQS)实现的独占式重入锁,支持公平锁、非公平锁(默认是非公平锁)、申请锁可响应中断以及限时获取锁等高级功能,分析ReentrantLock就离不开同步器AQS,关系图以下:node
在AQS中实现了如何获取锁和释放锁的模板方法,重入锁ReentrantLock实现时经过内部类继承Sync同步器AbstractQueuedSynchronizer。并调用同步器提供的模板方法,而这些模板方法将会调用ReentrantLock重写的方法,这是典型的模板方法设计模式。AQS实现同步器功能离不开三大基础组件:设计模式
AQS中使用了一个int型的volatile变量来表示同步状态,线程在尝试获取锁的时候,就回去比较同步器同步状态state是否为0,为0,那么线程就拿到了锁并改变同步状态;不为0,说明有其余线程拿到了锁。AQS中提供了如下三个方法来访问或修改同步状态:数据结构
//AQS成员变量,同步状态 private volatile int state; //获取当前同步状态 protected final int getState() { return state; } //设置当前同步状态 protected final void setState(int newState) { state = newState; } //使用CAS设置当前状态,该方法可以保证状态设置的原子性 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
当有多个线程竞争获取锁时,只有一个线程能获取到锁,那么这些没有获取到锁的线程就须要等待,等到线程把锁释放了再唤醒等待线程去获取锁,为了实现等待-唤醒机制,AQS提供了基于CLH队列(Craig, Landin,Hagersten)实现的等待队列,是一个先入先出的双向队列。同步队列是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是经过自旋锁和CAS保证节点插入和移除的原子性。
并发
AQS中的内部类Node是构建同步队列和等待队列(后面介绍Condition再介绍)的基础节点类,Node类部分源码以下:函数
static final class Node { //等待状态 volatile int waitStatus; //前驱结点 volatile Node prev; //后继节点 volatile Node next; //等待获取锁的线程 volatile Thread thread; //condition队列的后继节点 Node nextWaiter; }
关于节点Node的waitStatus,它反映的是节点中线程的等待状态,有以下取值:ui
从关系图能够看出,ReentrantLock实现了Lock接口,内部类Sync是AQS的子类,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。ReentrantLock只有一个成员变量sync,经过构造函数初始化,能够看到经过默认的构造函数构造的ReentrantLock是非公平锁。this
private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock获取锁方法以下:线程
public void lock() { sync.lock(); }
公平锁调用的是FairSync的lock方法:设计
final void lock() { acquire(1); }
acquire方法是AQS实现的方法,介绍一下参数的1的意思:AQS规定同步状态state,想要得到锁就去改变同步状态,就是把同步状态加1。acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
获取锁的过程:
公平锁尝试获取,在FairSync里实现,获取同步状态成功返回true,不然返回false
protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); //同步状态为0,没有其余线程占据锁 if (c == 0) { //检测同步队列没有其余线程等待(确保公平性),若是没有获取锁就以CAS方式尝试改变同步状态 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //设置锁的拥有者为当前线程 setExclusiveOwnerThread(current); return true; } } //同步状态不为0,检测是不是当前线程拥有锁 else if (current == getExclusiveOwnerThread()) { //当前线程拥有锁,直接更新同步状态,重入锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { //同步队列尾节点 Node t = tail; //同步队列头节点 Node h = head; Node s; //h!=t 头节点和尾节点不一样,说明同步队列不为空 //同步队列不为空,检测下一个等待获取锁的线程(h.next.thread)是否是当前线程 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
addWaiter在AQS中实现,以当前线程构成节点加入到同步队列末尾,并返回这个节点Node。
private Node addWaiter(Node mode) { //以当前线程和给定模式构成节点Node Node node = new Node(Thread.currentThread(), mode); // 同步队列不为空,以CAS方式把当前线程加入到队列末尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //队列为空,创建同步队列,再把当前线程加入同步队列 enq(node); return node; }
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
若是当前线程的节点的前驱结点,就去尝试获取同步状态,若是不是或者获取失败根据waitStatus对同步队列进行清理:把waitStatus为CANCELLED从同步队列清除,修改错误的waitStatus,而后把线程堵塞,返回当前线程是否被中断。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //当前节点的前驱结点 final Node p = node.predecessor(); //前驱结点是head头节点,尝试获取同步状态 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
ReentrantLock公平锁与非公平锁的释放机制是同样的,释放锁方法以下:
public void unlock() { sync.release(1); }
unlock方法调用的release方法是在AQS中实现的,这里的1相似于acquire(1),适用于用来设置同步状态的,释放锁时会把同步状态减1。release方法会先调用tryRelease来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。不然,直接返回false
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease尝试获取锁,当同步状态为0时清空占据锁的线程,返回true;若是同步状态不为0返回false,由于ReentrantLock是重入锁,只有完全释放tryRelease才会返回true。
protected final boolean tryRelease(int releases) { // c是本次释放锁以后的同步状态 int c = getState() - releases; //当前线程不是锁的拥有者,抛出IllegalMonitorStateException异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是“锁”已经被当前线程完全释放,则设置“锁”的持有者为null,即锁是可获取状态。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
当前线程释放锁成功的话,会唤醒当前线程的后继线程。从aquireQueued方法能够看出,一旦头结点的后继结点被唤醒,那么后继结点就尝试去获取锁,若是获取成功就将头结点设置为自身,并将前一个头节点清空。
private void unparkSuccessor(Node node) { // 获取当前线程(要释放锁)的等待状态 int ws = node.waitStatus; if (ws < 0) //设置为初始状态 compareAndSetWaitStatus(node, ws, 0); //同步队列头节点的下一个等待节点 Node s = node.next; //等待节点无效,从同步节点尾部开始遍历找到有效的等待节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒等待节点的线程 if (s != null) LockSupport.unpark(s.thread); }
NonfairSync类中lock()实现,首先尝试用CAS更改同步状态,若是成功,把当前线程设置为独占锁的拥有者;而后调用acquire(1)方法。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
acquire方法除了tryAcquire是由AQS的子类实现的,其余方法都是在AQS类实现的,tryAcquire的实现机制不一样体现了公平锁与非公平锁的不一样。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock中的NonfairSync的tryAcquire方法,调用了nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
非公平锁的尝试获取锁时,若是同步状态为0,即没有其余线程获取到锁,当前线程直接以CAS方式改变同步状态,不会去同步队列找是否有其余线程早于当前线程等在同步队列中,效率较高。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //同步状态为0,尝试以CAS方式改变同步状态 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
本文介绍了ReentrantLock基于AQS同步器实现的公平锁和非公平锁的获取和释放,基于CAS改变同步状态是得到独占锁的基础,为了不多个线程同时对进行竞争,在AQS中维护了FIFO的同步队列,当独占锁释放时,AQS同步器调度同步队列队首等待节点的线程去获取锁,有效避免了海量竞争独占锁形成资源的浪费,是一个很是巧妙的方法。