Java 并发编程 ----- AQS(抽象队列同步器)

1、什么是 AQS ?

AQS即AbstractQueuedSynchronizer的缩写,是并发编程中实现同步器的一个框架。框架,框架,重要的事情说三遍,框架就是说它帮你处理了很大一部分的逻辑,其它功能须要你来扩展。想一想你使用Spring框架的场景,Spring帮助开发者实现IOC容器的bean依赖管理,标签解析等,咱们只须要对bean进行配置便可,其余不用管。java

AQS基于一个FIFO双向队列实现,被设计给那些依赖一个表明状态的原子int值的同步器使用。咱们都知道,既然叫同步器,那个确定有个表明同步状态(临界资源)的东西,在AQS中即为一个叫state的int值,该值经过CAS进行原子修改。node

在AQS中存在一个FIFO队列,队列中的节点表示被阻塞的线程,队列节点元素有4种类型, 每种类型表示线程被阻塞的缘由,这四种类型分别是:编程

  • CANCELLED : 表示该线程是由于超时或者中断缘由而被放到队列中
  • CONDITION : 表示该线程是由于某个条件不知足而被放到队列中,须要等待一个条件,直到条件成立后才会出队
  • SIGNAL : 表示该线程须要被唤醒
  • PROPAGATE : 表示在共享模式下,当前节点执行释放release操做后,当前结点须要传播通知给后面全部节点

因为一个共享资源同一时间只能由一条线程持有,也能够被多个线程持有,所以AQS中存在两种模式,以下:设计模式

  • 一、独占模式安全

    独占模式表示共享状态值state每次只能由一条线程持有,其余线程若是须要获取,则须要阻塞,如JUC中的ReentrantLockbash

  • 二、共享模式数据结构

    共享模式表示共享状态值state每次能够由多个线程持有,如JUC中的CountDownLatch多线程

2、AQS 中的核心数据结构和方法

一、既然AQS是基于一个FIFO队列的框架,那么咱们先来看下队列的元素节点Node的数据结构,源码以下:
static final class Node {
    /**共享模式*/
    static final Node SHARED = new Node();
    /**独占模式*/
    static final Node EXCLUSIVE = null;

    /**标记线程因为中断或超时,须要被取消,即踢出队列*/
    static final int CANCELLED =  1;
    /**线程须要被唤醒*/
    static final int SIGNAL = -1;
    /**线程正在等待一个条件*/
    static final int CONDITION = -2;
    /**
     * 传播
     */
    static final int PROPAGATE = -3;
    
    // waitStatus只取上面CANCELLED、SIGNAL、CONDITION、PROPAGATE四种取值之一
    volatile int waitStatus;

    // 表示前驱节点
    volatile Node prev;

    // 表示后继节点
    volatile Node next;

    // 队列元素须要关联一个线程对象
    volatile Thread thread;

    // 表示下一个waitStatus值为CONDITION的节点
    Node nextWaiter;

    /**
     * 是否当前结点是处于共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回前一个节点,若是没有前一个节点,则抛出空指针异常
     */
    final Node predecessor() throws NullPointerException {
        // 获取前一个节点的指针
        Node p = prev;
        // 若是前一个节点不存在
        if (p == null)
            throw new NullPointerException();
        else
        // 不然返回
            return p;
    }

    // 初始化头节点使用
    Node() {}

