AQS队列同步器

队列同步器(AbstractQueuedSynchronizer)是构建锁和其余同步组件的基础框架。看一下源码的介绍:java

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.node

它使用FIFO队列来完成资源获取线程的排队工做,使用一个int变量来表示同步状态。app

队列同步器的接口

状态的访问和修改

Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.框架

队列同步器(如下简称AQS)使用以下三个方法来访问和修改同步状态:ide

protected final int getState() {
        return state;
    }

 protected final void setState(int newState) {
        state = newState;
    }

 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
复制代码

AQS可重写的方法

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:ui

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

AQS可重写的5种方法默认都抛出UnsupportedOperationException,分别为独占式获取同步状态tryAcquire、独占式释放同步状态tryRelease、共享式获取同步状态tryAcquireShared、共享式释放同步状态tryReleaseShared、当前同步器是否在独占模式下被线程占用isHeldExclusively。this

模板方法

实现自定义同步组件时将会调用同步器提供的模板方法。模板方法可分为三类:独占式获取和释放同步准备状态、共享式获取和释放同步状态、查询同步队列中的等待线程状态。这些方法以下:atom

方法名称 描述
void acquire(int arg) 独占式获取同步状态,若是当前线程获取同步状态成功,则由该方法返回,不然将会进入同步队列等待。该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg)相同,但该方法响应中断。若是该线程被中断,则该方法会抛出中断异常并返回。
boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly(int arg)基础上增长了超时限制和返回值。
void acquireShared(int arg) 共享式获取同步状态,若是当前线程未获取到同步状态,将会进入同步队列等待。
void acquireSharedInterruptibly(int arg) 与acquireShared(int arg)相同,但该方法响应中断。
boolean tryAcquireSharedNanos(int arg,long nanos) 在acquireSharedInterruptibly(int arg)基础上增长了超时限制和返回值。
boolean release(int arg) 独占式的释放同步状态。
boolean releaseShared(int arg) 共享式的释放同步状态
Collection getQueuedThreads() 获取等待在同步队列上的线程集合

实现分析

同步队列

当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构形成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程。Node源码以下:spa

static final class Node {
        
        static final Node SHARED = new Node();
       
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        
        static final int SIGNAL    = -1;
        
        static final int CONDITION = -2;
       
        static final int PROPAGATE = -3;

        
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

复制代码

等待状态状态共有5种,分别是0、CANCELLED、SIGNAL、CONDITION、PROPAGATE。CANCELLED表示同步队列中等待的线程等待超时或者被中断,节点进入该状态将再也不变化。SIGNAL:后继节点的线程处于等待状态,而当前节点的线程若是释放了同步状态或者被取消将会通知后继节点。CONDITION:节点在等待队列中,节点线程等待在Condition上,当其余线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中。 回到上面的独占式状态获取方法acquire(int arg):线程

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工做。节点构造和加入队列源码以下:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
复制代码
private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else {
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }
复制代码

节点进入同步队列以后,就进入到一个自旋的过程。每一个节点都在自省地观察,当条件知足,获取到了同步状态,就能够从这个自旋过程当中退出。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

当前线程获取同步状态并执行了相应的逻辑以后,就须要释放同步状态,使得后续节点可以继续获取同步状态release(int arg)代码以下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码
private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码
相关文章
相关标签/搜索