领统Java并发半壁江山的AQS你真的懂了吗?

领统Java并发半壁江山的AQS你真的懂了吗?

1、JUC的由来

synchronized 关键字是JDK官方人员用C++代码写的,在JDK6之前是重量级锁。Java大牛 Doug Lea对 synchronized 在并发编程条件下的性能表现不满意就本身写了个JUC,以此来提高并发性能,本文要讲的就是JUC并发包下的AbstractQueuedSynchronizernode

在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底层用的都是AQS,AQS几乎占据了JUC并发包里的半壁江山,若是想要获取锁能够被中断、超时获取锁、尝试获取锁那就用AQS吧。编程

**2、AQS前置知识点

2.一、模板方法设计模式

AbstractQueuedSynchronizer是个抽象类,全部用到方法的类都要继承此类的若干方法,对应的设计模式就是模版模式安全

模版模式定义:一个抽象类公开定义了执行它的方法的方式/模板。它的子类能够按须要重写方法实现,但调用将以抽象类中定义的方式进行。这种类型的设计模式属于行为型模式。多线程

抽象类:并发

public abstract class SendCustom {
 public abstract void to();
 public abstract void from();
 public void date() {
  System.out.println(new Date());
 }
 public abstract void send();
 // 注意此处 框架方法-模板方法
 public void sendMessage() {
  to();
  from();
  date();
  send();
 }
}

模板方法派生类:框架

public class SendSms extends SendCustom {

 @Override
 public void to() {
  System.out.println("sowhat");
 }

 @Override
 public void from() {
  System.out.println("xiaomai");
 }

 @Override
 public void send() {
  System.out.println("Send message");
 }

 public static void main(String[] args) {
  SendCustom sendC = new SendSms();
  sendC.sendMessage();
 }
}

2.二、LookSupportide

LockSupport 是一个线程阻塞工具类,全部的方法都是静态方法,可让线程在任意位置阻塞,固然阻塞以后确定得有唤醒的方法。经常使用方法以下:函数

