多线程-AQS-CountDownLatch

介绍:

CountDownLatch--发令枪
        Java1.5以后引入的Java并发工具类,谈到CountDownLatch须要先介绍一个概念:闭锁/门栓[latch]
         latch:一种同步方法,能够延迟线程的进度直到线程到达某个终点状态。闭锁的状态是一次性的,它确保在闭锁打开以前全部特定的活动都须要在闭锁打开以后才能完成。java

        如若掌握了AQS的实现原理,这里的理解将会更加的水到渠成并发

==========================分割线==========================ide

应用场景
        A:如同赛跑,必须等待发令枪响后runner才能起跑同样,在CountDownLatch的计数器归零前,全部引用CountDownLatch闭锁的线程都必须阻塞。总结:准备好了才开始


        B:如同购物,只有全部商品都肯定购买了才好结帐,在全部任务执行完[并进行countDown()]直到归零前,当前任务必须阻塞。总结:准备好了才结束

应用A的样例[准备好了才开始]:工具

import java.util.concurrent.CountDownLatch;

/**
 * Created by ysma on 2018/6/7.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(1);
        threadX driverA = new threadX(cdl);
        threadX driverB = new threadX(cdl);
        new Thread(driverA).start();
        new Thread(driverB).start();
        System.out.println("===开始==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());
        cdl.countDown();
        Thread.sleep(10);
        System.out.println("========结束==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());

    }

    static class threadX implements Runnable {
        private CountDownLatch cdl;
        threadX(CountDownLatch cdl){
            this.cdl = cdl;
        }

        @Override
        public void run() {
            try {
                cdl.await();
                System.out.println(Thread.currentThread().getName()
                        +":running.... now:"
                + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

=============执行结果==========
===开始==time:1555310552568==count:1
Thread-0:running.... now:1555310552569
Thread-1:running.... now:1555310552569
========结束==time:1555310552579==count:0

应用B的样例[准备好了才结束]:ui

/**
 * Created by ysma on 2018/6/7.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(2);
        threadX driverA = new threadX(cdl);
        threadX driverB = new threadX(cdl);
        new Thread(driverA).start();
        new Thread(driverB).start();
        System.out.println("===开始==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());
        cdl.await();
        System.out.println("========结束==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());

    }

    static class threadX implements Runnable {
        private CountDownLatch cdl;
        threadX(CountDownLatch cdl){
            this.cdl = cdl;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()
                        + ":running.... now:"
                        + System.currentTimeMillis());
                Thread.sleep(10);
                cdl.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
===========执行结果===========
Thread-0:running.... now:1555310728080
Thread-1:running.... now:1555310728080
===开始==time:1555310728080==count:2
========结束==time:1555310728090==count:0

============================分割线==================this

tips

       A:latch.countDown(); 建议放到finally语句里。
       B:对这个计数器的操做都是原子操做,同时只能有一个线程去操做这个计数器。
       C:常与CyclicBarrier伴生讨论,将在后续章节进行讲述。
       D:主要功能简述-
            public CountDownLatch(int count); //指定计数的次数,只能被设置1次[闭锁特性]
            public void countDown();          //计数器减1
            /**await(...)会一直阻塞当前线程,直到计时器的值为0,除非线程被中断*/
            public void await() throws InterruptedException   
            Public boolean await(long timeout, TimeUnit unit) //and 返回false表明计数器超时。
           E:使用了AQS的状态来表明计数count,因此CountDownLatch是基于AQS的实现不是CAS哦
spa

==================分割线=====================线程

源码解读及注释

    

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /**AQS共享模式加锁 就必须实现tryAcquireShared和tryReleaseShared*/
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;//计数为0时就能够结束并解除阻塞了
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//循环执行直到cas减一成立或者计数器归零
                int c = getState();
                if (c == 0)//计数器归零 退出
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))//cas减一
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**Constructs a {@code CountDownLatch} initialized with the given count.*/
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**当前线程阻塞知道计数归零或者线程interrupted
     * 最终会由AbstractQueuedSynchronizer中的parkAndCheckInterrupt方法实现阻塞
     * ....LockSupport.park(this);
     * */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**同上,增长了超时,timeout纳秒内计数没有归零返回false*/
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**计数器减1*/
    public void countDown() {
        sync.releaseShared(1);
    }
}
相关文章
相关标签/搜索