ReentrantLock:实现了Lock接口,是一个可重入锁,而且支持线程公平竞争和非公平竞争两种模式,默认状况下是非公平模式。ReentrantLock算是synchronized的补充和替代方案。node
公平竞争:听从先来后到的规则,先到先得
非公平竞争:正常状况下是先到先得,可是容许见缝插针。即持有锁的线程刚释放锁,等待队列中的线程刚准备获取锁时,忽然半路杀出个程咬金,抢到了锁,等待队列中等待获取锁的线程只能干瞪眼,接着等抢到锁的线程释放锁安全
ReentrantLock与synchronized比较:
一、ReentrantLock底层是经过将阻塞的线程保存在一个FIFO队列中,synchronized底层是阻塞的线程保存在锁对象的阻塞池中
二、ReentrantLock是经过代码机制进行加锁,因此须要手动进行释放锁,synchronized是JAVA关键字,加锁和释放锁有JVM进行实现
三、ReentrantLock的加锁和释放锁必须在方法体内执行,可是能够不用同一个方法体,synchronized能够在方法体内做为方法块,也能够在方法声明上
四、synchronized进行加锁时,若是获取不到锁就会直接进行线程阻塞,等待获取到锁后再往下执行。ReentrantLock既能够阻塞线程等待获取锁,也能够设置等待获取锁的时间,超过等待获取时间就放弃获取锁,再也不阻塞线程多线程
ReentrantLock源码分析:less
/** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { .... } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { .... } /** * Sync object for fair locks */ static final class FairSync extends Sync { .... }
从省略细节的源码中咱们能够很清晰的看到,ReentrantLock内部定义了三个内部类,所有直接或者间接的继承AbstractQueuedSynchronizer,ReentrantLock有一个属性sync,默认状况下为NonfairSync类型,即为非公平锁。实际上ReentrantLock的全部操做都是有sync这个属性进行的,ReentrantLock只是一层外皮。源码分析
ReentrantLock既然实现类Lock接口,咱们就先以加锁、解锁进行分析:
一、加锁(非公平):ui
public void lock() { sync.lock(); }
就像上面说的同样,ReentrantLock的功能都是靠底层sync进行实现,ReentrantLock的加锁很简单,就一句话使用sync的lock()方法,咱们直接看源码this
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
sync的lock()方法也是比较简单的,先经过CAS试图将AQS的state由0变为1。若是成功,表示该线程获取锁成功(设值时没有线程在持有锁),就将当前线程设置为锁的拥有者(这就是前面所说得见缝插针,后面还有);若是失败,只表示设置值没有成功,不表示该线程获取锁失败(由于有多是重入加锁),开始调用AQS的acquire(int)方法进行获取锁,咱们直接看acquire(int)方法spa
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这是AQS定义的模板方法,由AQS的子类去重写tryAcquire(int) 方法,由AQS去调用,进行尝试获取锁。若是获取锁成功,方法直接结束;若是获取锁失败,就将当前线程进行阻塞并加入到FIFO的CLH队列中。咱们先看ReentrantLock内部类是如何重写tryAcquire(int)方法的线程
protected final boolean tryAcquire(int acquires) {
// 直接调用父类nonfairTryAcquire(int)方法 return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
咱们直接看sync声明的nonfairTryAcquire(int)方法,先获取保存的state,若是值为0,表示当前没有线程持有锁,处理和前面的一致,直接见缝插针,经过CAS尝试直接设置值,设值失败就表示获取锁失败了,直接返回失败结果;若是state不是0,表示锁被线程持有,就比较下持有锁的线程是不是当前线程,若是是,表示线程重入持有锁,进行state值的累加。若是不是,直接返回持有锁失败结果。tryAcquire(int)方法获取锁失败后,会去执行AQS声明的acquireQueued(Node, int)方法将当前线程封装到CLH队列的节点Node中,并进行阻塞。咱们先看看AQS是将当前线程封装到Node中都作了什么操做code
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 将队尾的node改成新建的node if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
将当前线程装进Node实例中,并设置改Node为独享模式。而后获取队尾的Node实例,由于走到这一步的时候有两种获取锁失败的场景:一、竞争锁时,没有线程持有锁;二、竞争锁时,已有别的线程持有锁。因此会先判断下队尾Node是否为null,若是为空,表示是第一种场景获取锁失败,若是不为空,这是第二种场景获取锁失败。先看第2种场景的处理流程,直接将队尾的Node设置为新建Node实例的prev(表示在队列中的前一个Node节点),而后经过CAS尝试将新建的Node节点设置为队尾(这个时候用CAS是由于有可能存在多线程竞争),若是设置队尾成功,就将前任队尾的next节点设置为新建的node节点;若是设置队尾失败(多线程竞争才会出现),和场景1进行相同的处理,先看源码
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
处理很简单,先获取队尾,若是获取的队尾为null,表示上一步场景1过来的,经过CAS将队首设置为一个空的Node节点(该节点表示正在持有锁的线程封装的Node,仅仅是表明,没有实值),并将队尾和队首指向同一个节点;若是获取的队尾不为null,将队尾设置为参数Node的上一个节点,并经过CAS尝试将参数Nodo设置为队尾,若是设置成功,将新队尾设置为前任队尾的next节点,并直接返回;若是设置失败,往复循环,直到成功为止。
经过前面对addWaiter(Node)源码的分析,咱们能够清楚的了解到addWaiter方法将当前线程封装到CLH队列的独享模式的Node节点中,并经过CAS将当前线程的Node节点设置为队尾。下面咱们接着看acquireQueued(final Node node, int arg)方法会作什么处理
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前任节点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
先获取参数Node节点的前任节点,若是前任节点是队首时,会去再次调用tryAcquire(int)方法去尝试持有锁,若是成功持有锁,就将参数Node直接设置为队首,同时将前任队首的next节点设置为null(去除引用,利于GC),而后直接返回当前线程是否要进行中断操做。若是前任节点不是队首或者再次尝试持有锁失败,会先调用shouldParkAfterFailedAcquire(Node pre, Node node)进行判断是否须要进行线程阻塞,若是须要线程阻塞再调用parkAndCheckInterrupt()进行线程阻塞(该方法返回值表示该线程是不是中断状态)。线程阻塞后会等待前任节点释放锁时唤醒结束阻塞,线程结束阻塞后会循环再次去获取锁。可是若是结束阻塞后去获取锁时,有新的线程见缝插针直接获取到锁了,那就只能再次在队列中进行阻塞了。其实shouldParkAfterFailedAcquire和parkAndCheckInterrupt看方法名称就能猜个大概了,咱们仍是直接先看shouldParkAfterFailedAcquire(Node pre, Node node)方法的源码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 这个节点已经设置了状态,请求释放信号通知它,这样它就能够安全地进行阻塞了。 */ return true; if (ws > 0) { /* * 此时前任被取消了,跳过前任并重试。 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 等待状态必须为0或传播。将节点状态设置为SIGNAL,告诉前任节点后面节点须要释放信号通知,但先不进行阻塞。呼叫者将须要重试,以确保它不能在阻塞前得到锁。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
先获取前任节点的waitStatus(等待状态),1.若是前任节点的等待状态为Node.SIGNAL(表示后面的节点须要前任节点释放锁时进行通知,结束后面节点的阻塞),直接返回true,执行后面的阻塞流程;2.1若是前任节点的等待转态值大于0,表示前任节点被取消了(争夺锁的线程因过了设定时间,获取锁失败,从队列中删除节点的时候,Node节点会被设为取消状态),跳过前任节点,往前找,直到找到不是取消状态的节点,直接将找到的有效前任节点的next节点设置为当前节点;2.2若是前任节点不是取消状态(多是初始值0、等待状态或者传播状态),经过CAS尝试将前任节点的waitStatus设置为Node.SIGNAL(无论设置成功与否,都直接返回false,后面会再次循环执行,用来确保该节点的线程在阻塞前必定不会获取到锁,由于存在见缝插针去争取持有锁的线程)。shouldParkAfterFailedAcquire方法返回true,会进行线程阻塞,返回false,调用层会进行循环让当前线程再次获取一次锁,失败后再次被调用进行清洗已经取消的节点或者进行前任节点的等待状态设置为Node.SIGNAL。
下面咱们再来看看parkAndCheckInterrupt()方法的源码
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt()方法的内容很简单,使用LockSupport的park(Object blocker)方法进行线程阻塞,有兴趣的同窗能够自行去深刻了解,等待前任节点调用调用LockSupport的unpark(Thread thread)唤醒该线程。而后在该线程结束阻塞后返回该线程是不是中断的状态。
至此,非公平锁获取锁的流程就分析完了,总结下非公平锁加锁流程:
1.无论线程有没有被持有,先尝试获取锁
2.锁未被持有,直接获取锁。2.1获取锁成功,结束;2.2获取锁失败,封装成CLH队列的Node节点,并进行线程阻塞
3.锁被持有。3.1线程重入加锁,进行state累加,等待释放锁时进行减除;3.2封装成CLH队列的Node节点,并进行线程阻塞
二、定长时间加锁(非公平)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
没什么可说的,依然是使用sync进行操做,咱们直接看sync的实现源码
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
这里直接先判断当前线程是不是中断状态(多是由于ReentrantReadWriteLock中的WriteLock也会使用才进行判断),若是是中断状态,直接抛异常。咱们这边确定不会是中断状态的啦,接着往下走,调用tryAcquire(int)方法尝试获取锁,忘了过程的同窗请往前翻。若是获取锁成功,那就直接结束。获取锁失败时,执行doAcquireNanos(int arg, long nanosTimeout)方法以独占定时模式获取锁。咱们直接看源码
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
一样是先建立一个Node节点并放置到CLH队列的队尾,而后一样的是开始进行循环处理,不一样的地方是,定长获取锁使用了LockSupport.pargNanos(Object blocker, long nanos)进行阻塞一段时间,若是在线程阻塞未自动结束时,前一个节点释放了锁,该节点同样会被解除阻塞去争夺锁,若是不幸被别的线程见缝插针抢去了锁,那就接着去阻塞定长时间(这个定长时间是根据最初设定的时间和当前时间的差值),等待锁的释放。若是超过了定长时间仍是没有获取到锁,就会调用cacelAcquire(Node node)去删除该节点。
三、公平竞争加锁
公平锁对象FairSync的lock()方法直接调用了acquire(int)方法,前面咱们分析了,acquire(int)方法会先调用tryAcquire(int)方法去尝试获取锁,根据返回结果去判断是否须要加入到队列中。下面咱们直接FairSync的源码
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
源码的逻辑比较清晰简单,先判断当前锁是不是空闲状态(state是不是0),若是是空闲的,就去尝试获取锁,返回争夺结果;若是不是空闲的,判断是不是线程重入,若是是就累加state,返回成功,若是不是重入,直接返回失败。若是tryAcquire方法返回了false,那么就会将该线程封装到CLH队列的Node中并进行线程阻塞,后面的流程和非公平锁时一致的。
总结下公平锁加锁的流程:
1.锁未被持有,尝试直接获取锁。2.1获取锁成功,结束;2.2获取锁失败,封装成CLH队列的Node节点,并进行线程阻塞
2.锁被持有。2.1线程重入加锁,进行state累加,等待释放锁时进行减除;2.2封装成CLH队列的Node节点,并进行线程阻塞
四、释放锁:
public void unlock() { sync.release(1); }
ReentrantLock的释放锁直接调用的sync属性的release(int)方法,实际是直接调用的AbstractQueuedSynchronizer的release(int)方法,咱们直接看源码
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
依然是一样的配方,调用子类重写的tryRelease(int)方法去真正释放锁,若是释放成功,并且队列中有节点在等待队首释放锁后进行通知,就会调用unparkSuccessor(Node node)去解除下一个节点的线程阻塞状态,让下一个线程去获取锁。
咱们先看看子类重写的tryRelease(int)方法
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
源码比较简单,先比较当前线程是不是持有锁的线程,若是不是,直接抛异常,说明调用者使用不规范,没有先去获取锁。而后进行state的减除,先判断此时state是否为0,为0表示线程彻底释放了锁。若是为0,就将锁的持有者变为null。无论最后有没有彻底释放锁都会将state设置成新值。
咱们再看看unparkSuccessor(Node node)都作了什么
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
此时参数node表明的head,若是head的waitStatus小于0,表示后面节点须要在等待通知锁被释放的信号,先将head的waitStatus改成0,而后去看看head的next节点是存在而且next节点的waitStatus小于0,。若是next节点为null或者waitStatus大于0,就从队尾tail节点依次往前找,找到head节点后第一个waitStatus不大于0的节点,而后结束该节点的线程阻塞状态;若是head的next节点存在而且waitStatus不大于0,直接解除head的next节点线程阻塞状态。
总结下锁释放过程:
1.先判断当前线程是否能够进行锁释放
2.state减除,若是减除后的state为0,就将锁的持有者设为空,并解除下一个等待节点的线程阻塞状态
为了加深印象,我专门还花了三个流程图,一块儿看看
到这里基本上ReentrantLock的主要功能点都说完了,还有一个Condition功能没说,接着搞起来。
ReentrantLock的Condition:
ReentrantLock方法: public Condition newCondition() { return sync.newCondition(); } Sync内部类方法: final ConditionObject newCondition() { return new ConditionObject(); }
仍是一如既往的配方,ReentrantLock的newCondition依然用的是sync属性去实现功能,Sync也简单,直接就是建立一个AbstractQueuedSynchronizer的内部列ConditionObject的实例。咱们直接去看ConditionObject的源码去分析
Condition主要是wait()、signal()两个方法,咱们先来看wait()方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
分析以前先说明下,ConditionObject中一样保存的有CLH队列,和外部类AbstractQueuedSynchronizer类似。
咱们经过await()方法的源码简单分析下:
一、首先调用了addConditionWaiter()方法将当前线程封装到Node.CONDITION模式的Node中,放入到ConditionObject的CLH队列中,顺便去除了一些队列中waitStatus不是Node.CONDITION的节点。
二、调用fullyRelease(Node node)去彻底释放掉锁,并去解除AbstractQueuedSynchronizer中队列的head节点,方法返回节点彻底释放前的state值
三、循环:对当前节点进行线程阻塞,直到被其余线程使用signal()或者signalAll()方法解除线程阻塞状态
四、将节点加入到AbstractQueuedSynchronizer的CLH队列中,等待争夺锁
总体上Condition的wait()方法的内容就这么多,源码也比较简单,有兴趣的能够本身深刻看看。
如今看下signal()的源码:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
signal()方法的主要工做仍是放在了doSignal(Node first)方法和transferForSignal(Node node)两个方法上。在doSignal(Node first)方法中,获取到ConditionObject的CLH队列的队首,而后调用transferForSignal(Node node)方法先将节点的waitStatus改成0,而后将节点放入到AbstractQueuedSynchronizer的CLH队列队尾,若是前任队尾的waitStatus大于0或者将前任队尾的waitStatus改成Node.SIGNAL失败时,直接解除节点的线程阻塞状态,结束wait()方法中的循环,调用acquireQueued(Node node, int savedState)去尝试抢夺锁,由于此时当前线程仍然持有锁,因此节点最后仍是会被线程阻塞。由于此时节点node已经从ConditionObject的CLH队列迁移到了AQS的CLH队列队尾,即便if条件不知足,不能解除node节点的线程阻塞状态,等到前任队尾节点释放锁时仍是会解除node节点的线程阻塞状态。
Condition还有await(long time, TimeUnit unit)、signalAll()等等其它方法,原理差很少,这里就不一一赘述了。至此,ReentrantLock的知识点基本上也说的差很少了。