AQS源码解读 二

前面已经讲解了AQS源码的独享模式,今天来讲一下AQS的共享模式

 

下面以CountDownLatch去讲解AQS的共享模式

 

首先讲下什么是CountDownLatch,CountDownLatch所描述的是”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。在API中是这么说的:

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

先看CountDownLatch的例子

 

public  static  void  main(String[] args) {
     final  CountDownLatch latch =  new  CountDownLatch( 2 );
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "线程1执行" );
                 Thread.sleep( 5000 );
                 latch.countDown();
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "线程2执行" );
                 Thread.sleep( 3000 );
                 latch.countDown();
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "线程3阻塞" );
                 latch.await();
                 System.out.println( "线程3继续执行" );
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     try  {
         Thread.sleep( 1000 );
         System.out.println( "主线程线程阻塞" );
         latch.await();
     catch  (InterruptedException e) {
         e.printStackTrace();
     }
     System.out.println( "主线程继续执行" );
  
}

 

线程3 和主线程会加入到队列中

 

 node1会判断前序节点是否是头结点,如果是前序节点是头节点 但是计数器不为0 则阻塞自己 并将waitstatus状态改为-1 即SIGNAL

 node2 会判断当前节点是否为头结点,前序节点不是头结点 直接阻塞自己 并将waitstatus状态改为-1

 

如果计数器为零,就会把node1给唤醒,唤醒后 node1将自己的节点设置为头结点 并将节点waitstatus状态设置为 -3 PROPAGATE

然后继续执行for循环 这时候node2的前序节点是头结点,然后继续将节点node2设置为头结点,并将节点waitstatus状态设置为-3 即PROPAGATE

 

 

 

接着看CountDownLatch的源码

public  class  CountDownLatch {
     /**
  * Synchronization control For CountDownLatch.
  * Uses AQS state to represent count.
  */
  private  static  final  class  Sync  extends  AbstractQueuedSynchronizer {
         private  static  final  long  serialVersionUID = 4982264981922014374L;
 
         Sync( int  count) {
             setState(count);
         }
 
         int  getCount() {
             return  getState();
         }
 
         protected  int  tryAcquireShared( int  acquires) {
             return  (getState() ==  0 ) ?  1  : - 1 ;
         }
 
         protected  boolean  tryReleaseShared( int  releases) {
             // Decrement count; signal when transition to zero
  for  (;;) {
                 int  c = getState();
                 if  (c ==  0 )
                     return  false ;
                 int  nextc = c- 1 ;
                 if  (compareAndSetState(c, nextc))
                     return  nextc ==  0 ;
             }
         }
     }
 
     private  final  Sync sync;
 
//构造一个用给定计数初始化的 CountDownLatch
  public  CountDownLatch( int  count) {
         if  (count <  0 throw  new  IllegalArgumentException( "count < 0" );
         this .sync =  new  Sync(count);
     }
}
 
 
 
 
public  void  await()  throws  InterruptedException {
     sync.acquireSharedInterruptibly( 1 );
}
 
 
public  boolean  await( long  timeout, TimeUnit unit)
     throws  InterruptedException {
     return  sync.tryAcquireSharedNanos( 1 , unit.toNanos(timeout));
}
 
 
public  void  countDown() {
     sync.releaseShared( 1 );
}
}
 

 

可以看出CountDownLatch内部依赖Sync实现,

Sync继承AQS。CountDownLatch仅提供了一个构造方法:

CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch 设置count

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}
Sync(int count) {
    setState(count);
}

设置state是count

 

看countDown方法

public  void  countDown() {
     sync.releaseShared( 1 );
}
public  final  boolean  releaseShared( int  arg) {
     if  (tryReleaseShared(arg)) { //如果此线程是被等待线程里最后一个被释放的线程 就去通知同步等待队列里的节点
         doReleaseShared();
         return  true ;
     }
     return  false ;
}

 

再看tryReleaseShared方法 

protected  boolean  tryReleaseShared( int  releases) {
         // Decrement count; signal when transition to zero
  for  (;;) {
             int  c = getState(); //获取计数器的值
             if  (c ==  0 )
                 return  false ;
             int  nextc = c- 1 ; //每个被等待的线程执行完计数器减1
             if  (compareAndSetState(c, nextc)) //设置计数器的新值
                 return  nextc ==  0 ; //如果计数器为0 返回true
         }
     }
}

 

 

再看doReleaseShared方法