public static void park(Object blocker); // 暂停当前线程
public static void parkNanos(Object blocker, long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(Object blocker, long deadline); // 暂停当前线程,直到某个时间
public static void park(); // 无期限暂停当前线程
public static void parkNanos(long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(long deadline); // 暂停当前线程,直到某个时间
public static void unpark(Thread thread); // 恢复当前线程
public static Object getBlocker(Thread t);

park是由于park英文意思为停车。咱们若是把Thread当作一辆车的话,park就是让车停下,unpark就是让车启动而后跑起来。高并发

与Object类的wait/notify机制相比,park/unpark有两个优势:

1.以thread为操做对象更符合阻塞线程的直观定义
2.操做更精准,能够准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll 唤醒全部等待的线程),增长了灵活性。

park/unpark调用的是 Unsafe(提供CAS操做) 中的 native代码。

park/unpark 功能在Linux系统下是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。mutexcondition保护了一个 _counter 的变量,当 park 时,这个变量被设置为0。当unpark时,这个变量被设置为1。

2.三、CAS

CAS 是 CPU指令级别实现了原子性的比较和交换(Conmpare And Swap)操做,注意CAS不是锁只是CPU提供的一个原子性操做指令。

领统Java并发半壁江山的AQS你真的懂了吗?

CAS在语言层面不进行任何处理,直接将原则操做实如今硬件级别实现,之因此能够实现硬件级别的操做核心是由于CAS操做类中有个核心类UnSafe类。

关于CAS引起的ABA问题、性能开销问题、只能保证一个共享变量之间的原则性操做问题,之前 CAS 中写过,在此再也不重复讲解。

注意:并非说 CAS 必定比SYN好,若是高并发执行时间久 ,用SYN好, 由于SYN底层用了wait() 阻塞后是不消耗CPU资源的。若是锁竞争不激烈说明自旋不严重,此时用CAS。

3、AQS重要方法

模版方法分为独占式共享式,子类根据须要不一样调用不一样的模版方法(讲解有点多,想看底层可直接下滑到第四章节)。

3.1 模板方法

3.1.1 独占式获取

3.1.1.1 accquire

不可中断获取锁accquire是获取独占锁方法,acquire尝试获取资源,成功则直接返回,不成功则进入等待队列,这个过程不会被线程中断,被外部中断也不响应,获取资源后才再进行自我中断selfInterrupt()

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

1.acquire(arg) tryAcquire(arg) 顾名思义,它就是尝试获取锁,须要咱们本身实现具体细节,通常要求是:

若是该锁没有被另外一个线程保持,则获取该锁并当即返回,将锁的保持计数设置为 1。

若是当前线程已经保持该锁,则将保持计数加 1,而且该方法当即返回。

若是该锁被另外一个线程保持,则出于线程调度的目的,禁用当前线程,而且在得到锁以前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。

2.addWaiter(Node.EXCLUSIVE)

主要功能是 一旦尝试获取锁未成功,就要使用该方法将其加入同步队列尾部,因为可能有多个线程并发加入队尾产生竞争,所以采用compareAndSetTail锁方法来保证同步

3.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

一旦加入同步队列,就须要使用该方法,自旋阻塞 唤醒来不断的尝试获取锁,直到被中断或获取到锁。

3.1.1.2 acquireInterruptibly

可中断获取锁acquireInterruptibly相比于acquire支持响应中断。

一、若是当前线程未被中断,则尝试获取锁。

二、若是锁空闲则获锁并当即返回,state = 1。

三、若是当前线程已持此锁,state + 1,而且该方法当即返回。

四、若是锁被另外一个线程保持,出于线程调度目的,禁用当前线程,线程休眠ing,除非锁由当前线程得到或者当前线程被中断了,中断后会抛出InterruptedException,而且清除当前线程的已中断状态。

五、此方法是一个显式中断点,因此要优先考虑响应中断。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
     throw new InterruptedException(); // acquireInterruptibly 选择
      interrupted = true; // acquire 的选择

3.1.1.3 tryAcquireNanos

该方法能够被中断,增长了超时则失败的功能。能够说该方法的实现与上述两方法没有任何区别。时间功能上就是用的标准超时功能,若是剩余时间小于0那么acquire失败,若是该时间大于一次自旋锁时间(spinForTimeoutThreshold = 1000),而且能够被阻塞,那么调用LockSupport.parkNanos方法阻塞线程。

doAcquireNanos内部:

nanosTimeout = deadline - System.nanoTime();
  if (nanosTimeout <= 0L)
      return false;
  if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
      LockSupport.parkNanos(this, nanosTimeout);
  if (Thread.interrupted())
      throw new InterruptedException();

该方法通常会有如下几种状况产生:

1.在指定时间内,线程获取到锁,返回true。
2.当前线程在超时时间内被中断,抛中断异常后,线程退出。
3.到截止时间后线程仍未获取到锁,此时线程得到锁失败,再也不等待直接返回false。

3.1.2 共享式获取

3.1.2.1 acquireShared

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

该模版方法的工做:

1.调用tryAcquireShared(arg) 尝试得到资源,返回值表明以下含义:

负数表示失败。

0 表示成功,但没有剩余可用资源。

正数表示成功,且有剩余资源。

doAcquireShared做用:

建立节点而后加入到队列中去,这一块和独占模式下的 addWaiter 代码差很少,不一样的是结点的模式是SHARED,在独占模式 EXCLUSIVE。

3.1.2.2 acquireSharedInterruptibly

无非就是可中断性的共享方法

public final void acquireSharedInterruptibly(long arg)  throws InterruptedException {
    if (Thread.interrupted()) // 若是线程被中断,则抛出异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)  
        // 若是tryAcquireShared()方法获取失败,则调用以下的方法
        doAcquireSharedInterruptibly(arg);
}

3.1.2.3. tryAcquireSharedNanos

尝试以共享模式获取,若是被中断则停止,若是超过给定超时期则失败。实现此方法首先要检查中断状态,而后至少调用一次 tryacquireshared(long),并在成功时返回。不然,在成功、线程中断或超过超时期以前,线程将加入队列,可能反复处于阻塞或未阻塞状态,并一直调用 tryacquireshared(long)

public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

3.1.3 独占式释放

独占锁的释放调用unlock方法,而该方法实际调用了AQS的release方法,这段代码逻辑比较简单,若是同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,而且该节点的状态值不为0的话才会执行unparkSuccessor()方法。

public final boolean release(long arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3.1.4 共享式释放

releaseShared首先去尝试释放资源tryReleaseShared(arg),若是释放成功了,就表明有资源空闲出来,那么就用doReleaseShared()去唤醒后续结点。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

好比CountDownLatch的countDown()具体实现:

public void countDown() {
        sync.releaseShared(1);
    }

3.2 子类需实现方法

子类要实现父类方法也分为独占式共享式

3.2.1 独占式获取

tryAcquire 顾名思义,就是尝试获取锁,AQS在这里没有对其进行功能的实现,只有一个抛出异常的语句,咱们须要本身对其进行实现,能够对其重写实现公平锁、不公平锁、可重入锁、不可重入锁

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

3.2.2 独占式释放

tryRelease 尝试释放 独占锁,须要子类实现。

protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }

3.2.3 共享式获取

tryAcquireShared 尝试进行共享锁的得到,须要子类实现。

protected long tryAcquireShared(long arg) {
        throw new UnsupportedOperationException();
    }

3.2.4 共享式释放

tryReleaseShared尝试进行共享锁的释放,须要子类实现。

protected boolean tryReleaseShared(long arg) {
        throw new UnsupportedOperationException();
    }

3.3  状态标志位

state由于用 volatile 修饰 保证了咱们操做的可见性,因此任何线程经过getState()得到状态都是能够获得最新值,可是setState()没法保证原子性,所以AQS给咱们提供了compareAndSetState方法利用底层UnSafe的CAS功能来实现原子性。

private volatile long state;

    protected final long getState() {
        return state;
    }

    protected final void setState(long newState) {
        state = newState;
    }

   protected final boolean compareAndSetState(long expect, long update) {
        return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
    }

3.4 查询是否独占模式

isHeldExclusively 该函数的功能是查询当前的工做模式是不是独占模式。须要子类实现。

protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

3.5 自定义实现锁

这里须要重点说明一点,JUC中通常是用一个子类继承自Lock,而后在子类中定义一个内部类来实现AQS的继承跟使用

public class SowhatLock implements Lock
{
 private Sync sync = new Sync();

 @Override
 public void lock()
 {
  sync.acquire(1);
 }

 @Override
 public boolean tryLock()
 {
  return false;
 }

 @Override
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
 {
  return sync.tryAcquireNanos(1,unit.toNanos(time));
 }

 @Override
 public void unlock()
 {
  sync.release(1);
 }

 @Override
 public Condition newCondition()
 {
  return sync.newCondition();
 }

 @Override
 public void lockInterruptibly() throws InterruptedException
 {
 }

 private class Sync extends AbstractQueuedSynchronizer
 {
  @Override
  protected boolean tryAcquire(int arg)
  {
   assert arg == 1;
   if (compareAndSetState(0, 1))
   {
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
   }
   return false;
  }

  @Override
  protected boolean tryRelease(int arg)
  {
   assert arg == 1;
   if (!isHeldExclusively())
   {
    throw new IllegalMonitorStateException();
   }
   setExclusiveOwnerThread(null);
   setState(0);
   return true;
  }

  @Override
  protected boolean isHeldExclusively()
  {
   return getExclusiveOwnerThread() == Thread.currentThread();
  }

  Condition newCondition() {
   return new ConditionObject();
  }
 }
}

自定义实现类:

public class SoWhatTest
{
 public static int m = 0;
 public  static CountDownLatch latch  = new CountDownLatch(50);
 public static Lock lock = new SowhatLock();

 public static void main(String[] args) throws  Exception
 {
  Thread[] threads = new Thread[50];
  for (int i = 0; i < threads.length ; i++)
  {
   threads[i] = new Thread(()->{
    try{
     lock.lock();
     for (int j = 0; j <100 ; j++)
     {
      m++;
     }
    }finally
    {
     lock.unlock();
    }
    latch.countDown();
  });
  }
  for(Thread t : threads) t.start();
  latch.await();
  System.out.println(m);
 }
}

4、AQS底层

4.1 CLH

CLH(Craig、 Landin、 Hagersten locks三我的名字综合而命名):

1.是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
2.CLH 锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,若是发现前驱释放了锁就结束自旋。

4.2 Node

CLH 队列由Node对象组成,其中Node是AQS中的内部类。

static final class Node {
 // 标识共享锁
 static final Node SHARED = new Node();
 // 标识独占锁
 static final Node EXCLUSIVE = null;
 // 前驱节点
 volatile Node prev;
 // 后继节点
 volatile Node next;
 // 获取锁失败的线程保存在Node节点中。
 volatile Thread thread;
 // 当咱们调用了Condition后他也有一个等待队列
 Node nextWaiter;
 //在Node节点中通常经过waitStatus得到下面节点不一样的状态,状态对应下方。
 volatile int waitStatus;
 static final int CANCELLED =  1;
 static final int SIGNAL    = -1;
 static final int CONDITION = -2;
 static final int PROPAGATE = -3;

waitStatus 有以下5中状态:

1.CANCELLED = 1

表示当前结点已取消调度。当超时或被中断(响应中断的状况下),会触发变动为此状态,进入该状态后的结点将不会再变化。

2.SIGNAL = -1

表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。

3.CONDITION = -2

表示结点等待在 Condition 上,当其余线程调用了 Condition 的 signal() 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

4.PROPAGATE = -3

共享模式下,前继结点不只会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

5.INITIAL = 0

新结点入队时的默认状态。

4.3 AQS实现

4.3.1 公平锁和非公平锁

银行售票窗口营业中:

公平排队:每一个客户来了自动在最后面排队,轮到本身办理业务的时候拿出身份证等证件取票。

非公平排队:有个旅客火车立刻开车了,他拿着本身的各类证件着急这想跟窗口工做人员说是否能够加急办理下,能够的话则直接办理,不能够的话则去队尾排队去。

在JUC中一样存在公平锁非公平锁通常非公平锁效率好一些。由于非公平锁状态下打算抢锁的线程不用排队挂起了

4.3.2 AQS细节

AQS内部维护着一个FIFO的队列,即CLH队列,提供先来先服务的公平性。AQS的同步机制就是依靠CLH队列实现的。CLH队列是FIFO的双端双向链表队列(方便尾部节点插入)。线程经过AQS获取锁失败,就会将线程封装成一个Node节点,经过CAS原子操做插入队列尾。当有线程释放锁时,会尝试让队头的next节点占用锁,我的理解AQS具备以下几个特色:

1.在AQS 同步队列中 -1 表示线程在睡眠状态
2.当前Node节点线程会把前一个Node.ws = -1。当前节点把前面节点ws设置为-1,你能够理解为:你本身能知道本身睡着了吗?只能是别人看到了发现你睡眠了
3.持有锁的线程永远不在队列中
4.在AQS队列中第二个才是最早排队的线程
5.若是是交替型任务或者单线程任务,即便用了Lock也不会涉及到AQS 队列
6.不到万不得已不要轻易park线程,很耗时的!因此排队的头线程会自旋的尝试几个获取锁

4.4 加锁跟解锁流程图

以最经典的 ReentrantLock 为例逐步分析下 lock 跟 unlock 底层流程图(要原图的话公众号回复:lock)。

private Lock lock = new ReentrantLock();
public void test(){
    lock.lock();
    try{
        doSomeThing();
    }catch (Exception e){
      ...
    }finally {
        lock.unlock();
    }
}

领统Java并发半壁江山的AQS你真的懂了吗?

4.4.1 独占式加入同步队列

同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),若是加入的节点是OK的则会直接运行该节点,当若干个线程抢锁失败了那么就会抢着加入到同步队列的尾部,由于是抢着加入这个时候用CAS来设置尾部节点。入口代码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1.tryAcquire

该方法是须要自我实现的,在上面的demo中可见一斑,就是返回是否得到了锁。

protected final boolean tryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0) {
         //  是否须要加入队列,不须要的话则尝试CAS得到锁,得到成功后 设置当前锁的拥有者
         if (!hasQueuedPredecessors() &&
             compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     else if (current == getExclusiveOwnerThread()) {
         // 这就是可重入锁的实现  
         int nextc = c + acquires;
         if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

2.addWaiter(Node.EXCLUSIVE,arg)

/**
 * 若是尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),经过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。
 */
 private Node addWaiter(Node mode) {
     // 用当前线程构造一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的仍是共享的
     Node node = new Node(Thread.currentThread(), mode);
     // 将目前队列中尾部节点给pred
     Node pred = tail;
     // 队列不为空的时候
     if (pred != null) {
         node.prev = pred;
         // 先尝试经过AQS方式修改尾节点为最新的节点,若是修改失败,意味着有并发,
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     //第一次尝试添加尾部失败说明有并发,此时进入自旋
     enq(node);
     return node;
 }

3.自旋enq 方法将并发添加节点的请求经过CAS跟自旋将尾节点的添加变得串行化起来。说白了就是让节点放到正确的队尾位置。

/**
* 这里进行了循环,若是此时存在了tail就执行同上一步骤的添加队尾操做,若是依然不存在,
* 就把当前线程做为head结点。插入节点后,调用acquireQueued()进行阻塞
*/
private Node enq(final Node node) {
   for (;;) {
       Node t = tail;
       if (t == null) { // Must initialize
           if (compareAndSetHead(new Node()))
               tail = head;
       } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
               t.next = node;
               return t;
           }
       }
   }
}

4.acquireQueued 是当前Node节点线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取锁,缘由是:

1.头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查本身的前驱节点是否为头结点。
2.维护同步队列的FIFO原则,节点进入同步队列以后,会尝试自旋几回。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋检查当前节点的前驱节点是否为头结点,才能获取锁
        for (;;) {
            // 获取节点的前驱节点
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
            // 节点中的线程循环的检查,本身的前驱节点是否为头节点
            // 只有当前节点 前驱节点是头节点才会 再次调用咱们实现的方法tryAcquire
            // 接下来无非就是将当前节点设置为头结点,移除以前的头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 不然检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
           //若是须要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒
                parkAndCheckInterrupt())
                interrupted = true; // 两个判断都是true说明 则置true
        }
    } finally {
        //若是等待过程当中没有成功获取资源(如timeout,或者可中断的状况下被中断了),那么取消结点在队列中的等待。
        if (failed)
           //取消请求,将当前节点从队列中移除
            cancelAcquire(node);
    }
}

