AbstractQueuedSynchronizer&CountDownLatch源码分析

前言

要看CountDownLatch源码,你会发现其中的核心是由一个继承了AbstractQueuedSynchronizer类的静态内部类Sync。 实际上ReentrantLock,Semaphore等线程控制类的内部都是基于AbstractQueuedSynchronizer实现的。 先来了解一下AbstractQueuedSynchronizer。node

AbstractQueuedSynchronizer(简称AQS)是什么

队列同步器:用来构建锁或者其它同步组件的基础框架。它内部使用一个int型成员变量state表示同步状态,经过一个内置的FIFO队列来完成资源获取线程的排队工做。

AQS就是提供了获取锁,FIFO排队,释放锁这些逻辑的基础框架。ReentrantLock,Semaphore,CountDownLatch等并发类都是基于AQS实现的。他们的内部都有一个静态内部类继承了AQS。

至关于就是说,ReentrantLock,Semaphore,CountDownLatch等并发类只是提供给使用者的门面,内部核心逻辑是由AQS实现的。

AQS源码分析

基本介绍

AQS中的一些核心方法:并发

# 修改线程同步状态的方法:
getState(): 获取当前同步状态
setState(int newState): 设置当前同步装
compareAndSetState(int expect,int update):使用CAS乐观锁设置当前同步状态,保证原子性

# 独占式/共享式获取/释放同步状态:
tryAcquire(int arg) 独占式获取同步状态
tryRelease(int arg) 独占式释放同步状态
tryAcquireShared(int arg) 共享式获取同步状态
tryReleaseShared(int arg) 共享式释放同步状态
isHeldExclusively() 当前同步器释放在独占模式下被线程占用。返回true表示被当前线程独占

# 获取同步队列的相关信息
getQueuedThreads() 获取等待在同步队列上的线程集合

同步器依赖于内部的FIFO队列来进行同步状态管理。当线程获取同步状态失败时,同步器会将当前线程构形成一个节点Node并加入同步队列,而且会阻塞当前线程。当同步状态释放时,会把队列中的第一个线程唤醒,使其获取同步状态。框架

同步队列

那么首先来看看这个同步队列是什么样子的吧。 查看Node节点类:源码分析

static final class Node {
		/**标记这个节点是处于共享模式*/
        static final Node SHARED = new Node();
		/**标记这个节点是处于独占模式*/
        static final Node EXCLUSIVE = null;
		
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
		 /**表示线程的等待状态。
		 1表示cancelled, 表示这个等待的线程等待超时或者被中断,须要从同步队列中取消等待; 
		 -1表示signal,后继节点处于等待状态,而当前节点若是释放了同步状态或者被取消,将会唤醒后继节点,使其继续运行
		 -2表示condition,当前线程等待在condition上,当其余线程对condition调用了signal方法后,改节点就会移出等待队列,去获取同步状态;
		 -3表示propagate,表示下一次共享式同步状态将会无条件传播下去;
		 0表示initial,初始状态*/
        volatile int waitStatus;

       /**前一个节点*/
        volatile Node prev;

       /**后一个节点*/
        volatile Node next;

		 /**也是用来表示下一个节点。若是当前节点是共享的,那么这个字段就是SHARED常量*/
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

		 /**返回上一个节点*/
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {
        }

        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

能够看出,Node节点中保存了线程引用,等待状态,是共享仍是独占,先后节点引用等信息。ui

再看AQS的三个属性:this

/**等待队列的头节点*/
    private transient volatile Node head;

	 /**等待队列的尾节点*/
    private transient volatile Node tail;

	 /**这就是表示同步状态的属性*/
    private volatile int state;

在AQS中维护了同步队列的头节点和尾节点,以及同步状态。线程

因此同步队列的结构就很清楚了: code

独占锁的获取和释放

那么线程是怎么被放入队列的呢?从acquire方法开始跟踪:blog

//这个方法就是尝试获取独占锁
public final void acquire(int arg) {
       //若是获取不到锁,就把当前线程加入到等待队列进行等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
			// 打断当前线程,其实就是本身打断本身
            selfInterrupt();
    }

先看一下addWaiter方法,这个方法就是给当前线程构造一个Node节点。继承

private Node addWaiter(Node mode) {
       //mode表示当前线程是独占式仍是共享式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
			//将当前节点加入到等待队列的尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
		//没有尾节点的话就进行初始化
        enq(node);
        return node;
    }

下面继续跟踪acquireQueued方法:

//这个方法就是在死循环等待锁
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
			    // 获取前一个节点
                final Node p = node.predecessor();
				//若是前一个节点是头节点,而且获取到了锁,那么当前节点这只为头节点,并返回。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

也就是说,等待队列的头节点实际上就是当前得到锁的节点,后面的节点死循环一直等待。 可是只有头节点的后一个节点才会不断尝试获取锁,由于要保证FIFO顺序。 新进来的线程是放在队列尾部进行死循环等待的。

那么实际上等待队列是这样的流程:

下面再跟踪释放锁的过程:

//释放独占锁
 public final boolean release(int arg) {
        //判断是否能够释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这里当waitStatus不为0的时候,才会调用unparkSuccessor方法去唤醒后面一个方法。

什么场景下head节点的waitStatus不为0呢??

 

进入unparkSuccessor方法:

共享锁的获取和释放

TODO 先略了,后面在补充。。。。。

CountDownLatch源码分析

了解的AQS的原理,那么看 CountDownLatch的源码就so easy了。由于CountDownLatch只是一个门面,核心逻辑就是AQS。

相关文章
相关标签/搜索