Java并发——AbstractQueuedSynchronizer(AQS)同步器

简介

在此以前介绍ReentrantLockReentrantReadWriteLock中都有sync属性,而sync正是继承了AQS(AbstractQueuedSynchronizer)同步器。AQS采用模板设计模式,调用其模板方法(独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程状况),重写指定方法,咱们自身就能利用AQS构造出自定义同步组件。java

AQS解析

重要属性

//等待队列的头节点
    private transient volatile Node head;
    //等待队列的尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
复制代码

AQS内部经过head、tail定义了一个FIFO队列,state表示同步状态(0表示未有线程获取同步状态或锁,大于0表示有线程占有),都经过volatile修饰,保证了内存可见性node

重要内部类

static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;
    /**由于在同步队列中等待的线程等待超时或者被中断,须要从同步队列中取消等待,节点进入该状态将不会变化 */
    static final int CANCELLED =  1;
    /** 
     * 后继节点的线程处于等待状态,而当前节点的线程若是释放了同步状态或者取消 
     * 将会通知后继节点,使后继节点的线程运行
     */ 
    static final int SIGNAL    = -1;
    /**
     * 节点在等待队列中,节点线程等待在Condtion上,当其余线程对Condtion调用了signal()方法后
     * 该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 
     */
    static final int CONDITION = -2;
    /**  
     * 表示下一次共享式同步状态获取将会无条件被传播下去
     */
    static final int PROPAGATE = -3;
    /** 当前节点等待状态 */
    volatile int waitStatus;
    /** 前驱节点 */
    volatile Node prev;
    /** 后继节点 */
    volatile Node next;
    /** 节点关联的线程 */
    volatile Thread thread;
    /** 
     * 等待队列中的后继节点,若是当前节点是共享的,那么这个字段是一个SHARED常量,即节点类型(独占和共享)
     * 和等待队列中的后继节点公用同一个字段
     */
    Node nextWaiter;
    }
复制代码

node节点是构成同步队列的基础,pre、next前驱后继维护了一个双向队列,同步队列结构如图:编程

当一个线程成功地获取了同步状态(或者锁),其余线程将没法获取到同步状态,转而被构形成为节点并加入到同步队列中

