简单解释一下J.U.C,是JDK中提供的并发工具包,
java.util.concurrent
。里面提供了不少并发编程中很经常使用的实用工具类,好比atomic原子操做、好比lock同步锁、fork/join等。
我想以lock做为切入点来说解AQS,毕竟同步锁是解决线程安全问题的通用手段,也是咱们工做中用得比较多的方式。java
Lock是一个接口,方法定义以下node
void lock() // 若是锁可用就得到锁,若是锁不可用就阻塞直到锁释放 void lockInterruptibly() // 和 lock()方法类似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常 boolean tryLock() // 非阻塞获取锁;尝试获取锁,若是成功返回true boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法 void unlock() // 释放锁
实现Lock接口的类有不少,如下为几个常见的锁实现编程
读和读不互斥、读和写互斥、写和写互斥
。也就是说涉及到影响数据变化的操做都会存在互斥。如何在实际应用中使用ReentrantLock呢?咱们经过一个简单的demo来演示一下api
public class Demo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); count++; } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } }
这段代码主要作一件事,就是经过一个静态的incr()
方法对共享变量count
作连续递增,在没有加同步锁的状况下多线程访问这个方法必定会存在线程安全问题。因此用到了ReentrantLock
来实现同步锁,而且在finally语句块中释放锁。
那么我来引出一个问题,你们思考一下安全
多个线程经过lock竞争锁时,当竞争失败的锁是如何实现等待以及被唤醒的呢?
aqs全称为AbstractQueuedSynchronizer,它提供了一个FIFO队列,能够当作是一个用来实现同步锁以及其余涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。
AQS是一个抽象类,主要是经过继承的方式来使用,它自己没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
能够这么说,只要搞懂了AQS,那么J.U.C中绝大部分的api都能轻松掌握。数据结构
从使用层面来讲,AQS的功能分为两种:独占和共享多线程
仍然以ReentrantLock为例,来分析AQS在重入锁中的使用。毕竟单纯分析AQS没有太多的含义。先理解这个类图,能够方便咱们理解AQS的原理架构
AQS的实现依赖内部的同步队列,也就是FIFO的双向队列,若是当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构形成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的节点(线程)。并发
AQS队列内部维护的是一个FIFO的双向链表,这种结构的特色是每一个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。因此双向链表能够从任意一个节点开始很方便的访问前驱和后继。每一个Node实际上是由线程封装,当线程争抢锁失败后会封装成Node加入到ASQ队列中去
Node类的组成以下函数
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; //前驱节点 volatile Node next; //后继节点 volatile Thread thread;//当前线程 Node nextWaiter; //存储在condition队列中的后继节点 //是否为共享锁 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //将线程构形成一个Node,添加到等待队列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //这个方法会在Condition队列使用,后续单独写一篇文章分析condition Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化,首先看一下添加节点的场景。
这里会涉及到两个变化
head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,若是后继节点得到锁成功,会把本身设置为头结点,节点的变化过程以下
这个过程也是涉及到两个变化
这里有一个小的变化,就是设置head节点不须要用CAS,缘由是设置head节点是由得到锁的线程来完成的,而同步锁只能由一个线程得到,因此不须要CAS保证,只须要把head节点设置为原首节点的后继节点,而且断开原head节点的next引用便可
清楚了AQS的基本架构之后,咱们来分析一下AQS的源码,仍然以ReentrantLock为模型。
调用ReentrantLock中的lock()方法,源码的调用过程我使用了时序图来展示
从图上能够看出来,当锁获取失败时,会调用addWaiter()方法将当前线程封装成Node节点加入到AQS队列,基于这个思路,咱们来分析AQS的源码实现
public void lock() { sync.lock(); }
这个是获取锁的入口,调用sync这个类里面的方法,sync是什么呢?
abstract static class Sync extends AbstractQueuedSynchronizer
sync是一个静态内部类,它继承了AQS这个抽象类,前面说过AQS是一个同步工具,主要用来实现同步控制。咱们在利用这个工具的时候,会继承它来实现同步控制功能。
经过进一步分析,发现Sync这个类有两个具体的实现,分别是NofairSync(非公平锁)
,FailSync(公平锁)
.
公平锁和非公平锁的实现上的差别,我会在文章后面作一个解释,接下来的分析仍然以非公平锁
做为主要分析逻辑。
final void lock() { if (compareAndSetState(0, 1)) //经过cas操做来修改state状态,表示争抢锁的操做 setExclusiveOwnerThread(Thread.currentThread());//设置当前得到锁状态的线程 else acquire(1); //尝试去获取锁 }
这段代码简单解释一下
compareAndSetState
compareAndSetState的代码实现逻辑以下
// See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这段代码其实逻辑很简单,就是经过cas乐观锁的方式来作比较并替换。上面这段代码的意思是,若是当前内存中的state的值和预期值expect相等,则替换为update。更新成功返回true,不然返回false. 这个操做是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操做,一级涉及到state这个属性的意义。 **state**
private volatile int state;须要注意的是:不一样的AQS实现,state所表达的含义是不同的。
Unsafe
Unsafe类是在sun.misc包下,不属于Java标准。可是不少Java的基础类库,包括一些被普遍使用的高性能开发库都是基于Unsafe类开发的,好比Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操做,如直接内存访问、线程调度等public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);这个是一个native方法, 第一个参数为须要改变的对象,第二个为偏移量(即以前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值
整个方法的做用是若是当前时刻的值等于预期值var4相等,则更新为新的指望值 var5,若是更新成功,则返回true,不然返回false;
acquire是AQS中的方法,若是CAS操做未能成功,说明state已经不为0,此时继续acquire(1)操做,这里你们思考一下,acquire方法中的1的参数是用来作什么呢?若是没猜中,往前面回顾一下state这个概念
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个方法的主要逻辑是
若是你们看过我写的 Synchronized源码分析的文章,就应该可以明白自旋存在的意义
这个方法的做用是尝试获取锁,若是成功返回true,不成功返回false
它是重写AQS类中的tryAcquire方法,而且你们仔细看一下AQS中tryAcquire方法的定义,并无实现,而是抛出异常。按照通常的思惟模式,既然是一个不实现的模版方法,那应该定义成abstract,让子类来实现呀?你们想一想为何
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
tryAcquire(1)在NonfairSync中的实现代码以下
ffinal boolean nonfairTryAcquire(int acquires) { //得到当前执行的线程 final Thread current = Thread.currentThread(); int c = getState(); //得到state的值 if (c == 0) { //state=0说明当前是无锁状态 //经过cas操做来替换state的值改成1,你们想一想为何要用cas呢? //理由是,在多线程环境中,直接修改state=1会存在线程安全问题,你猜到了吗? if (compareAndSetState(0, acquires)) { //保存当前得到锁的线程 setExclusiveOwnerThread(current); return true; } } //这段逻辑就很简单了。若是是同一个线程来得到锁,则直接增长重入次数 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //增长重入次数 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
当tryAcquire方法获取锁失败之后,则会先调用addWaiter将当前线程封装成Node,而后添加到AQS队列
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE //将当前线程封装成Node,而且mode为独占锁 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS的中表示同步队列队尾的属性,刚开始为null,因此进行enq(node)方法 Node pred = tail; if (pred != null) { //tail不为空的状况,说明队列中存在节点数据 node.prev = pred; //讲当前线程的Node的prev节点指向tail if (compareAndSetTail(pred, node)) {//经过cas讲node添加到AQS队列 pred.next = node;//cas成功,把旧的tail的next指针指向新的tail return node; } } enq(node); //tail=null,将node添加到同步队列中 return node; }
enq就是经过自旋操做把当前节点加入到队列中
private Node enq(final Node node) { //自旋,不作过多解释,不清楚的关注公众号[架构师修炼宝典] for (;;) { Node t = tail; //若是是第一次添加到队列,那么tail=null if (t == null) { // Must initialize //CAS的方式建立一个空的Node做为头结点 if (compareAndSetHead(new Node())) //此时队列中只一个头结点,因此tail也指向它 tail = head; } else { //进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,而后使用CAS将tail指向Node node.prev = t; if (compareAndSetTail(t, node)) { //t此时指向tail,因此能够CAS成功,将tail从新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点 t.next = node; return t; } } } }
假若有两个线程t1,t2同时进入enq方法,t==null表示队列是首次使用,须要先初始化
另一个线程cas失败,则进入下次循环,经过cas操做将node添加到队尾
到目前为止,经过addwaiter方法构造了一个AQS队列,而且将线程添加到了队列的节点中
将添加到队列中的Node做为参数传入acquireQueued方法,这里面会作抢占锁的操做
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException if (p == head && tryAcquire(arg)) {// 若是前驱为head才有资格进行锁的抢夺 setHead(node); // 获取锁成功后就不须要再进行同步操做了,获取锁成功的线程做为新的head节点 //凡是head节点,head.thread与head.prev永远为null, 可是head.next不为null p.next = null; // help GC failed = false; //获取锁成功 return interrupted; } //若是获取锁失败,则根据节点的waitStatus决定是否须要挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 interrupted = true; } } finally { if (failed) // 若是抛出异常则取消锁的获取,进行出队(sync queue)操做 cancelAcquire(node); } }
前面的逻辑都很好理解,主要看一下shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt的做用
从上面的分析能够看出,只有队列的第二个节点能够有机会争用锁,若是成功获取锁,则此节点晋升为头节点。对于第三个及之后的节点,if (p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操做
shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,若是是,是说明此节点已经将状态设置-若是锁释放,则应当通知它,因此它能够安全的阻塞了,返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前继节点的状态 if (ws == Node.SIGNAL)//若是是SIGNAL状态,意味着当前线程须要被unpark唤醒 return true; 若是前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操做实际是把队列中CANCELLED的节点剔除掉。 if (ws > 0) {// 若是前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若是前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是shouldParkAfterFailedAcquire返回了true,则会执行:parkAndCheckInterrupt()
方法,它是经过LockSupport.park(this)将当前线程挂起到WATING状态,它须要等待一个中断、unpark方法来唤醒它,经过这样一种FIFO的机制的等待,来实现了Lock的操做。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
LockSupport
LockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport其实是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time);unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,可是这个“许可”是不能叠加的,“许可”是一次性的。
permit至关于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 若是再调用一次park会阻塞,由于permit已是0了。直到permit变成1.这时调用unpark会把permit设置为1.每一个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积
加锁的过程分析完之后,再来分析一下释放锁的过程,调用release方法,这个方法里面作两件事,1,释放锁 ;2,唤醒park的线程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
这个动做能够认为就是一个设置锁状态的操做,并且是将状态减掉传入的参数值(参数是1),若是结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增长1(固然能够本身修改这个值),在解锁的时候减掉1,同一个锁,在能够重入后,可能会被叠加为二、三、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,并且也只有这种状况下才会返回true。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 这里是将锁的数量减1 if (Thread.currentThread() != getExclusiveOwnerThread())// 若是释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 因为重入的关系,不是每次释放锁c都等于0, // 直到最后一次释放锁时,才会把当前线程释放 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放以后,须要唤醒下一个节点的线程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是不是取消状态, s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //而后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为何从尾部开始向前遍历,由于在doAcquireInterruptibly.cancelAcquire方法的处理过程当中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,若是这时执行了unparkSuccessor方法,而且向后遍历的话,就成了死循环了,因此这时只有prev是稳定的 s = t; } //内部首先会发生的动做是获取head节点的next节点,若是获取到的节点不为空,则直接经过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁 if (s != null) LockSupport.unpark(s.thread); //释放许可 }
经过这篇文章基本将AQS队列的实现过程作了比较清晰的分析,主要是基于非公平锁的独占锁实现。在得到同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或中止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,而后唤醒头节点的后继节点。