若是成功就返回,不然就执行shouldParkAfterFailedAcquireparkAndCheckInterrupt来达到阻塞效果。

5.shouldParkAfterFailedAcquire 第二步的addWaiter()构造的新节点,waitStatus的默认值是0。此时,会进入最后一个if判断,CAS设置pred.waitStatus SIGNAL,最后返回false。因为返回false,第四步的acquireQueued会继续进行循环。假设node的前继节点pred仍然不是头结点或锁获取失败,则会再次进入shouldParkAfterFailedAcquire()。上一轮循环中已经将pred.waitStatu = -1了,则此次会进入第一个判断条件,直接返回true,表示应该阻塞调用parkAndCheckInterrupt

那么何时会遇到ws &gt; 0呢?当pred所维护的获取请求被取消时(也就是node.waitStatus = CANCELLED,这时就会循环移除全部被取消的前继节点pred,直到找到未被取消的pred。移除全部被取消的前继节点后,直接返回false。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       int ws = pred.waitStatus; // 得到前驱节点的状态
       if (ws == Node.SIGNAL) //此处是第二次设置
           return true;
       if (ws > 0) {
          do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0);
           pred.next = node;
       } else {
          //  此处是第一次设置 unsafe级别调用设置
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
   }

6.parkAndCheckInterrupt 主要任务是暂停当前线程而后查看是否已经暂停了。