独占式获取与释放同步状态

  • 独占式获取同步状态
  • 独占锁的lock方法都会调用AQS所提供的模板方法acquire(), 当线程获取同步状态失败后进入同步队列中,后续对线程进行中断操做时,线程不会从同步队列中移出
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    复制代码
    模板主要思路:

    ①.调用tryAcquire()方法保证线程安全地获取同步状态(或者锁),此方法自定义同步器本身实现
    ②.若获取失败,则构造同步节点,调用addWaiter()将该节点加入同步队列尾部
    ③.最后调用acquireQueued()死循环获取同步状态
    ④.若是获取不到,调用shouldParkAfterFailedAcquire()方法判断是否须要阻塞,若返回true阻塞节点中的线程,能够依靠前驱节点的出队或阻塞线程被中断来唤醒阻塞线程

    设计模式

    tryAcquire

    tryAcquire()方法体内部只抛异常,若自定义同步器为独占式获取同步状态必须重写此方法安全

    protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    复制代码

    addWaiter

    将节点加入同步队列并发

    private Node addWaiter(Node mode) {
            //新建Node
            Node node = new Node(Thread.currentThread(), mode);
            // CAS快速尝试尾插节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //屡次尝试
            enq(node);
            return node;
        }
    复制代码

    若队列为空或者cas设置失败后,调用enq自旋再次设置工具

    private Node enq(final Node node) {
            // 死循环
            for (;;) {
                // 获取尾结点
                Node t = tail;
                // 若队列为空,初始化
                if (t == null) { // Must initialize
                    // cas设置头节点
                    if (compareAndSetHead(new Node()))
                        // 设置尾结点
                        tail = head;
                } else {
                    // CAS设置尾结点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    复制代码

    从源码中能够发现若同步队列添加节点失败后,会死循环一直尾插下去直至添加成功

    post

    acquireQueued

    节点进入同步队列以后,就进入了一个自旋的过程,当条件知足,获取到了同步状态,就能够从这个自旋过程当中退出,不然会一直执行下去ui

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 中断标记
                boolean interrupted = false;
                // 自旋死循环
                for (;;) {
                    // 获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    // 若前驱节点为头节点且获取同步状态成功
                    if (p == head && tryAcquire(arg)) {
                        // 设置头节点
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 判断线程是否须要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    复制代码

    从源码中能够发现只有当前节点的前驱节点是头节点才能尝试获取同步状态,其缘由在于:
    ①.头节点是成功获取到同步状态的节点,而头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后须要检查本身是否为头节点
    ②.保持FIFO同步队列原则
    this

    阻塞

    加入队列后,会自旋不断获取同步状态,可是自旋过程当中须要判断当前线程是否须要阻塞

    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
        
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前驱节点等待状态
            int ws = pred.waitStatus;
            // 若前驱节点状态为SIGNAL,代表当前节点处于等待状态,返回true
            if (ws == Node.SIGNAL)
                return true;
            // 若前驱节点状态>0即取消状态,代表前驱节点已经等待超时或者被中断了,须要从同步队列中取消
            if (ws > 0) {
                // 循环遍历,直至处于当前节点前面的节点不为取消状态为止
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            // 前驱节点状态为CONDITION,PROPAGATE
            } else {
                // CAS设置前驱节点状态为SINNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    复制代码

    若shouldParkAfterFailedAcquire返回true,会调用parkAndCheckInterrupt方法,其方法内部主要调用LockSupport工具类的park()方法阻塞线程

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            // 返回当前线程的中断状态
            return Thread.interrupted();
        }
    复制代码

    acquire()方法流程

  • 独占式释放同步状态
  • 当线程获取同步状态后,执行完相应逻辑后就须要释放同步状态,AQS提供了release()方法释放同步状态,方法在释放了同步状态以后,会唤醒其后继节点(进而使后继节点从新尝试获取同步状态),一样自定义同步器须要重写tryRelease()释放同步状态

    public final boolean release(int arg) { // 若释放同步状态成功 if (tryRelease(arg)) { // 获取同步队列头节点 Node h = head; if (h != null && h.waitStatus != 0) // 唤醒头节点的后继节点 unparkSuccessor(h); return true; } return false; }
    private void unparkSuccessor(Node node) {
        // 获取当前节点等待状态
        int ws = node.waitStatus;
        // 若状态为SIGNAL、CONDITION或PROPAGATE,CAS将其状态置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取后继节点
        Node s = node.next;
        // 若后继节点为null或其状态为CANCELLED(等待超市或者被中断)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾结点遍历
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒节点所关联的线程
            LockSupport.unpark(s.thread);
    }
    复制代码
    复制代码private void unparkSuccessor(Node node) { // 获取当前节点等待状态 int ws = node.waitStatus; // 若状态为SIGNAL、CONDITION或PROPAGATE,CAS将其状态置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取后继节点 Node s = node.next; // 若后继节点为null或其状态为CANCELLED(等待超市或者被中断) if (s == null || s.waitStatus > 0) { s = null; // 从尾结点遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒节点所关联的线程 LockSupport.unpark(s.thread); } 复制代码复制代码

    从源码中能够发现唤醒的节点从尾遍历而不是从头遍历,缘由是当前节点的后继可能为null、等待超时或被中断,因此从尾部向前进行遍

    共享式同步状态获取与释放

    共享式获取与独占式获取最主要的区别在于同一时刻可否有多个线程同时获取到同步状 态,例如ReentrantReadWriteLock中的读锁

  • 共享式获取同步状态
  • public final void acquireShared(int arg) {
            // 若获取失败自旋再次尝试
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    复制代码
    首先tryAcquireShared()尝试获取同步状态,若返回值大于等于0时代表获取成功,不然调用doAcquireShared()自旋获取同步状态
    private void doAcquireShared(int arg) {
            // 将共享节点加入同步队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 中断标记
                boolean interrupted = false;
                // 死循环
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    // 若前驱节点为头节点
                    if (p == head) {
                        // 获取同步
                        int r = tryAcquireShared(arg);
                        // 若获取成功
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // 判断线程是否须要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    复制代码

  • 共享式释放同步状态
  • public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    复制代码
    释放了同步状态以后,会唤醒后续处于等待状态的节点,一样自定义同步器须要重写tryRelease()释放同步状态。不过由于是共享,会存在多个线程同时释放同步状态,因此 采用CAS,当CAS操做失败自旋循重试

    超时获取同步状态

    使用内置锁synchronized同步,可能会形成死锁,而AQS提供了超时获取同步状态,即在指定时间段内获取同步状态

  • 独占式超时获取同步状态
  • 相对于上面介绍的acquire()方法(此方法没法响应中断),AQS为了响应中断额外提供了acquireInterruptibly()方法,若当前线程被中断会当即响应中断抛出异常
    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    复制代码
    该方法首先判断线程是否中断,如果抛出异常;不然执行tryAcquire()方法获取同步状态,获取成功直接结束不然执行doAcquireInterruptibly(),与acquireQueued()相似,最大区别在于其再也不使用interrupted标志,直接抛出InterruptedException异常
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    复制代码

    tryAcquireNanos()方法超时获取同步状态是响应中断获取同步状态的"加强版",增长了 超时控制

    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 超时时间    
        final long deadline = System.nanoTime() + nanosTimeout;
        // 将独占节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 死循环自旋
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 若前驱节点为头节点且获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 若获取失败,判断是否超时
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 判断线程是否中断    
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    复制代码
    复制代码private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 超时时间 final long deadline = System.nanoTime() + nanosTimeout; // 将独占节点 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 死循环自旋 for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 若前驱节点为头节点且获取同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 若获取失败,判断是否超时 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 判断线程是否中断 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码复制代码
    其思路:首先记录deadline超时时间获取同步状态,若获取失败判断是否超时,没有超时则计算剩余等待时间,若剩余时间小于等于0代表已经超时,若没有则判断是否大于spinForTimeoutThreshold(1000L),若是大于使用阻塞方式等待,不然仍然自旋等待,使用了LockSupport.parkNanos()方法来实现限时地等待,并支持中断

  • 共享式超时获取同步状态
  • 共享式获取响应中断doAcquireSharedInterruptibly()方法与共享式获取同步状态也相似,区别也是再也不使用interrupted标志,直接抛出InterruptedException异常。共享式超时获取大致思路也差很少,再也不多述。

    感谢

    《java并发编程的艺术》

    相关文章
    相关标签/搜索