private  void  doReleaseShared() {
     /*
  * Ensure that a release propagates, even if there are other
  * in-progress acquires/releases. This proceeds in the usual
  * way of trying to unparkSuccessor of head if it needs
  * signal. But if it does not, status is set to PROPAGATE to
  * ensure that upon release, propagation continues.
  * Additionally, we must loop in case a new node is added
  * while we are doing this. Also, unlike other uses of
  * unparkSuccessor, we need to know if CAS to reset status
  * fails, if so rechecking.
  */
  for  (;;) {
         Node h = head;
         if  (h !=  null  && h != tail) {
             int  ws = h.waitStatus; //如果头结点是-1 (可以看下面wait方法有讲解,已经把头结点设置为-1了 所以会走
//f (ws == Node.SIGNAL) 这一步
 
             if  (ws == Node.SIGNAL) {
                 if  (!compareAndSetWaitStatus(h, Node.SIGNAL,  0 )) //把头结点再设置为0 不成功自旋操作,直到设置成功
                     continue ;        i     // loop to recheck cases
  unparkSuccessor(h); //唤醒节点
             }
             else  if  (ws ==  0  &&
                      !compareAndSetWaitStatus(h,  0 , Node.PROPAGATE))
                 continue ;                 // loop on failed CAS
  }
         if  (h == head)                    // loop if head changed
  break ;
     }
}

 

 
再看unparkSuccessor方法
private  void  unparkSuccessor(Node node) {
     /*
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling. It is OK if this
  * fails or if status is changed by waiting thread.
  */
  int  ws = node.waitStatus; //因为头结点已经设置为0了,所以ws<0不满足
     if  (ws <  0 )
         compareAndSetWaitStatus(node, ws,  0 );
 
     /*
  * Thread to unpark is held in successor, which is normally
  * just the next node. But if cancelled or apparently null,
  * traverse backwards from tail to find the actual
  * non-cancelled successor.
  */
  Node s = node.next;
     if  (s ==  null  || s.waitStatus >  0 ) { //这一步也不满足,可以看下面wait方法里有讲解 头结点的后续节点的status都是-1
//所以这一步不满足 直接走LockSupport.unpark(s.thread);唤醒头结点的下一个节点
         s =  null ; //如果waitstatus>0说明 节点取消了 就找下一个waitstatus是-1的节点 并唤醒
         for  (Node t = tail; t !=  null  && t != node; t = t.prev)
             if  (t.waitStatus <=  0 )
                 s = t;
     }
     if  (s !=  null )
         LockSupport.unpark(s.thread);
}

 

再看wait方法

public  final  void  acquireSharedInterruptibly( int  arg)
         throws  InterruptedException {
     if  (Thread.interrupted())
         throw  new  InterruptedException();
     if  (tryAcquireShared(arg) <  0 ) //尝试获取锁,获取失败就执行下面的方法
         doAcquireSharedInterruptibly(arg);
}

 

 

看tryAcquireShared方法

protected  int  tryAcquireShared( int  acquires) {
     return  (getState() ==  0 ) ?  1  : - 1 ;
}

 

如果state是0,说明被等待的线程全都执行完了 。return -1说明没有执行完 

再看doAcquireSharedInterruptibly方法

private  void  doAcquireSharedInterruptibly( int  arg)
     throws  InterruptedException {
     final  Node node = addWaiter(Node.SHARED); //如果队列是空的,就新建一个头节点,头节点指向尾节点,
    //然后再新建一个节点放在头节点后面 如果队列不为空,就在尾节点后面新建一个节点。节点是shared类型的
//队列节点的waitStatus默认是0 因为上篇AQS源码一种有讲解,就不讲那么多了
     boolean  failed =  true ;
     try  {
         for  (;;) { //开启自旋
             final  Node p = node.predecessor();
             if  (p == head) { //如果新建节点的前序节点是头节点,而且state的值为0 就走到setHeadAndPropagate方法
 
                 int  r = tryAcquireShared(arg);
                 if  (r >=  0 ) { //如果被等待的线程执行完了
 
                     setHeadAndPropagate(node, r); //把当前节点设置为头节点,而且唤醒后续挂起的节点
                     p.next =  null // help GC
  failed =  false ;
                     return                      p.next =  null // help GC
  failed =  false ;
                     return ;
                 }
             }
             if  (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt()) //如果当前节点的前序节点不是头节点或者计数器不等于0,就阻塞当前节点
                 throw  throw  new  InterruptedException();
         }
     finally  {
         if  (failed)
             cancelAcquire(node);
     }
}

 

再看shouldParkAfterFailedAcquire方法
finally 
相关文章
相关标签/搜索