JAVA多线程之AQS的实现重入锁ReentrantLock

在JAVA体系当中锁是实现多线程中同步的一个重要机制, JAVA的锁有synchronized这个是JVM层面 实现的锁, JAVA实现的锁同步有: Lock、ReadWriteLock、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,包括上一篇中讲到的ThreadPoolExecutor中Worker执行用到锁 也是来自于本文所讲的AQS实现,因此要理解JAVA的锁,除了JVM里实现的外,只要搞懂AQS的就能够了, 话很少说,赶忙开始吧java

AQS的数据结构

image.png

AbstractQueuedSynchronizer的类以及重要成员变量

下面代码中能够看出AQS的类继承关系比较简单,就是继承AbstractOwnableSynchronizer类 并实现了Serializable接口,node

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
复制代码

AbstractOwnableSynchronizer类功能比较简单,主要是设置锁的排他线程,设置排它线程表示是排它锁,没有设置表示共享锁。markdown

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
复制代码

下面是AQS中比较重要的是三个成员变量, head是等待队列中头节点, tail是等待队列中尾结点,state是同步的状态值。数据结构

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;
复制代码

加下来看下NOde类的数据结构的定义:
等待队列中waitStatus(等待状态)的的枚举:
1.CANCELLED(1) 这个节点是因为超时或者中断。
2.SIGNAL(-1) 这个节点的后继是被阻塞的,因此当前Node取消或者释放锁时,须要unpark去唤醒后继节点。
3.CONDITION(-2) 这个节点是在条件等待队列。
4.PROPAGATE(-3) 就是对当前的节点操做传播到其余节点, 好比doReleaseShared共享模式的释放锁就是传播后续节点。 5.0 不是上面的任何一种状态多线程

static final class Node {

