AQS,全称是 AbstractQueuedSynchronizer,中文译为抽象队列式同步器。这个抽象类对于JUC并发包很是重要,JUC包中的ReentrantLock,,Semaphore,ReentrantReadWriteLock,CountDownLatch 等等几乎全部的类都是基于AQS实现的。html
AQS 中有两个重要的东西,一个以Node为节点实现的链表的队列(CHL队列),还有一个STATE标志,而且经过CAS来改变它的值。java
CLH队列:web
链表结构,在头尾结点中,须要特别指出的是头结点是一个空对象结点,无任何意义,即傀儡结点;算法
每个Node结点都维护了一个指向前驱的指针和指向后驱的指针,结点与结点之间相互关联构成链表;安全
入队在尾,出队在头,出队后须要激活该出队结点的后继结点,若后继结点为空或后继结点waitStatus>0,则从队尾向前遍历取waitStatus<0的触发阻塞唤醒;并发
队列中节点状态值(waitStatus,只能为如下值)ide
//常量:表示节点的线程是已被取消的 static final int CANCELLED = 1; //常量:表示当前节点的后继节点的线程须要被唤醒 static final int SIGNAL = -1; //常量:表示线程正在等待某个条件 static final int CONDITION = -2; //常量:表示下一个共享模式的节点应该无条件的传播下去 static final int PROPAGATE = -3;
线程获取或释放锁的本质是去修改AQS内部那个能够表征同步状态的变量的值。好比说,咱们建立一个ReentrantLock的实例,此时该锁实例内部的状态的值为0,表征它尚未被任何线程所持有。当多个线程同时调用它的lock()方法获取锁时,它们的本质操做其实就是将该锁实例的同步状态变量的值由0修改成1,第1个抢到这个操做执行的线程就成功获取了锁,后续执行操做的线程就会看到状态变量的值已经为1了,即代表该锁已经被其余线程获取,它们抢占锁失败了。这些抢占锁失败的线程会被AQS放入到CHL队列里面去维护起来。svg
AQS是一个抽象类,当咱们继承AQS去实现本身的同步器时,要作的仅仅是根据本身同步器须要知足的性质实现线程获取和释放资源的方式(修改同步状态变量的方式)便可,至于具体线程等待队列的维护(如获取资源失败入队、唤醒出队、以及线程在队列中行为的管理等),AQS在其顶层已经帮咱们实现好了,AQS的这种设计使用的正是模板方法模式。测试
AQS支持线程抢占两种锁——独占锁和共享锁:ui
AQS的全部子类中,要么使用了它的独占锁,要么使用了它的共享锁,不会同时使用它的两个锁。
经过AQS能够很方便的实现一个自定义的同步器,子类只须要经过重写如下方法来控制AQS内部的一个叫作state的同步变量:
//独占模式获取锁与释放锁 //返回值表示获取锁是否成功 protected boolean tryAcquire(int arg) //返回值表示释放锁是否成功 protected boolean tryRelease(int arg) //共享模式获取锁与释放锁 //返回值表示得到锁后还剩余的许可数量 protected int tryAcquireShared(int arg) //返回值表示释放锁是否成功 protected boolean tryReleaseShared(int arg) //调用该方法的线程是否持有独占锁,通常用到了condition的时候才须要实现此方法 protected boolean isHeldExclusively()
这些方法是子类须要实现的,能够选择实现其中的一部分。根据实现方式的不一样,能够分为两种:独占锁和共享锁。其中JUC中锁的分类为:
其实现方式为:
独占锁须要实现的是 tryAcquire(int)、tryRelease(int)
共享锁须要实现的是 tryAcquireShared(int)、tryReleaseShared(int)
在重写这些方法时,若是想要使用state同步变量,必须使用AQS内部提供的如下方法来控制:
/** 返回同步状态的当前值(此操做具备volatile变量的读语义) **/ protected final int getState() { //方法被final修饰,不容许被重写 return state; } /** 设置同步状态的值(此操做具备volatile变量的写语义) **/ protected final void setState(int newState) { //方法被final修饰,不容许被重写 state = newState; } /** * 原子地(CAS操做)将同步状态值设置为给定值update若是当前同步状态的值等于expect(此操做具备volatile变量的读写语义) * @return 成功返回true,失败返回false,意味着当操做进行时同步状态的当前值不是expect **/ protected final boolean compareAndSetState(int expect, int update) { //方法被final修饰,不容许被重写 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS中还提供了一个内部类ConditionObject,它实现了Condition接口,能够用于await/signal。采用CLH队列的算法,唤醒当前线程的下一个节点对应的线程,而signalAll唤醒全部线程。
总的来讲,AQS提供了三个功能:
一个AQS的使用示例以下(这里用AQS实现一个简单的不可重入锁):
public class SimpleLock extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int unused) { //使用compareAndSetState控制AQS中的同步变量 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); //使用setState控制AQS中的同步变量 setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } /** * 发现线程是顺序得到锁的 * 由于AQS是基于CLH锁的一个变种实现的FIFO调度 */ public static void main(String[] args) throws InterruptedException { final SimpleLock lock = new SimpleLock(); lock.lock(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { lock.lock(); System.out.println(Thread.currentThread().getId() + " acquired the lock!"); lock.unlock(); } }).start(); // 简单的让线程按照for循环的顺序阻塞在lock上 //目的是让线程顺序启动 Thread.sleep(100); } System.out.println("main thread unlock!"); lock.unlock(); } }
其实这个基本上就是ThreadPoolExecutor的内部类Worker对AQS的实现。
再看一个栗子:
public class MyLock implements Lock { private Sync sync = new Sync(); public boolean isLocked() { return sync.isHeldExclusively(); } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg){ //若是第一个线程进来,拿到锁,返回TRUE //若是第二个线程进来,返回FALSE,拿不到锁 int state = getState(); if(state == 0){ if(compareAndSetState(0,arg)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } } return false; } @Override protected boolean tryRelease(int arg) { //锁的获取和释放须要11对应,那么调用这个方法的线程,必定是当前线程让。 if(Thread.currentThread() != getExclusiveOwnerThread()){ throw new RuntimeException(); } int state = getState() - arg; setState(state); if(state == 0){ setExclusiveOwnerThread(null); return true; } return false; } // 判断当前独占锁是否被持有 protected boolean isHeldExclusively() { return getState() == 1; } // 提供条件队列功能 Condition newCondition() { return new ConditionObject(); } } }
上面MyLock的大多数方法,其实咱们根本不用操心,统一交给AQS的方法来帮咱们完成就好。而咱们真正要实现的就是tryAcquire和tryRelease 这2个操做。
首先咱们来实现tryAcquire,本质就是若是第一个线程进来就拿到锁,后面进来的RETURN FALSE。然而以前咱们声明了一个BOOL变量。这里咱们能够直接AQS 的STATE变量。咱们能够用初始值为0来表示没有线程拿到锁,一旦第一个线程进来了,就把它设置为非0.
咱们再来思考释放,由于锁的获取和释放是一一对应的,咱们首要判断锁住的线程是否是同一个,若是是,咱们就把STATE - ARG。若是STATE回到0,则表明锁释放成功。
测试类Sequence来测试下咱们写的锁了。能够发现输出的是50,锁是成功的。
public class Sequence { private Lock lock = new MyLock(); private int value; public int getNext(){ lock.lock(); value++; lock.unlock(); return value; } public static void main(String[] args) { Sequence s = new Sequence(); for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { for(int i = 0;i<10;i++){ System.out.println(s.getNext()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } } }
而后去测试重入锁的DEMO,发现会卡主。咱们要再加一些逻辑来实现重入锁。
protected boolean tryAcquire(int arg){ //若是第一个线程进来,拿到锁,返回TRUE //若是第二个线程进来,返回FALSE,拿不到锁 Thread t = Thread.currentThread(); int state = getState(); if(state == 0){ if(compareAndSetState(0,arg)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } }else if(getExclusiveOwnerThread() == t){ //由于只有一个线程能够进这个IF,因此没有线程安全性问题 setState(state+arg); return true; } return false; }
测试可重入锁
public class Test { Lock lock = new MLock(); public void a(){ lock.lock(); System.out.println("a"); b(); lock.unlock(); } public void b(){ lock.lock(); System.out.println("b"); lock.unlock(); } public static void main(String[] args) { Test d = new Test(); new Thread(new Runnable() { @Override public void run() { d.a(); } }).start(); } }
最后来看看基于AQS实现的 CountDownLatch 源码, CountDownLatch 用同步状态持有当前计数,countDown方法调用 release从而致使计数器递减;当计数器为0时,解除全部线程的等待;await调用acquire,若是计数器为0,acquire 会当即返回,不然阻塞。一般用于某任务须要等待其余任务都完成后才能继续执行的情景。源码以下:
public class CountDownLatch { /** * 基于AQS的内部Sync * 使用AQS的state来表示计数count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 使用AQS的getState()方法设置状态 setState(count); } int getCount() { // 使用AQS的getState()方法获取状态 return getState(); } // 覆盖在共享模式下尝试获取锁 protected int tryAcquireShared(int acquires) { // 这里用状态state是否为0来表示是否成功,为0的时候能够获取到返回1,不然不能够返回-1 return (getState() == 0) ? 1 : -1; } // 减小count的值,若是count为0则发出释放信号。 //这里使用了"自旋+CAS”的方式来原子性的将state的值减小1, //若是在此过程当中state已经为0了(在并发状况下,可能已经被其余线程修改成了0), //则返回false。不然根据修改后state的值是否等于0来返回boolean值。 protected boolean tryReleaseShared(int releases) { // 在for循环中Decrement count直至成功; // 当状态值即count为0的时候,返回false表示 signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; // 使用给定计数值构造CountDownLatch public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 让当前线程阻塞直到计数count变为0,或者线程被中断 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // count递减 public void countDown() { sync.releaseShared(1); } // 获取当前count值 public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
使用AQS分别实现独占锁与共享锁
① 独占模式,独占模式采起的例子是银行服务窗口,假如某个银行网点只有一个服务窗口,那么此银行服务窗口只能同时服务一我的,其余人必须排队等待,因此这种银行窗口同步装置是一个独占模型。第一个类是银行窗口同步装置类,它按照推荐的作法使用一个继承AQS同步器的子类实现,并做为子类出现。第二个类是测试类,形象一点地说,有三位良民到银行去办理业务,分别是tom、jim和jay,咱们使用BankServiceWindow就能够约束他们排队,一个一个轮着办理业务而避免陷入混乱的局面。
/** * 独占模式 */ public class BankServiceWindow { private final Sync sync; public BankServiceWindow() { sync = new Sync(); } private static class Sync extends AbstractQueuedSynchronizer { public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int releases) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } } public void handle() { sync.acquire(1); } public void unhandle() { sync.release(1); } }
测试类
public class BankServiceWindowTest { public static void main(String[] args) { final BankServiceWindow bankServiceWindow = new BankServiceWindow(); Thread tom = new Thread() { public void run() { bankServiceWindow.handle(); System.out.println("tom开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("tom结束办理业务"); bankServiceWindow.unhandle(); } }; Thread jim = new Thread() { public void run() { bankServiceWindow.handle(); System.out.println("jim开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("jim结束办理业务"); bankServiceWindow.unhandle(); } }; Thread jay = new Thread() { public void run() { bankServiceWindow.handle(); System.out.println("jay开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("jay结束办理业务"); bankServiceWindow.unhandle(); } }; tom.start(); jim.start(); jay.start(); } }
明显tom、jim、jay仨是排队完成的,可是没法保证三者的顺序,多是tom、jim、jay,也多是tom、jay、jim,由于在入列之前的执行前后是没法肯定的,它的语义是保证一个接一个办理。若是没有同步器限制的状况,输出结果将不可预测。
② 共享模式,共享模式采起的例子一样是银行服务窗口,随着此网点的发展,办理业务的人愈来愈多,一个服务窗口已经没法知足需求,因而又分配了一位员工开了另一个服务窗口,这时就能够同时服务两我的了,但两个窗口都有人占用时一样也必须排队等待,这种服务窗口同步器装置就是一个共享型。第一个类是共享模式的同步装置类,跟独占模式不一样的是它的状态的初始值能够自由定义,获取与释放就是对状态递减和累加操做。第二个类是测试类,tom、jim和jay再次来到银行,一个有两个窗口甚是高兴,他们能够两我的同时办理了,时间缩减了很多。
/** * 共享模式 */ public class BankServiceWindowsShare { private final Sync sync; public BankServiceWindowsShare(int count) { sync = new Sync(count); } private static class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } public int tryAcquireShared(int interval) { for (; ; ) { int current = getState(); int newCount = current - 1; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int interval) { for (; ; ) { int current = getState(); int newCount = current + 1; if (compareAndSetState(current, newCount)) { return true; } } } } public void handle() { sync.acquireShared(1); } public void unhandle() { sync.releaseShared(1); } }
测试类
public class BankServiceWindowShareTest { public static void main(String[] args) { final BankServiceWindowsShare bankServiceWindows = new BankServiceWindowsShare(2); Thread tom = new Thread() { public void run() { bankServiceWindows.handle(); System.out.println("tom开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("tom结束办理业务"); bankServiceWindows.unhandle(); } }; Thread jim = new Thread() { public void run() { bankServiceWindows.handle(); System.out.println("jim开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("jim结束办理业务"); bankServiceWindows.unhandle(); } }; Thread jay = new Thread() { public void run() { bankServiceWindows.handle(); System.out.println("jay开始办理业务"); try { this.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("jay结束办理业务"); bankServiceWindows.unhandle(); } }; tom.start(); jim.start(); jay.start(); } }
参考文章:https://www.jianshu.com/p/8b5bdce4efdd;https://blog.csdn.net/wangyangzhizhou/article/details/40052353
其余文章:http://www.cnblogs.com/dream-to-pku/p/9186126.html