private final boolean parkAndCheckInterrupt() {
    // 调用park()使线程进入挂起状态,何时调用了unpark再继续执行下面
    LockSupport.park(this); 
    // 若是被唤醒,查看本身是否是已经被中断了。
    return Thread.interrupted();
}

7.cancelAcquireacquireQueued方法的finally会判断 failed值,正常运行时候自旋出来的时候会是false,若是中断或者timeout了 则会是true,执行cancelAcquire,其中核心代码是node.waitStatus = Node.CANCELLED

8.selfInterrupt

static void selfInterrupt() {
      Thread.currentThread().interrupt();
  }

4.4.2 独占式释放队列头节点

release()会调用tryRelease方法尝试释放当前线程持有的锁,成功的话唤醒后继线程,并返回true,不然直接返回false。

public final boolean release(long arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

1.tryRelease 这个是子类须要自我实现的,没啥说的根据业务须要实现。
2.unparkSuccessor 唤醒头结点的后继节点。

private void unparkSuccessor(Node node) {
   int ws = node.waitStatus; // 得到头节点状态
    if (ws < 0) //若是头节点装小于0 则将其置为0
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next; //这个是新的头节点
    if (s == null || s.waitStatus > 0) { 
    // 若是新头节点不知足要求
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
        //从队列尾部开始往前去找最前面的一个waitStatus小于0的节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//唤醒后继节点对应的线程
        LockSupport.unpark(s.thread);
}

4.4.3 AQS 中增长跟删除形象图

领统Java并发半壁江山的AQS你真的懂了吗?

5、CountDownLatch底层

5.1 共享锁 CountDownLatch底层

CountDownLatch 虽然相对简单,但也实现了共享锁模型。可是如何正确的吹逼 CountDownLatch  呢?若是在理解了上述流程的基础上,从CountDownLatch入手来看AQS 中关于共享锁的代码还比较好看懂,在看的时候能够 以看懂大体内容为主,学习其设计的思路,不要陷入全部条件处理细节中,多线程环境中,对与错有时候不是那么容易看出来的。我的追源码绘制了以下图:

领统Java并发半壁江山的AQS你真的懂了吗?

5.2 计数信号量Semaphore

Semaphore 这就是共享锁的一个实现类,在初始化的时候就规定了共享锁池的大小N,有一个线程得到了锁,可用数就减小1个。有一个线程释放锁可用数就增长1个。若是有 >=2 的线程同时释放锁,则此时有多个锁可用。这个时候就能够 同时唤醒 两个锁setHeadAndPropagate (流程图懒的绘制了)。

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
 }
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //找先驱结点
            final Node p = node.predecessor();
            if (p == head) {
                 // 尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置当前结点为头结点,而后去唤醒后续结点。注意传播性 唤醒!
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC  释放头结点,等待GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;//获取到资源
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)//若是最后没有获取到资源,则cancel
            cancelAcquire(node);
    }
}