    /**标志它是共享模式 */
    static final Node SHARED = new Node();
    /** 标志他是排它模式 */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    ```
    volatile int waitStatus;
     /**
     * 链接当前节点的前一个Node,
     */
    volatile Node prev;
     /**
     * 链接当前节点的下一个一个Node,
     */
    volatile Node next;
    
    volatile Thread thread;

    /**
     * 指向条件等待队列的下一个Node或者是SHARED的Node,由于条件队列
     * 只有排它模式下才能访问, 这里只须要一个简单的单向链表去保存等待条件
     * 变量的node,由于条件变量只能是排它锁,因此用这个nextWaiter也能够
     * SHARED变量去表示当前是共享锁
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     * 判断是否共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**
    * 返回等待队列的前一个node
    */
   final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
复制代码

加锁流程

首先看下ReentrantLock的获取锁的过程,它的构造函数初始化一个NonfairSync,也就是说 这个重入锁是非公平锁,app

public ReentrantLock() {
    sync = new NonfairSync();
}
复制代码

从这里看到ReentrantLoc默认建立的非公平锁,调用lock会直接进行CAS尝试抢锁(这里state=0表明无所,1表明有一个线程已经抢到了锁),若是没有抢到锁,则正常调用AQS的acquire方法默认参数1去获取锁,函数

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            //设置排他线程为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
          // 抢锁
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
复制代码

那么接下来重点看下AQS的acquire方法获取锁的逻辑 1 tryAcquire尝试获取锁, 2. 若是获取失败,则加入一个排他的Node到等待队列ui

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
         //获取队列
        acquireQueued(
        //加入等待队列
        addWaiter(Node.EXCLUSIVE), arg))
        //中断当前线程
        selfInterrupt();
}
复制代码

非公平锁,一开始就会尝试抢锁,而无论以前等待队列中是否有等待的抢锁的线程,this

  1. 此时state值为0,则直接经过CAS将其值修改为acquires(ReentrentLock是1),并设置排他线程为它本身,这里说明ReentrantLock的加的锁是排它锁。
  2. 当排它线程就是当前线程,则表示当前线程已经获取锁,这个时候表明是偏向锁,增长acquires值,并赋值给state变量 ,表示同一线程同时同一把锁的次数,
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //直接尝试获取锁
        if (compareAndSetState(0, acquires)) {
           //设置排它锁为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //当前线程已经获取锁,偏向锁的处理
    else if (current == getExclusiveOwnerThread()) {
        // 以前的state值,加上申请的值
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
            //赋值当前state
            setState(nextc);
        return true;
    }
    return false;
}
复制代码

上面的tryAcquire这一步尝试抢锁失败后,而后执行addWaiter(Node.EXCLUSIVE),新建一个排它模式的Node并加入等待队列。atom

private Node addWaiter(Node mode) {
    //新建Node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //保存队列尾部节点
    Node pred = tail;
    //若是尾节点不为空
    if (pred != null) {
       //将新的节点prev指针指向以前的尾指针
        node.prev = pred;
       //CAS的原子操做将新建的Node加入队列尾部
        if (compareAndSetTail(pred, node)) {
            //将以前的尾部的next指针新建的node
            pred.next = node;
            return node;
        }
    }
    //若是上面的经过CAS失败了,则表示有其余线程已经加入了队列,则经过enq加入
    //新建的Node到等待队列
    enq(node);
    return node;
}
复制代码

下图是上面node加入到队列尾部的过程,这里注意双向队列首先是新增的node的prev指针指向原先的tail,至于为何在下面会有分析 image.png

下面enq就是经过for的无限循环中再以CAS方式加入了新建的Node,这样保证新建的Node必定加入到等待队列中。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //等待队列为空
        if (t == null) { // Must initialize
            //设置虚头节点,就是空的Node
            if (compareAndSetHead(new Node()))
                //将尾部节点指向都节点
                tail = head;
        } else {
           //首先让新建的node的前驱节点指向当前队列的尾部节点
            node.prev = t;
            //CAS设置新建的node为尾部接节点
            if (compareAndSetTail(t, node)) {、
               //将新建的Node设置为新的尾部节点
                t.next = node;
                return t;
            }
        }
    }
}
复制代码

将新建的尾部节点加入等待队列后,而后执行acquireQueued方法,参数是上面加入到等待队列中的Node以及args(tryAcquire的acquire变量),这个方法的逻辑是处理新的Node加入等待队列,而后尝试从队列中唤醒以前的等待队列中的线程去运行,

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
           //获取上一步加入队列尾部的节点的前驱节点
            final Node p = node.predecessor();
             //若是前驱是head,说明以前乜有的线程了,则再次调用tryAcquire获取锁
            if (p == head && tryAcquire(arg)) {
           //设置新加入的node为头节点,而且设置node的thread为空,status并无改变
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //若是node的前驱不是头节点,则调用判断前驱节点的waitStatus
            //是SIGNAL,则返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
               //能走到这一步,说明上一步中前驱节点的waitStatus是SIGNAL,则执行  // LockSupport的park让线程等待,直到有线程释放锁后执行LockSupport.unpark,而且检查线程是否中断
                parkAndCheckInterrupt())
                //设置中断标志为true
                interrupted = true;
        }
    } finally {
        //若是此时failed是true,表示获取锁失败,则执行cancelAcquire的,则取消获        // 取锁
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

shouldParkAfterFailedAcquire是获取锁后失败后,须要park阻塞等待锁,

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前驱节点的waitStatus
    int ws = pred.waitStatus;
    //若是是Node的等待状态是SIGNAL,则直接返回true,表示当释放锁的时候能够直接唤     //醒它,因此它是能够直接能够阻塞去等待锁。
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
       //若是等待状态大于0,表明是1(CANCELLED)取消状态,
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
           //若是pred是取消状态的Node,则删除pred节点
          node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //前驱节点的next节点指向新增的尾部node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         //若是等待状态是0或者PROPAGATE,表示须要唤醒,可是不须要park,
         //调用者须要重试去保证它在park以前不能获取锁
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
复制代码

若是获取锁失败,acquireQueued方法中failed为true时,调用cancelAcquire取消获取锁,

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    //移除取消状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 获取前驱的next节点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    //设置当前状态为取消状态
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //若是当前节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 前驱节点不是头节点
        if (pred != head &&
            //当前驱的waitStatus等于SIGNAL状态
            ((ws = pred.waitStatus) == Node.SIGNAL ||
              //或则状态小于0,表明不是取消状态, 那么设置状态为SIGNAL状态
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
             //前驱节点的thread不为空,表示不是虚头节点
            pred.thread != null) {
            //获取node的next节点
            Node next = node.next;
            //若是此时next不为空 而且waitStatus小于0,表示有后续节点而且后续节点不是取消状态
            if (next != null && next.waitStatus <= 0)
           //则将pred的next直接pred的下一个指针指向node的next, 至关于直接移除node节点
             compareAndSetNext(pred, predNext, next);
        } else {
          //1.前驱节点是头节点
          //2.前驱节点不是SIGNAL状态而且(waiteStatus是取消状态或者成功设置waitStatus为SIGNAL状态)
          // 3.前驱的thread为空,表明是前驱是虚头节点
          // 以上三种状况须要唤醒链表中从尾部开始遍历开始往前找到最前的第一个
          //waitStatus不是取消状态的节点
           unparkSuccessor(node);
        }
        //将node的next节点指向当前节点
        node.next = node; // help GC
    }
}
复制代码

唤醒前驱节点的逻辑是从tail节点从尾部往前,经过找到链表上一个waiteStatus小于0的节点(即不是取消状态的线程),而后执行LockSupport的unpark唤醒等待锁的线程, 这个就是上面提到双向链表是两个指针,分两步首先是加的prev指针,因此得从从后往前遍历就能保证遍历到所有节点

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    //表明此时节点须要唤醒
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码

释放锁

接来下咱们看下如何释放锁,他同样是来自于AQS父类的release方法,

public final boolean release(int arg) {
   //首先经过tryRelease方法抽象方法,由子类实现,实际由Sync类实现
    if (tryRelease(arg)) {
        // 获取虚头节点
        Node h = head;
        //若是头节点不为空,而且waitStatus不是为0,则说有须要等待锁的线程
        if (h != null && h.waitStatus != 0)
            //唤醒第一个等待锁的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

将state变量减去release后,若是c = 0,说明已经没有无锁了,那么设置排它线程为NULL, 并设置state变量为0

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
复制代码

总结 本文主要对于ReentrantLock的加排它锁和释放排它锁的过程作了一次详细的分析,主要是围绕state变量还有CLH的双端队列展开,以及线程阻塞(LockSupport.park)和唤醒(LockSupport.park),最后ConditionObject没有分析到,后面再写。

相关文章
相关标签/搜索