Java并发AQS原理分析(一)

咱们说的AQS就是AbstractQueuedSynchronizer,他在java.util.concurrent.locks包下,这个类是Java并发的一个核心类。第一次知道有这个类是在看可重入锁ReentrantLock中,在ReentrantLock中有一个内部类Sync继承于AbstractQueuedSynchronizer,是ReentrantLock的核心实现。在并发包中的锁几乎都是基于AQS来构建的,可是在看源码的时候就会发现他们并无直接继承AbstractQueuedSynchronizer,而是经过内部类Sync实现。java

abstract static class Sync extends AbstractQueuedSynchronizer

这里注意的是AbstractQueuedSynchronizer是一个抽象类,定义了基本的框架。AQS核心是用一个变量state来表示状态.
AQS也就是AbstractQueuedSynchronizer这个类只是定义了一个队列管理线程,对于线程的状态是子类维护的,咱们能够理解为师一个同步队列,当有线程获取锁失败时(多线程争用资源被阻塞时会进入此队列),线程会被添加到队列的队尾node

总结:安全

  • AQS只是负责管理线程阻塞队列。
  • 线程的阻塞和唤醒

同步器是实现锁的关键(例如AQS队列同步器),利用同步器实现锁的定义。锁匙面向用户的,它定义了使用者和锁交互的接口,可是隐藏了实现的细节。同步器则是锁的实现,因此他是在锁的背后默默作着贡献,用户不能直接的接触到他,他简化了锁的实现方式,屏蔽了同步状态管理、线程之间的排队、等待、唤醒等操做。这样设计很好的隔离了使用者和实现者关注的领域。多线程

上面的表示了队列的形态,head表示队列的头节点,tail表示队列的尾节点。在源码中他们的定义使用volatile定义的。使用volatile关键字保证了变量在内存中的可见性,详见:volatile关键字解析。保证某个线程在出队入队时被其余线程看到。并发

private transient volatile Node head;//头节点
private transient volatile Node tail;//尾节点

AbstractQueuedSynchronizer这个类中还有一个内部类Node,用于构建队列元素的节点类。框架


在AQS中定义了两种资源共享方式:源码分析

  • Exclusive:独占式
  • Share:共享式ui

    当以独占模式获取时,尝试经过其余线程获取不能成功。 多线程获取的共享模式可能(但不须要)成功。 当共享模式获取成功时,下一个等待线程(若是存在)也必须肯定它是否也能够获取。 在不一样模式下等待的线程共享相同的FIFO队列。.net

在不一样的实现类中为了实现不一样的功能,会采用不一样的共享方式,例如可重入锁ReentrantLock采用的就是独占锁。
AQS的不一样实现类,不须要关注线程等待队列的维护和管理(线程阻塞入队、唤醒出队),在AQS中这些是已经定义好的,不一样的同步器只须要对如下方法进行实现便可:线程

//独占方式尝试获取资源
protected boolean tryAcquire(int arg)
//独占方式尝试释放资源
protected boolean tryRelease(int arg)
//共享方式尝试获取资源,返回值0表示成功可是没有剩余资源,负数表示失败,正数表示成功且有剩余资源
protected int tryAcquireShared(int arg)
//共享方式尝试释放资源
protected boolean tryReleaseShared(int arg)

全部自定义的同步器只须要肯定本身是那种资源贡献方式便可:共享式、独占式。也能够同时实现共享式和独占式ReentrantReadWriteLock读写锁,多个线程能够同时进行读操做,可是只能有一个线程进行写操做。


独占模式同步状态获取:

首先先从代码开始执行的地方看:

以独占模式获取资源,忽略中断。(若是获取到资源,直接返回结果,不然进入等待队列,等待再次获取资源。) 经过调用至少一次tryAcquire(int)实现,成功返回。 不然线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

方法执行的顺序:

  • 调用tryAcquire()方法尝试去获取资源,具体在子类中进行实现
  • 调用addWaiter()方法把当前线程标记为独占式,并加入到队列的尾部

这里须要讲一下addWaiter()方法中的第一个参数,线程等待队列中的元素都是利用Node这个内部类存储的,在Node中有两个成员变量分别声明了资源共享方式:

static final Node SHARED = new Node();//共享式
        static final Node EXCLUSIVE = null;//独占式
  • 调用acquireQueued()方法,让线程在队列中等待获取资源,获取资源后返回,若是在这个等待过程当中线程被中断过,返回true,不然返回false

在方法中首先调用tryAcquire(int)方法,该方法在AbstractQueuedSynchronizer并无实现,须要子类去实现:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

第二步调用addWaiter()方法:该方法是负责维护线程等待队列的方法,因此在AbstractQueuedSynchronizer中实现了该方法:具体是建立了一个节点类,把节点放在队尾,若是失败调用enq(node)方法(队尾节点为空)。

addWaiter()方法:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