5.3 ReentrantReadWriteLock

领统Java并发半壁江山的AQS你真的懂了吗?

在 ReentrantReadWriteLock 类中也是只有一个32位的int state来表示读锁跟写锁,如何实现的?

1.后16位用来保存独享的写锁个数,第一次得到就是01,第二次重入就是10了,这样的方式来保存。
2.可是多个线程均可以得到读锁,而且每一个线程可能读屡次,如何保存?咱们用前16位来保存有多少个线程得到了读锁
3.每一个读锁线程得到的重入读锁个数 由内部类HoldCounter与读锁配套使用。

6、Condition

synchronized 可用 wait() 和 notify()/notifyAll() 方法相结合能够实现等待/通知模式。Lock 也提供了 Condition 来提供相似的功能。

Condition是JDK5后引入的Interface,它用来替代传统的Object的wait()/notify()实现线程间的协做,相比使用Object的wait()/notify(),使用Conditionawait()/signal()这种方式 实现线程间协做更加安全和高效。简单说,他的做用是使得某些线程一块儿等待某个条件(Condition),只有当该条件具有(signal 或者 signalAll方法被调用)时,这些等待线程才会被唤醒,从而从新争夺锁。wait()/notify()这些都更倾向于底层的实现开发,而Condition接口更倾向于代码实现的等待通知效果。二者之间的区别与共通点以下:

