为了节省各位的时间,我简单介绍一下这篇文章。这篇文章主要分为三块:Lock的实现,AQS的由来(经过演变的方式),JUC三大工具类的使用与原理剖析。java
Lock的实现:简单介绍ReentrantLock,ReentrantReadWriteLock两种JUC下经典Lock的实现,并经过手写简化版的ReentrantLock和ReentrantReadWriteLock,从而了解其实现原理。缓存
AQS的由来:经过对两个简化版Lock的屡次迭代,从而得到AQS。而且最终的Lock实现了J.U.C下Lock接口,既可使用咱们演变出来的AQS,也能够对接JUC下的AQS。这样一方面能够帮助你们理解AQS,另外一方面你们能够从中了解,如何利用AQS实现自定义Lock。而这儿,对后续JUC下的三大Lock工具的理解有很是大的帮助。安全
JUC三大工具:通过前两个部分的学习,这个部分不要太easy。能够很容易地理解CountDownLatch,Semaphore,CyclicBarrier的内部运行及实现原理。数据结构
不过,因为这三块内容较多,因此我将它拆分为三篇子文章进行论述。多线程
Lock接口位于J.U.C下locks包内,其定义了Lock应该具有的方法。ide
Lock 方法签名:工具
ReentrantLock是一个可重入锁,一个悲观锁,默认是非公平锁(可是能够经过Constructor设置为公平锁)。学习
ReentrantLock经过构造方法得到lock对象。利用lock.lock()方法对当前线程进行加锁操做,利用lock.unlock()方法对当前线程进行释放锁操做。ui
经过this
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition();
得到Condition对象(Condition是J.U.C下locks包下的接口)。
经过Condition对象的.await(*),能够将当前线程的线程状态切换到Waiting状态(若是是有参,则是Time Waiting状态)。而.signal(),.signalAll()等方法则正好相反,恢复线程状态为Runnable状态。
ReentrantLock和Synchronized功能相似,更加灵活,固然,也更加手动了。
你们都知道,只有涉及资源的竞争时,采用同步的必要。写操做天然属于资源的竞争,可是读操做并不属于资源的竞争行为。简单说,就是写操做最多只能一个线程(由于写操做涉及数据改变,多个线程同时写,会产生资源同步问题),而读操做能够有多个(由于不涉及数据改变)。
因此在读多写少的场景下,ReentrantLock就比较浪费资源了。这就须要一种可以区分读写操做的锁,那就是ReentrantReadWriteLock。经过ReentrantReadWriteLock,能够得到读锁与写锁。当写锁存在时,有且只能有一个线程持有锁。当写锁不存在时,能够有多个线程持有读锁(写锁,必须等待读锁释放完,才能够持有锁)。
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); readLock.lock(); readLock.unlock(); readLock.newCondition(); ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); writeLock.lock(); writeLock.unlock(); writeLock.newCondition();
与以前ReentrantLock应用的区别,就是须要经过lock.readLock()与lock.writeLock()来获取读锁,写锁,再进行加锁,释放锁的操做,以及Condition的获取操做。
终于上大餐了。
首先第一步操做,咱们须要肯定咱们要作什么。
咱们要作一个锁,这里姑且命名为JarryReentrantLock。
这个锁,须要具有如下特性:可重入锁,悲观锁。
另外,为了更加规范,之后更好地融入到AQS中,该锁须要实现Lock接口。
而Lock的方法签名,在文章一开始,就已经写了,这里再也不赘述。
固然,咱们这里只是一个demo,因此就不实现Condition了。另外tryLock(long,TimeUnit)也再也不实现,由于实现了总体后,这个实现其实并无想象中那么困难。
既然须要已经肯定,而且API也肯定了。
那么第二步操做,就是简单思考一下,如何实现。
首先,咱们须要一个owner属性,来保存持有锁的线程对象。
其次,因为是可重入锁,因此咱们须要一个count来保存重入次数。
最后,咱们须要一个waiters属性,来保存那些竞争锁失败后,还在等待(不死不休型)的线程对象。
那么接下来,就根据以前的信息,进行编码吧。
package tech.jarry.learning.netease; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; /** * @Description: 仿ReentrantLock,实现其基本功能及特性 * @Author: jarry */ public class JarryReentrantLock implements Lock { // 加锁计数器 private AtomicInteger count = new AtomicInteger(0); // 锁持有者 private AtomicReference<Thread> owner = new AtomicReference<>(); // 等待池 private LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); @Override public boolean tryLock() { // 判断当前count是否为0 int countValue = count.get(); if (countValue != 0){ // countValue不为0,意味着锁被线程持有 // 进而判断锁的持有者owner是否为当前线程 if (Thread.currentThread() == owner.get()){ // 锁的持有者为当前线程,那么就重入加锁 // 既然锁已经被当前线程占有,那么就不用担忧count被其余线程修改,即不须要使用CAS count.set(countValue+1); // 执行重入锁,表示当前线程得到了锁 return true; }else{ // 若是当前线程不是锁的持有者,返回false(该方法是tryLock,即浅尝辄止) return false; } }else { // countValue为0,意味着当前锁不被任何线程持有 // 经过CAS操做将count修改成1 if (count.compareAndSet(countValue,countValue+1)){ // count修改为功,意味着该线程得到了锁(只有一个CAS成功修改count,那么这个CAS的线程就是锁的持有者) // 至于这里为何不用担忧可见性,其实一开始我也比较担忧其发生相似doubleCheck中重排序形成的问题(tryUnlock是会设置null的) // 看了下源码,AtomicReference中的value是volatile的 owner.set(Thread.currentThread()); return true; } else { // CAS操做失败,表示当前线程没有成功修改count,即获取锁失败 return false; } } } @Override public void lock() { // lock()【不死不休型】就等于执行tryLock()失败后,仍然不断尝试获取锁 if (!tryLock()){ // 尝试获取锁失败后,就只能进入等待队列waiers,等待机会,继续tryLock() waiters.offer(Thread.currentThread()); // 经过自旋,不断尝试获取锁 // 其实我一开始也不是很理解为何这样写,就能够确保每一个执行lock()的线程就在一直竞争锁。其实,想想执行lock()的线程都有这个循环。 // 每次unlock,都会将等待队列的头部唤醒(unpark),那么处在等待队列头部的线程就会继续尝试获取锁,等待队列的其它线程仍然,继续阻塞(park) // 这也是为何须要在循环体中执行一个检测当前线程是否为等待队列头元素等一系列操做。 // 另外,还有就是:处于等待状态的线程可能收到错误警报和伪唤醒,若是不在循环中检测等待条件,程序就会在没有知足结束条件的状况下退出。反正最后不管那个分支,都return,结束方法了。 // 即便没有伪唤醒问题,while仍是须要的,由于线程须要二次尝试得到锁 while (true){ // 获取等待队列waiters的头元素(peek表示获取头元素,但不删除。poll表示获取头元素,并删除其在队列中的位置) Thread head = waiters.peek(); // 若是当前线程就是等待队列中的头元素head,说明当前等待队列就刚刚加入的元素。 if (head == Thread.currentThread()){ // 尝试再次得到锁 if (!tryLock()){ // 再次尝试获取锁失败,即将该线程(即当前线程)挂起, LockSupport.park(); } else { // 获取锁成功,即将该线程(等待队列的头元素)从等待队列waiters中移除 waiters.poll(); return; } } else { // 若是等待队列的头元素head,不是当前线程,表示等待队列在当前线程加入前,就还有别的线程在等待 LockSupport.park(); } } } } private boolean tryUnlock() { // 首先肯定当前线程是否为锁持有者 if (Thread.currentThread() != owner.get()){ // 若是当前线程不是锁的持有者,就抛出一个异常 throw new IllegalMonitorStateException(); } else { // 若是当前线程是锁的持有者,就先count-1 // 另外,同一时间执行解锁的只多是锁的持有者线程,故不用担忧原子性问题(原子性问题只有在多线程状况下讨论,才有意义) int countValue = count.get(); int countNextValue = countValue - 1; count.compareAndSet(countValue,countNextValue); if (countNextValue == 0){ // 若是当前count为0,意味着锁的持有者已经彻底解锁成功,故应当失去锁的持有(即设置owner为null) // 其实我一开始挺纠结的,这里为何须要使用CAS操做呢。反正只有当前线程才能够走到程序这里。 // 首先,为何使用CAS。因为count已经设置为0,其它线程已经能够修改count,修改owner了。因此不用CAS就可能将owner=otherThread设置为owner=null了,最终的结果就是完全卡死 //TODO_FINISHED 可是unlock()中的unpark未执行,根本就不会有其它线程啊。囧 // 这里代码仍是为了体现源码的一些特性。实际源码是将这些所的特性,抽象到了更高的层次,造成一个AQS。 // 虽然tryUnlock是由实现子类实现,但countNextValue是来自countValue(而放在JarryReadWriteLock中就是writeCount),在AQS源码中,则是经过state实现 // 其次,有没有ABA问题。因为ABA须要将CAS的expect值修改成currentThread,而当前线程只能单线程执行,因此不会。 // 最后,这里owner设置为null的操做到底需不须要。实际源码多是须要的,可是这里貌似真的不须要。 owner.compareAndSet(Thread.currentThread(),null); // 解锁成功 return true; } else { // count不为0,解锁还没有彻底完成 return false; } } } @Override public void unlock() { if (tryUnlock()){ // 若是当前线程成功tryUnlock,就表示当前锁被空置出来了。那就须要从备胎中,啊呸,从waiters中“放“出来一个 Thread head = waiters.peek(); // 这里须要作一个简单的判断,防止waiters为空时,抛出异常 if (head != null){ LockSupport.unpark(head); } } } // 非核心功能就不实现了,起码如今不实现了。 @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
这里就不进行一些解释了。由于须要的解释,在注释中都写的很明确了,包括我踩的一些坑。
若是依旧有一些看不懂的地方,或者错误的地方,欢迎@我,或者私信我。
与ReentrantLock同样,首先第一步操做,咱们须要肯定咱们要作什么。
咱们要作一个锁,这里姑且命名为JarryReadWriteLock。
这个锁,须要具有如下特性:读写锁,可重入锁,悲观锁。
一方面了为了更好理解(初版本,重在理解基础,不是嘛),另外一方面也是为了更好地复用前面ReentrantLock的代码(毕竟ReentrantLock其实就是读写锁的写锁,不是嘛),这里的JarryReadWriteLock的API再也不与官方的ReentrantReadWriteLock相同,而是作了小小调整。直接调用相关读锁的加解锁API,已经相关写锁的加解锁API。具体看代码部分。
既然须要已经肯定,而且API也肯定了。
那么第二步操做,就是简单思考一下,如何实现。
首先,咱们须要一个owner属性,来保存持有写锁的线程对象。
其次,因为写锁是可重入锁,因此咱们须要一个readCount来保存重入次数。
而后,因为读锁是能够有多个线程持有的,因此咱们须要一个writeCount来保存读锁持有线程数。
最后,咱们须要一个waiters属性,来保存那些竞争锁失败后,还在等待(不死不休型)的线程对象。
到这这里,就不由会有一个疑问。如何判断尝试获取锁的线程想要得到的锁是什么类型的锁。在API调用阶段,咱们能够根据API判断。可是放入等待队列后,咱们如何判断呢?若是仍是如以前那样,等待队列只是保存竞争锁的线程对象,是彻底不够的。
因此咱们须要新建一个WaitNode的Class,用来保存等待队列中线程对象及相关必要信息。因此,WaitNode会有以下属性:
package tech.jarry.learning.netease; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** * @Description: * @Author: jarry */ public class JarryReadWriteLock { // 用于读锁(共享锁)的锁计数器 这里真的有必要volatile嘛(Atomic中的value时volatile的),再看看后续代码 // 这里确实不须要volatile,至于源码,更过度,源码是经过一个变量state的位运算实现readCount与writeCount volatile AtomicInteger readCount = new AtomicInteger(0); // 用于写锁(独占锁)的锁计数器 这里之因此不用volatile是由于独占锁,只有一个线程在改变writeCount(即便有缓存,也仍是这个线程,因此不会由于缓存问题,致使问题) AtomicInteger writeCount = new AtomicInteger(0); // 用于保存锁的持有者(这里专指写锁(独占锁)的锁持有者) AtomicReference<Thread> owner = new AtomicReference<>(); // 用于保存指望得到锁的线程(为了区分线程但愿得到的锁的类型,这里新建一个新的数据类型(经过内部类实现)) public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>(); // 内部类实现等待队列中的自定义数据类型 class WaitNode{ // 表示该等待者的线程 Thread thread = null; // 表示但愿争取的锁的类型。0表示写锁(独占锁),1表示读锁(共享锁) int type = 0; // 参数,acquire,状态相关,再看看 int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.type = type; this.thread = thread; this.arg = arg; } } /** * 尝试获取独占锁(针对独占锁) * @param acquires 用于加锁次数。通常传入waitNode.arg(本代码中就是1。为何不用一个常量1,就不知道了?)(能够更好的对接AQS) * @return */ public boolean tryLock(int acquires){ //TODO_FINISHED 这里readCount的判断,与修改writeCount的操做能够被割裂,并非原子性的。不就有可能出现readCount与writeCount的值同时大于零的状况。 // 该示例代码,确实存在该问题,但实际源码,writeCount与readCount是经过同一变量state实现的,因此能够很好地经过CAS确保原子性 // readCount表示读锁(共享锁)的上锁次数 if (readCount.get() == 0){ // readCount的值为0,表示读锁(共享锁)空置,因此当前线程是有可能得到写锁(独占锁)。 // 接下来判断写锁(独占锁)是否被占用 int writeCountValue = writeCount.get(); if (writeCountValue == 0){ // 写锁(独占锁)的锁次数为0,表示写锁(独占锁)并没未被任何线程持有 if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ // 修改writeCount,来得到锁。该机制与ReentrantLock相同 // 设置独享锁的持有者owner owner.set(Thread.currentThread()); // 至此,表示当前线程抢锁成功 return true; } } else { // 写锁(独占锁)的锁次数不为0,表示写锁(独占锁)已经被某线程持有 if (Thread.currentThread() == owner.get()){ // 若是持有锁的线程为当前线程,那就进行锁的重入操做 writeCount.set(writeCountValue+acquires); // 重入锁,表示当前线程是持有锁的 return true; } // 读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程 } } // 读锁被占据 // 其它状况(1.读锁被占据,2读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程),都返回false return false; } /** * 获取独占锁(针对独占锁) */ public void lock(){ // 设定waitNote中arg参数 int arg = 1; // 尝试获取独占锁。成功便退出方法,失败,则进入“不死不休”逻辑 if (!tryLock(arg)){ // 须要将当前保存至等待队列,在这以前,须要封装当前线程为waitNote WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); // 将封装好的waitNode放入等待队列waiters中(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 如ReentrantLock通常,开始循环尝试拿锁 while (true){ // 获取队列头部元素 WaitNode headNote = waiters.peek(); // 若是等待队列头部元素headNote不为null(有多是null嘛?),而且就是当前线程,那就尝试获取锁 if (headNote !=null && headNote.thread == Thread.currentThread()){ // 若是再次尝试获取锁失败,那就只能挂起了 if (!tryLock(headNote.arg)){ LockSupport.park(); } else { // 再次尝试获取锁成功,那就将队列头部元素,踢出等待队列waiters waiters.poll(); return; } }else { // 若是headNote不是当前线程的封装,就直接挂起(这里就没处理headNote==null的状况) LockSupport.park(); } } } } /** * 尝试解锁(针对独占锁) * @param releases 用于设定解锁次数。通常传入waitNode.arg * @return */ public boolean tryUnlock(int releases){ // 首先判断锁的持有者是否为当前线程 if (owner.get() != Thread.currentThread()){ // 锁的持有者不是当前线程(即便锁的持有者为null,锁的持有者是null,还解锁,仍然是抛出异常) throw new IllegalMonitorStateException(); } // 锁的持有者就是当前线程 // 首先按照releases进行解锁(通过一番思考后,这里不会出现相似DoubleCheck中的问题(Atomic中的value是volatile的),因此这个值同时只会有一个线程对其操做) int writeCountValue = writeCount.get(); // 为writeCount设置新值 writeCount.set(writeCountValue-releases); // 根据writeCount的新值,判断锁的持有者是否发生变化 if (writeCount.get() == 0){ // writeCount的值为0,表示当前线程已经彻底解锁,因此修改锁的持有者为null owner.set(null); // 而这表示彻底解锁成功 return true; } else { // writeCount的值不为0,表示当前线程还没有彻底解锁,故锁的持有者未发生变化。即尝试解锁失败 return false; } } /** * 解锁(针对独占锁) */ public void unlock(){ // 设定tryUnlock的参数releases int arg = 1; // 先尝试解锁 if (tryUnlock(arg)){ // 得到等待队列的头部元素 WaitNode head = waiters.peek(); // 检测一下头部元素head是否null(也许等待队列根本就没有元素) if (head == null){ // 若是头部元素head为null,说明队列为null,直接return return; } // 解锁成功,就要把等待队列中的头部元素唤醒(unpark) // 这里有一点注意,即便队列的头元素head被唤醒了,也不必定就是这个头元素head得到锁(详见tryLock,新来的线程可能得到锁) // 若是这个头元素没法得到锁,就会park(while循环嘛)。而且一次park,能够屡次unpark(已实践) LockSupport.unpark(head.thread); } } /** * 尝试获取共享锁(针对共享锁) * @param acquires * @return */ public boolean tryLockShared(int acquires){ // 判断写锁(独占锁)是否被别的线程持有(这个条件意味着:同一个线程能够同时持有读锁与写锁) // 该方法是为了进行 锁降级****** if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){ // 若是写锁(独占锁)没有别的被线程持有,就能够继续尝试获取读锁(共享锁) // 经过循环实现自旋,从而实现加锁(避免加锁失败) while(true){ // 因为读锁(共享锁)是共享的,不存在独占行为,故直接在writeCount增长当前线程加锁行为的次数acquires int writeCountValue = writeCount.get(); // 经过CAS进行共享锁的次数的增长 if (writeCount.compareAndSet(writeCountValue, writeCountValue+acquires)){ break; } } } // 写锁已经被别的线程持有,共享锁获取失败 return false; } /** * 获取共享锁(针对共享锁) */ public void lockShared(){ // 设定waitNote中arg参数 int arg = 1; // 判断是否获取共享锁成功 if (!tryLockShared(arg)){ // 若是获取共享锁失败,就进入等待队列 // 与获取同步锁操做同样的,须要先对当前线程进行WaitNote的封装 WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg); // 将waitNote置入waiters(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 使用循环。一方面避免伪唤醒,另外一方面便于二次尝试获取锁 while (true){ // 获取等待队列waiters的头元素head WaitNode head = waiters.peek(); // 校验head是否为null,并判断等待队列的头元素head是否为当前线程的封装(也许head时当前线程的封装,但并不意味着head就是刚刚放入waiters的元素) if (head != null && head.thread == Thread.currentThread()){ // 若是校验经过,而且等待队列的头元素head为当前线程的封装,就再次尝试获取锁 if (tryLockShared(head.arg)){ // 获取共享锁成功,就从当前队列中移除head元素(poll()方法移除队列头部元素) waiters.poll(); // 在此处就是与独占锁不一样的地方了,独占锁意味着只可能有一个线程得到锁,而共享锁是能够有多个线程得到的 // 得到等待队列的新头元素newHead WaitNode newHead = waiters.peek(); // 校验该元素是否为null,并判断它的锁类型是否为共享锁 if (newHead != null && newHead.type == 1){ // 若是等待队列的新头元素是争取共享锁的,那么就唤醒它(这是一个相似迭代的过程,刚唤醒的线程会会作出一样的举动) //TODO_FINISHED 这里有一点,我有些疑惑,那么若是等待队列是这样的{共享锁,共享锁,独占锁,共享锁,共享锁},共享锁们被一个独占锁隔开了。是否是就不能唤醒后面的共享锁了。再看看后面的代码 // 这个实际源码,并非这样的。老师表示现有代码是这样的,不用理解那么深刻,后续有机会看看源码 LockSupport.unpark(newHead.thread); } } else { // 若是再次获取共享锁失败,就挂起 LockSupport.park(); } } else { // 若是校验未经过,或等待队列的头元素head不是当前线程的封装,就挂起当前线程 LockSupport.park(); } } } } /** * 尝试解锁(针对共享锁) * @param releases * @return */ public boolean tryUnlockShared(int releases){ // 经过CAS操做,减小共享锁的锁次数,即readCount的值(因为是共享锁,因此是可能多个线程同时减小该值的,故采用CAS) while (true){ // 获取读锁(共享锁)的值 int readCountValue = readCount.get(); int readCountNext = readCountValue - releases; // 只有成功修改值,才能够跳出 if (readCount.compareAndSet(readCountValue,readCountNext)){ // 用于代表共享锁彻底解锁成功 return readCountNext == 0; } } // 因为读锁没有owner,因此不用进行有关owner的操做 } /** * 解锁(针对共享锁) */ public boolean unlockShared(){ // 设定tryUnlockShared的参数releases int arg = 1; // 判断是否尝试解锁成功 if (tryUnlockShared(arg)){ // 若是尝试解锁成功,就须要唤醒等待队列的头元素head的线程 WaitNode head = waiters.peek(); // 校验head是否为null,毕竟可能等待队列为null if (head != null){ // 唤醒等待队列的头元素head的线程 LockSupport.unpark(head.thread); } //TODO_FINISHED 尝试共享锁解锁成功后,就应当返回true(虽然有些不大理解做用) // 用于对应源码 return true; } //TODO_FINISHED 尝试共享锁解锁失败后,就应当返回false(虽然有些不大理解做用) // 用于对应源码 return false; } }
这里一样不进行相关解释了。由于须要的解释,在注释中都写的很明确了,包括我踩的一些坑。
若是依旧有一些看不懂的地方,或者错误的地方,欢迎@我,或者私信我。
其实,这两个demo有两个重要的方面。一方面是能够亲自感觉,一个锁是怎么实现的,它的方案是怎样的。另外一方面就是去思量,其中有关原子性,以及可见性的思量与设计。
大家能够尝试改动一些东西,而后去考虑,这样改动后,是否存在线程安全问题。这样的考虑对本身在线程安全方面的提高是巨大的。反正我当时那一周,就不断的改来改去。甚至有些改动,根本调试不出来问题,而后咨询了别人,才知道其中的一些坑。固然也有一些改动是能够的。
若是有问题,能够@我,或者私信我。
若是以为这篇文章不错的话,请点击推荐。这对我,以及那些须要的人,很重要。
谢谢。