初版地址:https://blog.csdn.net/wandou9527/article/details/107769598java
第二版优化点:多线程
java.util.concurrent.Executor
接口,更加符合规范优化后又离完美线程池近了一步。并发
若是想玩转 Java 的多线程与高并发,线程池是你永远也绕不过的山。既然绕不过,咱们就啃他,吃透线程池,玩转高并发。
阅读Jdk线程池源码发现,Jdk里的线程池实现的很是完善,有不少复杂的逻辑处理,因此形成代码较长,并且代码格式也不规范(ps. 请原谅我指点江山,人家可能有人家的实际缘由),eg. if 后没有大括号;不少变量命名都是单字母,好比c、w等。。。ide
本文,精简了线程池的一些复杂逻辑,从主干功能出发,实现主干功能,我相信更有助于咱们理解线程池,而后再一步步深刻。高并发
private volatile int corePoolSize; //核心线程数 private volatile int maximumPoolSize; //最大线程数 private volatile long keepAliveTime; //存活时间 private final BlockingQueue<Runnable> workQueue; //任务等待队列 private volatile ThreadFactory threadFactory; //线程工厂 private volatile RejectedExecutionHandler handler; //拒绝策略
下面介绍一下线程池执行任务的流程,理解各个属性的意义。当一个线程池初始化,向线程池提交任务,线程池新建线程执行任务,随着线程建立,线程数逐渐增多,当达到 corePoolSize
线程池将再也不新建线程,而是将任务放入任务等待队列 workQueue
。再持续向线程池提交任务,当等待队列满了,这时会继续新建线程,直到到达最大线程数 maximumPoolSize
,若是还继续有任务到来,线程池没法处理,这时就启动拒绝策略。 post
这个过程咱们能够以生活中的例子比喻一下。大体咱们把线程池理解为理发店。那么流程就是:来了顾客开始理发,好比只有4个理发师4个座位,至关于核心线程。那么来了过多的顾客,理发师忙不过来就会先让你去等候区稍等排队等待,前面有理完发的会叫你,至关于等待队列。等待区满了呢?现实中理发店确定不会拒绝顾客的啊,他可能让你先在外面等。但若是等待区天天都爆满,那么老板可能会考虑扩大店面,扩充理发师团队了。因此,这只是个大体的比喻。测试
废话不说,咱们上代码。优化
package com.wandou.demo.thread.post.threapool; import java.util.HashSet; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; /** * @author liming * @date 2020/09 * @description 自定义线程池 * new功能 * - 线程延迟建立(来任务才建立) * 欠缺功能: * - 队列满了后继续建立线程,直到达到最大线程数 * - 拒绝策略 * - 线程超时销毁 */ public class MyThreadPool implements Executor { /** * 核心线程数(核心理发师数量) */ private volatile int corePoolSize; /** * 任务等待队列(等待区座位数) */ private final BlockingQueue<Runnable> workQueue; /** * 线程容器(理发师做业区) */ private final HashSet<MyThreadPool.Worker> workers = new HashSet<>(); private final AtomicInteger workerCount = new AtomicInteger(0); public MyThreadPool(int corePoolSize, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; } /** * 运行,来顾客了,安排 * * @param command * @return */ @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int count = workerCount.get(); // 若是小于核心线程数,能够创建新线程 if (count < corePoolSize) { if (addWorker(command, true)) { return; } } if (workQueue.offer(command)) { int recheck = workerCount.get(); if (recheck == 0) { addWorker(null, false); } return; } //拒绝 throw new RejectedExecutionException("Task " + command.toString() + " rejected from " + this.toString()); } private boolean addWorker(Runnable command, boolean core) { for (; ; ) { int count = workerCount.get(); if (count >= corePoolSize) { return false; } // 线程数+1, 成功向下走 if (workerCount.compareAndSet(count, count + 1)) { break; } } Worker worker = new Worker(command); final Thread thread = worker.thread; workers.add(worker); thread.start(); return true; } // ---------------- /** * 线程(理发师) */ private class Worker implements Runnable { /** * 工做者运行的线程 */ final Thread thread; /** * 初始运行的任务,可能为 null */ Runnable firstTask; /** * 建立一个工做者 * * @param firstTask 第一个任务,能够为 null */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = new Thread(this); } @Override public void run() { System.out.println("run"); runWorker(this); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Worker worker = (Worker) o; return Objects.equals(thread, worker.thread) && Objects.equals(firstTask, worker.firstTask); } @Override public int hashCode() { return Objects.hash(thread, firstTask); } } final void runWorker(Worker worker) { Runnable task = worker.firstTask; worker.firstTask = null; while (task != null || (task = getTask()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 运行完毕 task = null; } } } private Runnable getTask() { for (; ; ) { try { //阻塞拿任务 return workQueue.take(); } catch (InterruptedException e) { System.out.println("InterruptedException!!!"); } } } }
测试代码:this
package com.wandou.demo.thread.post.threapool; import org.junit.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * @author liming * @date 2020/9/17 * @description */ public class MyThreadPoolDemo { private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); private MyThreadPool myThreadPool = new MyThreadPool(3, workQueue); @Test public void t1() throws Exception { AtomicInteger atomicInteger = new AtomicInteger(0); Runnable task = new Runnable() { @Override public void run() { try { System.out.println(atomicInteger.incrementAndGet() + "号顾客来理发,为其理发的理发师是:" + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } }; for (int i = 0; i < 30; i++) { myThreadPool.execute(task); } Thread.sleep(5000); System.out.println("================================================"); for (int i = 0; i < 30; i++) { myThreadPool.execute(task); } //让主线程阻塞等待 System.in.read(); } }
测试结果:atom