    /**
     *  当有线程须要入队时,那么就建立一个新节点,而后关联该线程对象,由addWaiter()方法调用
     */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * 一个线程须要等待一个条件阻塞了,那么就建立一个新节点,关联线程对象
     */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
复制代码

总结下Node节点数据结构设计,队列中的元素,确定是为了保存因为某种缘由致使没法获取共享资源state而被入队的线程,所以Node中使用了waitStatus表示节点入队的缘由,使用Thread对象来表示节点所关联的线程。至于prev,next,则是通常双向队列数据结构必须提供的指针,用于对队列进行相关操做。并发

二、AQS中的共享状态值

以前提到,AQS是基于一个共享的int类型的state值来实现同步器同步的,其声明以下:框架

/**
 * 同步状态值
 */
private volatile int state;

/**
 * 获取同步状态值
 */
protected final int getState() {
    return state;
}

/**
 * 修改同步状态值
 */
protected final void setState(int newState) {
    state = newState;
}
复制代码

由源码咱们能够看出,AQS声明了一个int类型的state值,为了达到多线程同步的功能,必然对该值的修改必须多线程可见,所以,state采用volatile修饰,并且getState()setState()方法采用final进行修饰,目的是限制AQS的子类只能调用这两个方法对state的值进行设置和获取,而不能对其进行重写自定义设置/获取逻辑。

AQS中提供对state值修改的方法不只仅只有setState()getState(),还有诸如采用CAS机制进行设置的compareAndSetState()方法,一样,该方法也是采用final修饰的,不容许子类重写,只能调用。

三、AQS中的tryXXX方法

通常基于AQS实现的同步器,如ReentrantLock,CountDownLatch等,对于state的获取操做,子类只需重写其tryAcquire()tryAcquireShared()方法便可,这两个方法分别对应独占模式和共享模式下对state的获取操做;而对于释放操做,子类只需重写tryRelease()tryReleaseShared()方法便可。

至于如何维护队列的出队、入队操做,子类不用管,AQS已经帮你作好了。

3、AQS 设计妙处

优秀的项目总会有亮点可挖,AQS也是。小编在看了AQS的源码以后,结合其余做者相关博客,总结了如下两点感受很优秀的设计点,这是咱们应该学习的,前辈老是那么优秀。

一、自旋锁

当咱们执行一个有肯定结果的操做,同时又须要并发正确执行,一般能够采用自旋锁实现。在AQS中,自旋锁采用 死循环 + CAS 实现。针对AQS中的enq()进行讲解:

private Node enq(final Node node) {
    // 死循环 + CAS ,解决入队并发问题
    /**
     * 假设有三个线程同时都须要入队操做,那么使用死循环和CAS可保证并发安全,同一时间只有一个节点安全入队,入队失败的线程则循环重试
     * 
     * 一、若是不要死循环能够吗?只用CAS.
     *   不能够,由于若是其余线程修改了tail的值,致使1处代码返回false,那么方法enq方法将推出,致使该入队的节点却没能入队
     * 
     * 二、若是只用死循环,不须要CAS能够吗?
     *   不能够,首先不须要使用CAS,那就不必再使用死循环了,再者,若是不使用CAS,那么当执行1处代码时,将会改变队列的结构
     */
    for (;;) {
        // 获取尾部节点
        Node t = tail;
        // 若是尚未初始化,那么就初始化
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                // 刚开始确定是头指针和尾指针相等
                tail = head;
        } else {
            // 当前结点的前驱节点等于尾部节点
            node.prev = t;
            // 若是当前尾结点仍然是t,那么执行入队并返回true,不然返回false,而后重试
            if (compareAndSetTail(t, node)) {   // 1
                t.next = node;
                return t;
            }
        }
    }
}
复制代码

首先入队操做要求的最终结果必须是一个节点插入到队列中去,只能成功,不能失败!然而这个入队的操做是须要并发执行的,有可能同时有不少的线程须要执行入队操做,所以咱们须要采起相关的线程同步机制。自旋锁采起乐观策略,即便用了CAS中的compareAndSet()操做,若是某次执行返回fasle,那么当前操做必须重试,所以,采用for死循环直到成功为止,成功,则break跳出for循环或者直接return操做退出方法。

二、模板方法

在AQS中,模板方法设计模式体如今其acquire()、release()方法上,咱们先来看下源码:

public final void acquire(int arg) {
        // 首先尝试获取共享状态,若是获取成功,则tryAcquire()返回true
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

其中调用tryAcquire()方法的默认实现是抛出一个异常,也就是说tryAcquire()方法留给子类去实现,acquire()方法定义了一个模板,一套处理逻辑,相关具体执行方法留给子类去实现。

关于更多模板方法设计模式,能够查阅谈一谈我对‘模板方法’设计模式的理解(Template)

4、自定义本身的并发同步器

下边以JDK文档的一个实例进行介绍:

class Mutex implements Lock, java.io.Serializable {
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,当即返回。成功则返回true,不然false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,当即返回。成功则为true,不然false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)//既然来释放,那确定就是已占有状态了。只是为了保险,多层判断!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。二者语义同样:获取资源,即使等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。二者语义同样:尝试获取资源,要求当即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。二者语文同样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}
复制代码

实现本身的同步类通常都会自定义同步器(sync),而且将该类定义为内部类,供本身使用;而同步类本身(Mutex)则实现某个接口,对外服务。固然,接口的实现要直接依赖sync,它们在语义上也存在某种对应关系!!而sync只用实现资源state的获取-释放方式tryAcquire-tryRelelase,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,咱们不用关心。

除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差很少,不一样的地方就在获取-释放资源的方式tryAcquire-tryRelelase。掌握了这点,AQS的核心便被攻破了!

相关文章
相关标签/搜索