一个可重入的互斥锁Lock,它具备与使用synchronized方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。node
ReentrantLock将由最近成功得到锁,而且尚未释放该锁的线程所拥有。当锁没有被另外一个线程所拥有时,调用lock的线程将成功获取该锁并返回。若是当前线程已经拥有该锁,此方法将当即返回。api
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
ReentrantLock的锁是经过它内部的一个同步器实现的。安全
/** Synchronizer providing all implementation mechanics */ private final Sync sync;
同步器的类型有两个子类,公平同步器与非公平同步器,也即公平锁与非公平锁。这两个子类是锁的具体实现,咱们对锁的操做也都是做用在这两个类的实例上的。多线程
咱们在使用ReentrantLock时,经过无参构造器建立的lock是就是非公平的。并发
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
若是想要使用公平锁,咱们可使用有参构造器来建立lock,传入true表示公平锁,传入false表示非公平锁。
框架
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
建立完成后,咱们使用锁的方式就是调用lock方法,那lock方法又是怎么实现的呢?咱们来看一下代码。oop
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); }
经过代码咱们能够看到,ReentrantLock的lock方法是经过调用sync的lock方法来实现的。根据咱们前面的介绍,sync又是FairSync和NonfairSync中的一个,那咱们下面就来具体分析一下这两个类中lock方法的具体实现。ui
在开始以前,咱们先作点准备工做。从下面的类继承关系图中能够看到,ReentrantLock中的公平锁与非公平锁都是从AQS(AbstractQueuedSynchronizer)扩展来的,因此在详细介绍ReentrantLock以前,咱们先简单的了解一下AQS。this
在API中,对AQS的解释是:spa
为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件、等等)提供一个框架。此类的设计目标是成为依靠单个原子int值来表示状态的大多数同步器的一个有用基础。
经过这两句话,咱们能够看出,AQS中有两个重要的概念,一个是表示状态的单个int值(state),一个是FIFO的队列。在Java并发包中的各类同步器就是经过对这两个概念进行不一样的操做而实现的。其中须要额外说明的就是,对表示状态的int值的操做,是经过CAS的方式完成的,这样才能保证在多线程的环境下的安全性。这两个概念具体的内容与相关操做,咱们等待用到的时候再说。
在ReentrantLock中表示状态的int值表示的是锁的数量。
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
经过代码能够看出,得到锁的第一步是经过CAS的方式,尝试将state从0变为1。若是成功,就表示当前线程成功抢到了锁,接着就会将当前线程设为ExclusiveOwnerThread,即独占线程。若是失败,就会接着调用acquire方法。经过这抢锁的第一步,咱们能够明显感觉到,这就是一个赤裸裸的插队行为。当新线程抢夺锁的时候,它不会关注此时到底有多少个线程在排队等待,而是直接尝试抢锁。若是此时锁正好被释放了,那新线程就有了成功的可能。这就是不公平锁第一处体现不公平的地方。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
当第一步插队失败后,会调用require方法,并传入1。接着会调用tryAcquire方法再次抢锁。在非公平锁中,tryAcquire方法会直接调用nonfairTryAcquire方法。
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在此方法中,首先会查看当前的锁数量,若是数量为0,表示当前无锁,那么就会再使用一次插队抢锁。若是抢锁成功,就会直接返回true,这就是不公平锁第二处体现不公平的地方。若是抢锁失败,就会看当前持锁的线程是否是抢锁的线程,若是是的话,锁数nextc+1,并返回true。这也就是为何叫可重入锁的缘由,持锁线程可重复的获取锁。其余状况则会返回false,回到acquire方法中。
当前面的操做都不能成功抢到锁的时候,就会将线程加入到等待队列中。
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
咱们前面提到过AQS中的两个重要概念,以前已经用到一个state了,如今是FIFO队列登场的时候了。
在AQS内部有一条双向链表来存放等待线程,链表中的每一个节点都是一个Node对象。Node对象中维护了线程,等待状态以及先后Node的指针。线程在加入队列以前须要包装成Node对象,而包装线程的方法就是addWaiter。
Node有个表示共享模式的字段,在包装时须要提供,这个值是由参数传来的,默认是独占模式。当把线程包装成Node对象之后,就须要入队了。若是队列不为空,那么此方法就会尝试着快速入队,若是快速入队失败或者队列为空,那么就会使用正常的入队方法。快速入队是使用CAS的方式将新建的node放到队尾,由于在多线程的环境下,有可能在放入队尾的时候被其余线程捷足先登,因此后面会有个正常入队的操做。正常入队操做就是循环入队,直到成功为止。入队操做以后,须要作的就是阻塞等待线程。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先经过node的predecessor方法获取到前一个节点。若是前一个节点是头结点(头结点是获取了锁的线程),说明队列中只有一个节点(有误),说明此节点是即将运行的节点,那么就会直接开始自旋抢锁。若是前节点不是头结点或抢锁失败,那么就会调用shouldParkAfterFailedAcquire方法。
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这个方法会检查当前节点前节点的等待状态,若是是SIGNAL,则表示前节点已经park了,即已经暂停了,那么当前线程也就能够安全暂定等待唤醒了。若是等待状态大于0,则表示前节点已经被取消了,那么会从当前节点往前遍历,去掉已经被取消的节点。其余状况则将前节点的等待状态改成SIGNAL。由于这个方法外部是一个死循环,因此最终会返回true 的。若是返回true,那就会执行parkAndCheckInterrupt方法。
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
这个方法比较简单,就是阻塞当前线程,以防外层死循环消耗系统资源并等待被前节点唤醒。
至此,非公平锁获取锁的过程就结束了。简单总结一下抢锁的过程:
1. 经过CAS的方式将state(锁数量)从0设置为1(第一次插队)。
1.1 若是设置成功,设置当前线程为独占锁的线程。
1.2 若是没有成功,那么会再获取一次锁数量。
1.2.1 若是锁数量为0,那么会再次用经过CAS的方式将state(锁数量)从0设置为1(第二次插队)。
1.2.2 若是锁数量不为0,或者第二次抢锁失败,那么会查看独占锁线程是不是当前线程。
1.2.2.1 若是是,则将当前锁数量+1。
1.2.2.2 若是不是,则将当前线程包装为一个Node对象,并打入到阻等待队列中去,等待被前节点唤醒。