千呼万唤始出来,终于写到AQS这个一章了,其实为了写这一章,前面也是作了不少的铺垫,好比以前的html
深度理解volatile关键字 线程之间的协做(等待通知模式) JUC 经常使用4大并发工具类 CAS 原子操做 显示锁 了解LockSupport工具类java
这些文章其实都是为了让你们理解AQS而写的铺垫,就像吃东西须要一口一口的吃同样编程
AQS,是AbstractQuenedSynchronizer的缩写,中文名称为抽象的队列式同步器,是java并发编程这一块的半壁江山,这个类存在于在java.util.concurrent.locks包,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,好比以前写的显示锁ReentrantLock,,读写锁ReentrantReadWriteLock,JUC的四大并发工具类中的Semaphore,CountDownLatch,线程池暂时还没写以后再写设计模式
在JDK1.7以前,FutureTask,应该也是继承了AQS来实现的,可是1.8以后就改变了安全
可是实现思想应该没有太大改变,,因此说AQS是并发编程的半壁江山网络
若是被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工做线程,并将共享资源设置为锁定状态,若是被请求的共享资源被占用,那么就须要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。数据结构
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。
AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。多线程
其实在我理解来讲,AQS就是基于CLH队列,用volatile修饰共享变量state,来保证变量的可见性,线程经过CAS去改变状态符,保证状态的原子性,成功则获取锁成功,失败则进入等待队列,等待被唤醒。并发
注意:AQS是自旋锁:在等待唤醒的时候,常常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,直到被其余线程获取成功框架
经过这个图得知,AQS维护了一个volatile int state和一个FIFO线程等待队列,多线程争用资源被阻塞的时候就会进入这个队列。state就是共享资源,其访问方式有以下三种:
getState();setState();compareAndSetState();
AQS 定义了两种资源共享方式:
1.Exclusive:独占,只有一个线程能执行,如ReentrantLock
2.Share:共享,多个线程能够同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
不一样的自定义的同步器争用共享资源的方式也不一样。
同步器的设计是基于模板方法模式的,若是不了解的能够去看看模板方法设计模式,以前在写设计模式的六大设计原则的时候也说了,看看设计模式有助于理解源码,若是须要自定义同步器通常的方式是这样:
自定义同步器在实现的时候只须要实现共享资源state的获取和释放方式便可,至于具体线程等待队列的维护,AQS已经在顶层实现好了。自定义同步器实现的时候主要实现下面几种方法:
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其余线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。固然,释放锁以前,A线程本身是能够重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每一个子线程执行完后countDown()一次,state会CAS减1。等到全部子线程都执行完后(即state=0),会unpark()主调用线程,而后主调用线程就会从await()函数返回,继续后余动做。
通常来讲,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种便可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
在acquire() acquireShared()两种方式下,线程在等待队列中都是忽略中断的,acquireInterruptibly()/acquireSharedInterruptibly()是支持响应中断的。
说了那么多,可是说一千道一万不如本身手写试试,接下来看代码
锁
package org.dance.day4.aqs; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 采用主类实现Lock接口,内部类继承AQS,封装细节 * 自定义锁 * @author ZYGisComputer */ public class CustomerLock implements Lock { private final Sync sync = new Sync(); /** * 采用内部类来继承AQS,封装细节 * 实现独占锁,经过控制state状态开表示锁的状态 * state:1 表明锁已被占用 * state:0 表明锁能够被占用 */ private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ // 当前线程获取到锁 setExclusiveOwnerThread(Thread.currentThread()); return true; }else{ return false; } } @Override protected boolean tryRelease(int arg) { // 若是状态为没人占用,还去释放,就报错 if(getState()==0){ throw new UnsupportedOperationException(); } // 把锁的占用者制空 setExclusiveOwnerThread(null); setState(0); return true; } /** * 判断线程是否占用资源 * @return */ @Override protected boolean isHeldExclusively() { return getState()==1; } /** * 获取Condition接口 * @return */ public Condition getCondition(){ return new ConditionObject(); } } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.getCondition(); } }
工具类:
package org.dance.tools; import java.util.concurrent.TimeUnit; /** * 类说明:线程休眠辅助工具类 */ public class SleepTools { /** * 按秒休眠 * @param seconds 秒数 */ public static final void second(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } /** * 按毫秒数休眠 * @param seconds 毫秒数 */ public static final void ms(int seconds) { try { TimeUnit.MILLISECONDS.sleep(seconds); } catch (InterruptedException e) { } } }
测试类:
package org.dance.day4.aqs; import org.dance.tools.SleepTools; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** *类说明:测试手写锁 */ public class TestMyLock { public static void main(String[] args) { TestMyLock testMyLock = new TestMyLock(); testMyLock.test(); } public void test() { // 先使用ReentrantLock 而后替换为咱们本身的Lock final Lock lock = new ReentrantLock(); class Worker extends Thread { @Override public void run() { while (true) { lock.lock(); try { SleepTools.second(1); System.out.println(Thread.currentThread().getName()); SleepTools.second(1); } finally { lock.unlock(); } SleepTools.second(2); } } } // 启动10个子线程 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); } // 主线程每隔1秒换行 for (int i = 0; i < 10; i++) { SleepTools.second(1); System.out.println(); } } }
执行结果:
Thread-0 Thread-1 Thread-2 Thread-3 Thread-4
经过结果能够看出来每次都是只有一个线程在执行的,线程的锁获取没有问题,接下来换咱们本身的锁
final Lock lock = new CustomerLock();
再次执行测试
执行结果:
Thread-0 Thread-1 Thread-2 Thread-3 Thread-4
因而可知,这个手写的锁,和ReentrantLock是同样的效果,是否是感受也挺简单的,也没有多少行代码
那么独占锁,被一个线程占用着,其余线程去了哪里?不要走开接下来进入AQS的源码看看
在AQS中的数据结构是采用同步器+一个双向循环链表的数据结构,来存储等待的节点的,由于双向链表是没有头的,可是为了保证唤醒的操做,同步器中的head标志了链表中的一个节点为头节点,也就是将要唤醒的,也标识了一个尾节点
这里咱们说下Node。Node结点是对每个等待获取资源的线程的封装,其包含了须要同步的线程自己及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的状况下),会触发变动为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION(-2):表示结点等待在Condition上,当其余线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不只会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。因此源码中不少地方用>0、<0来判断结点的状态是否正常。
经过图能够看出来,在增长 尾节点的时候须要经过CAS设置,由于多是多个线程同时设置,可是移除首节点的时候是不须要的,由于这个操做是由同步器操做的,而且首节点只有一个
AQS的从线程的获取同步状态到,对同步队列的维护,到释放,的流程图就是这样的,有兴趣看源码的本身去跟一下,就是主要实现的模板方法,
注意:其实在这个给你们提个醒,看源码的时候,找核心的看,找主要的看,不要一行一行的扣着看,没有意义,还有就是调用过程复杂,体会核心流程就能够
以前写了<<Lock接口之Condition接口>>这一章,而后在这里写一下Condition接口在AQS里面的实现吧,由于无论本身写锁也好,默认锁的实现也好,用的Condition都是AQS默认写好的
一个锁是能够有多个Condition的,每一个Condition都包含一个本身的等待队列,不一样于Object属于同一个对象等待,他存在一个单链表结构的等待队列,清晰的知道要唤醒本身的等待队列中的节点,因此采用signal方法而不是signalall
固然采用的类仍是Node类固然单链表其实就是没有上一个节点的引用而已
等待队列和同步队列采用的是相同的类,只不过是实现的数据机构确是不同的而已
最终一个锁的实例化会成为上图中第二个图的这种形式,Demo也就是以前写的<<Lock接口之Condition接口>>中的用的锁最终造成的结构及时就是维持了一个同步队列和两个等待队列,锁用于控制并发,而两个队列用于控制地点变化和千米数变化的不一样的等待通知模式
就是在当前线程await的时候从同步队列移除后加入到等待队列尾部,而唤醒就是从等待队列移除后加入到同步队列尾部,两个队列相互转换的过程,之因此采用同一个类,就是为了方便的在不一样队列中相互转化
固然这也是为何不推荐使用SignalAll方法的缘由,由于若是一个等待队列中有不少的线程在等待,所有唤醒后,最多且只能有一个线程获取到同步状态,其余线程所有要被加入到同步队列的末尾,并且也可能当前的同步状态被别人持有,一个线程也获取不到,所有都要被加入同步队列中,因此不推荐使用SignalAll,推荐是用Signal
其实也能够想象,好比wait和notify/notifyAll 在写<<线程之间的协做(等待通知模式)>>这篇文章的时候的最后一个问题也能够大概想象一下,应该也是维持了一个同步队列,可是等待队列应该是只有一个,因此,被唤醒的是第一个等待的节点,可是它没有办法保证要被唤醒的节点必定是在头一个,只能唤醒所有的节点,来保证须要唤醒的线程必定被唤醒,大概也是这样的一个节点的移动,根据网络文章的描述,应该八九不离十
根据猜想,结合上方的Condition接口分析,因此说,在wait,notify/notifyAll中推荐使用notifyAll,防止第一个节点不是须要唤醒的节点,形成唤醒错误,可是Condition是知道的,被唤醒的必定是须要唤醒的,不会唤醒错误,因此说,推荐使用signal
能看到这里的证实你真的很爱这个行业,你是最棒的!加油
其实在上面手写的锁,是有一些缺陷的,由于判断的是否是等于1,因此他是一个不支持可重入的,一旦重入,就会形成死锁,本身锁住本身,可是ReentrantLock就不会
他支持锁的可重入,而且支持锁的公平和非公平
经过源码能够看到,他是经过状态的累加完成的锁的可重入,固然前提是已经拿到锁的线程,会有这样一个判断
因此可想而知,释放的时候,每次释放就递减,最终等于0的时候完成锁的释放
在实现公平锁的时候,就是判断当前节点是否有前期节点,是否是第一个,若是有,不是第一个,抱歉你不能抢锁
可想而知在非公平锁中就是不判断而已
由于不须要判断,而且是谁抢到锁,锁就是谁的,因此说非公平锁比公平锁效率高
在读写锁中,一个状态如何 保存两个状态呢?采用位数分割
应该有知道 int是32位的,他把32位一分为二,采用低位保存写的状态,高位保存读的状态
写锁,应该都知道,只能同时被一个线程持有,因此重入的话,也比较好保存
可是读锁不同,能够被多个线程同时持有,是共享锁,而且重入的次数是不同的,那么该则么保存呢?采用高位只保存被多少线程持有
采用每一个持有锁的线程中的一个HoldCounter对象保存,使用ThreadLocalHoldCounter继承ThreadLocal来保存线程变量,区别不一样线程
读写锁支持写锁降级为读锁,可是不支持读锁升级为写锁,为了保证线程安全和数据可见性,由于在写锁执行期间,读锁是被阻塞的,因此说写锁降级为读锁是没有问题的,可是若是是读锁升级为写锁,在其余线程使用完写锁的时候,读锁是看不见的,为了保证线程安全,因此不支持读锁升级成写锁
到此AQS就写完了,由于AQS涉及的知识太多,能看到如今的也都是大神了,恭喜大家,掌握了并发编程的半壁江上,为了本身的梦想更近了一步,加油,由于知识点多,因此你们多看几遍,不理解的能够百度,也能够评论区提问
做者:彼岸舞
时间:2020\11\18
内容关于:并发编程
本文来源于网络,只作技术分享,一律不负任何责任