上面的方法判断,若是添加到队尾失败
enq()方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //若是队列为空(队尾元素为空)建立节点添加进去
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    //把tail指向head
                    tail = head;
            } else {
                //正常添加到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面的代码中添加节点都用到了比较和交换(CAS,能够说是一种在并发环境下的解决方法),compareAndSetTail()方法可以确保节点能被安全的添加进队列中,在多线程环境下没法保证一个元素被正确的添加到队列的尾部。由于进入队列的元素都是放在队尾的,为了保证数据的正确性,因此在设置尾节点的时候使用CAS
第三步调用acquireQueued()方法,目的是为了在队列中等待被唤醒使用资源,由于以前的操做失败后,线程会被放入队尾,队列是先进先出的结构,因此在队尾的线程必须等待被唤醒。方法中主要有一个死循环,咱们称他叫自旋,只有当条件知足的时候,得到同步状态,退出自旋。
acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
        //设置成功标记
        boolean failed = true;
        try {
            //设置中断标记
            boolean interrupted = false;
            for (;;) {
                //得到node的前驱节点
                final Node p = node.predecessor();
                //判断前驱结点是不是头节点
                if (p == head && tryAcquire(arg)) {
                    //把node设置为头结点
                    setHead(node);
                    //把p节点的前驱设置为null,见下面的解释
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断是否继续等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

把p节点的前驱设置为null,也就是以前的head节点,在上面源码中后面的注释标记为help GC功能,解释一下:在调用上面的setHead()方法的时候,方法的内部已经将当前节点的前驱结点设置为null,在这里再次设置一遍,为了保证当前节点的前驱结点顺利被回收(当前节点设置为头节点,那么以前的头节点就要被释放,模拟一个正常的出队过程)。本身画图更好理解。

setHead()方法:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

这里分析上面调用的acquireQueued()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前驱节点的状态
        int ws = pred.waitStatus;
        //若是当前节点状态值为SIGNAL这个值,表明当前线程应该被挂起,等待被唤醒
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //若是大于0表明将当前节点的前驱节点移除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //小于0时把前驱结点状态值设置为SIGNAL,目的是为了前驱判断后将当前节点挂起(通知本身一下)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在这里咱们须要看一下Node这个类中定义的关于状态值的定义:

//表示线程已取消,做废状态
        static final int CANCELLED =  1;
        //表示后继节点应该等待当前节点释放资源后唤醒其后继节点
        static final int SIGNAL    = -1;
        //表示当前正处于等待状态
        static final int CONDITION = -2;
        //表示状态须要向后传播
        static final int PROPAGATE = -3;
  • CANCELLED 取消状态
  • SIGNAL 等待触发状态
  • CONDITION 等待条件状态
  • PROPAGATE 状态须要向后传播

等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。 因此在方法调用的时候把前驱结点设置为SIGNAL。
由于前一节点被置为SIGNAL说明后面有线程须要执行,可是还轮不到它后面的线程执行,后面线程必定要找一个前驱节点不为CANCEL的节点,而后把它设置为SIGNAL而后原地挂起,等待唤醒。 由于SIGNAL执行完了会唤醒紧接着的后面一个。


总结:
AQS中定义的acquire()模板方法,具体经过调用子类中的tryAcquire()方法尝试去获取资源,成功则返回,失败调用addWaiter()将当前线程添加到阻塞队列的队尾,同时标记为独占状态。acquireQueued()方法经过自旋获取同步状态(该方法使线程在等待队列中等待休息,当有机会时尝试获取资源),节点尝试获取资源的条件是当前节点的前驱节点是头节点,尝试获取到资源后才返回,在整个等待过程当中若是发生过中断,不作响应,在获取资源后调用selfInterrupt()方法设置中断。


独占模式下同步状态的释放:

上面根据源码分析了独占模式下得到锁的过程主要调用了模板方法acquire()方法向下分析,接着咱们分析它的相反的方法,独占模式下释放锁的过程,仍是一个模板方法release()

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //获得头节点
            Node h = head;
            //判断头节点不为空,状态值符合条件
            if (h != null && h.waitStatus != 0)
                //唤醒下一个等待线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease()方法依然须要子类去本身实现

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

unparkSuccessor()方法:

private void unparkSuccessor(Node node) {
        //得到当前线程的状态值
        int ws = node.waitStatus;
        if (ws < 0)
            //小于0时置零
            compareAndSetWaitStatus(node, ws, 0);
        //得到当前节点的后继节点
        Node s = node.next;
        //判断为空和状态值是否大于0
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从尾节点向前遍历,须要唤醒的线程一般是存储在下一个节点中的
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒线程
            LockSupport.unpark(s.thread);
    }

unpark()方法唤醒的是等待队列中最前面的线程,以后会再次执行上面的过程。

总结:在获取同步状时,在使用者的角度看在使用锁时,同步器会维护一个同步队列,获取状态失败的线程会被加入这个队列并进行自旋;当该节点的前驱节点是头节点的时候而且得到了同步状态时移出队列。在释放的时候,调用tryRelease()释放并唤醒后继节点。

相关文章
相关标签/搜索