线程池的建立以及CyclicBarrier与CountDownLatch的简单使用

1、线程池的简单建立

(1)、使用Executors进行建立

ExecutorService poo1 = Executors.newFixedThreadPool(10);

        ExecutorService pool = Executors.newSingleThreadExecutor();

这两种线程池都是无界队列的线程池,建立比较简单,但可能致使堆积请求处理队列而消耗很是大的内存。html

ExecutorService poo1 = Executors.newCachedThreadPool();

这种可缓存线程池,建立无脑暴力,能够建立Integer.MAX_VALUE个线程,若是处理不当对内存消耗巨大。java

ExecutorService poo1 = Executors.newScheduledThreadPool(10);

这是支持定时及周期性任务执行的一种线程池,不过一样会有上面的问题。面试

(2)、经过ThreadPoolExecutor建立线程池

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("这是线程%d - ").build();

        ExecutorService pool = new ThreadPoolExecutor(3, 6, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.AbortPolicy());

以上是一个简单的实例,表示建立一个核心线程数为3,最大线程数为6,超时0s就进行回收,缓存队列容量为3,并使用AbortPolicy的拒绝策略缓存

AbortPolicy         -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
CallerRunsPolicy    -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,而后将被拒绝的任务添加到等待队列中。
DiscardPolicy       -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

(3)、简单玩一玩这个线程池

public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("这是线程%d - ").build();

        ExecutorService pool = new ThreadPoolExecutor(3, 6, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Integer sout = i;
            new Thread(() -> {
                pool.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + sout);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }).start();
        }
    }

打印:

这是线程0 - 0
这是线程1 - 1
这是线程2 - 2
这是线程3 - 6
这是线程4 - 7
这是线程5 - 8
Exception in thread "Thread-9" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@49984548 rejected from java.util.concurrent.ThreadPoolExecutor@475fb20b[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
这是线程0 - 3
这是线程1 - 4
这是线程2 - 5

能够看出, 最后一个线程(i = 9)被拒绝了,其余的线程都获得了执行,虽然咱们指定核心线程池为3,但仍是先执行了6个线程(最大线程),等待了2秒后,再执行被放入队列中的线程。并发

从上面打印的顺序,咱们也能看出蹊跷,它并非按照0 - 8的顺序来进行打印的,这是由于新的线程进入线程池后,若是超过了核心线程池大小,并不会去直接开启新的线程,而是先放入队列,直到队列满了,才会开启新的线程。工具

咱们把maximumPoolSize从6改成4,会发生什么呢?ui

这是线程0 - 0
这是线程1 - 1
这是线程2 - 2
这是线程3 - 6
Exception in thread "Thread-7" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1abd1e5 rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-8" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1a371600 rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-9" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@317d0b0e rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
这是线程0 - 3
这是线程1 - 4
这是线程2 - 5

很显然,只有7个线程获得了执行。线程

再玩玩拒绝策略,好比 CallerRunsPolicycode

ExecutorService pool = new ThreadPoolExecutor(3, 4, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

打印:

这是线程0 - 0
这是线程1 - 1
这是线程2 - 2
这是线程3 - 6
Thread-77
Thread-88
Thread-99
这是线程0 - 3
这是线程1 - 4
这是线程2 - 5

上面被拒绝的三个线程,扔回给了它的调用方执行。orm

再好比 DiscardOldestPolicy 拒绝策略

这是线程0 - 0
这是线程1 - 1
这是线程2 - 2
这是线程3 - 6
这是线程0 - 7
这是线程1 - 8
这是线程2 - 9

直接将等待队列最末尾的任务放弃掉了,也就是先进入队列的345任务,被线程池放弃了。

最后一种 DiscardPolicy 就不赘述了,直接放弃被拒绝的任务。

2、简单用用 CountDownLatch

正如每一个Java文档所描述的那样,CountDownLatch是一个同步工具类,它容许一个或多个线程一直等待,直到其余线程的操做执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,因此必定要确保你很好的理解了它。

CountDownLatch countDownLatch = new CountDownLatch(2);

        pool.submit(new Thread(() -> {
            try {
                sout("进入线程");
                countDownLatch.await();
                sout("继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        pool.submit(new Thread(() -> {
            try {
                Thread.sleep(7000);
                sout("进入线程");
                countDownLatch.await();
                sout("继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        for (int i = 0; i < 2; i++) {
            pool.submit(new Thread(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
                sout("countDown");
            }));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        pool.shutdown();

这里有四个线程,一个 须要同步的线程 先开始执行,五秒后,countDown,七秒后,另外一个 须要同步的线程 也开始执行,十秒后,再次countDown。

这是线程0 - 进入线程
这是线程2 - countDown
这是线程1 - 进入线程
这是线程3 - countDown
这是线程0 - 继续执行
这是线程1 - 继续执行

在线程3进行countDown后,线程0和线程1会从等待中恢复,并执行任务。

3、简单用用 CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。

CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        pool.submit(new Thread(() -> {
            try {
                sout("进入线程");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                sout("继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        pool.submit(new Thread(() -> {
            try {
                Thread.sleep(7000);
                sout("进入线程");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                sout("继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

CyclicBarrier的使用比CountDownLatch更加简单,它只须要简单使用cyclicBarrier.await();就能够了。

相关文章
相关标签/搜索