领统Java并发半壁江山的AQS你真的懂了吗?

6.1 条件等待队列

条件等待队列,指的是 Condition 内部本身维护的一个队列,不一样于 AQS 的 同步等待队列。它具备如下特色:

要加入条件等待队列的节点,不能在 同步等待队列。

从 条件等待队列 移除的节点,会进入同步等待队列。

一个锁对象只能有一个同步等待队列,但能够有多个条件等待队列。

这里以 AbstractQueuedSynchronizer 的内部类 ConditionObject 为例(Condition 的实现类)来分析下它的具体实现过程。首先来看该类内部定义的几个成员变量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

它采用了 AQS 的 Node 节点构造(前面说过Node类有nextWaiter属性),并定义了两个成员变量:firstWaiterlastWaiter 。说明在 ConditionObject 内部也维护着一个本身的单向等待队列。目前可知它的结构以下:

领统Java并发半壁江山的AQS你真的懂了吗?

6.2 await、signal

好比有线程 一、2竞争锁,下面来讲下具体过程 线程1:

一、线程1 调用 reentrantLock.lock时,持有锁。

二、线程1 调用 await 方法,进入条件等待队列 ,同时释放锁。

三、线程1 获取到线程2 signal 信号,从条件等待队列进入同步等待队列。

线程2:

一、线程2 调用 reentrantLock.lock时,因为锁被线程1 持有,进入同步等待队列 。

二、因为线程1 释放锁,线程2 从同步等待队列 移除,获取到锁。

三、线程2 调用 signal 方法,致使线程 1 被唤醒。线程2 调用unlock ,线程1 获取锁后继续下走。

6.2.1 await

当咱们看await、signal 的源码时候不要认为等待队列跟同步队列是彻底分开的,其实我的感受底层源码是有点 HashMap 中的红黑树跟双向链表的意思。

当调用await方法时候,说明当前任务队列的头节点拿着锁呢,此时要把该Thread从任务队列挪到等待队列再唤醒任务队列最前面排队的运行任务,如图:

领统Java并发半壁江山的AQS你真的懂了吗?

  1. thread 表示节点存放的线程。
  2. waitStatus 表示节点等待状态。条件等待队列中的节点等待状态都是 CONDITION,不然会被清除。
  3. nextWaiter 表示后指针。

6.2.2 signal

当咱们调用signal方法的时候,咱们要将等待队列中的头节点移出来,让其去抢锁,若是是公平模式就要去排队了,流程如图:

领统Java并发半壁江山的AQS你真的懂了吗?

上面只是形象流程图,若是从代码级别看的话大体流程以下:

领统Java并发半壁江山的AQS你真的懂了吗?

6.2.3 signalAll

signalAll 与 signal 方法的区别体如今 doSignalAll 方法上,前面咱们已经知道doSignal方法只会对等待队列的头节点进行操做,doSignalAll方法只不过将等待队列中的每个节点都移入到同步队列中,即通知当前调用condition.await()方法的每个线程:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null); // 循环
}

6.3 End

一个 Condition 对象就有一个单项的等待任务队列。在一个多线程任务中咱们能够new出多个等待任务队列。好比咱们new出来两个等待队列。

private Lock lock = new ReentrantLock();
 private Condition FirstCond = lock.newCondition();
 private Condition SecondCond = lock.newCondition();

因此真正的AQS任务中通常是一个任务队列N个等待队列的,所以咱们尽可能调用signal而少用signalAll,由于在指定的实例化等待队列中只有一个能够拿到锁的。

Synchronized 中的 wait 跟 notify 底层代码的等待队列只有一个,多个线程调用wait的时候咱们是没法知道头节点是那个具体线程的。所以只能notifyAll

写在最后

欢迎你们关注个人公众号【风平浪静如码】,海量Java相关文章,学习资料都会在里面更新,整理的资料也会放在里面。

以为写的还不错的就点个赞,加个关注呗!点关注,不迷路,持续更新!!!

相关文章
相关标签/搜索