如下代码的实现逻辑出自于公众号 码农翻身java
《你管这破玩意叫线程池?》web
- PS:刘欣老师在我心中是软件技术行业的大刘。spring
public interface Executor { public void execute(Runnable r); }
接口中只有一个抽象方法,execute(Runnable r);它接收一个Runnable,无返回值实现它的子类只须要将传入的Runnable执行便可。数组
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.Executor; public class NewsThreadExecutor implements Executor { //每次调用都建立一个新的线程 @Override public void execute(Runnable r) { new Thread(r).start(); } }
这个实现类最简单也最明白,真的每次调用咱们都建立一个Thread将参数Runnable执行。这么作的弊端就是每一个调用者发布一个任务都须要建立一个新的线程,线程使用后就被销毁了,对内存形成了很大的浪费。安全
package com.datang.bingxiang.run; import java.util.concurrent.ArrayBlockingQueue; import com.datang.bingxiang.run.intr.Executor; //只有一个线程,在实例化后就启动线程。用户调用execute()传递的Runnable会添加到队列中。 //队列有一个固定的容量3,若是队列满则抛弃任务。 //线程的run方法不停的循环,从队列里取Runnable而后执行其run()方法。 public class SingThreadExecutor implements Executor { // ArrayBlockingQueue 数组类型的有界队列 // LinkedBlockingDeque 链表类型的有界双端队列 // LinkedBlockingQueue 链表类型的有界单向队列 private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1); //线程不停的从队列获取任务 private Thread worker = new Thread(() -> { while (true) { try { //take会在获取不到任务时阻塞。而且也有Lock锁 Runnable r = queue.take(); r.run(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); // 构造函数启动线程 public SingThreadExecutor() { worker.start(); } @Override public void execute(Runnable r) { // 这个offer和add不一样的是offer有Lock锁,若是队列满则返回false。 // add则是队列满抛出异常,而且没有Lock锁。 if (!queue.offer(r)) { System.out.println("线程等待队列已满,不可加入。本次任务丢弃!"); } } }
改变下思路,此次线程池实现类只建立一个线程,调用者发布的任务都存放到一个队列中(队列符合先进先出的需求)可是注意咱们设计线程池必定要选择有界队列,由于咱们不能无限制的往队列中添加任务。在队列满后,在进来的任务就要被拒绝掉。ArrayBlockingQueue数据结构
是一个底层有数组实现的有界阻塞队列,实例化一个ArrayBlockingQueue传递参数为1,表示队列长度最大为1.惟一的一个工做线程也是成员变量,线程执行后不断的自旋从队列中获取任务,take()方法将队列头的元素出队,若队列为空则阻塞,这个方法是线程安全的。多线程
execute(r)方法接收到任务后,将任务添加到队列中,offer()方法将元素添加到队列若队列已满则返回false。execute(r)则直接拒绝掉本次任务。app
SingThreadExecutor线程池的缺点是只有一个工做线程,这样显然是不够灵活,CorePollThreadExecutor中增长了corePollSize核心线程数参数,由用户规定有须要几个工做线程。此次咱们选用的队列为LinkedBlockingQueue这是一个数据结构为链表的有界阻塞单向队列。
ide
initThread()方法根据corePollSize循环建立N个线程,线程建立后一样调用take()方法从阻塞队列中获取元素,若获取成功则执行Runnable的run()方法,若获取队列中没有元素则阻塞。execute(r)则仍是负责将任务添加到队列中。函数
CorePollThreadExecutor中有三个问题
1 当队列满时线程池直接拒绝了任务,这应该让用户决定被拒绝的任务如何处理。
2 线程的建立策略也应该交给用户作处理。
3 初始化后就建立了N个核心线程数,可是这些线程可能会用不到而形成浪费。
RejectedExecutionHandler接口的实现应该让用户决定如何处理队列满的异常状况。
package com.datang.bingxiang.run.intr; public interface RejectedExecutionHandler { public void rejectedExecution(); }
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution() { System.out.println("队列已经满了!!!!当前task被拒绝"); } }
ThreadFactory接口的实现应该让用户决定建立线程的方法。
package com.datang.bingxiang.run.intr; public interface ThreadFactory { public Thread newThread(Runnable r); }
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.ThreadFactory; public class CustomThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { System.out.println("建立了新的核心线程"); return new Thread(r); } }
CountCorePollThreadExecutor的构造函数接收三个参数corePollSize,rejectedExecutionHandler,threadFactory。由于如今咱们须要按需建立核心线程,因此须要一个变量workCount记录当前已经建立的工做线程,为了保证线程之间拿到的workCount是最新的(可见性),咱们须要给变量workCount加上volatile修饰,保证改变了的修改能被全部线程看到。execute(r)首先要调用initThread(r)判断是否有线程被建立,若是没有线程建立则表示工做线程数已经和核心线程数相同了,此时须要将新的任务添加到队列中,若是队列满,则执行传入的拒绝策略。重要的方法在于initThread(r)。initThread(r)方法返回true表示有工做线程被建立任务将被工做线程直接执行,无需入队列。返回false则将任务入队,队列满则执行拒绝策略。
fill变量表示核心线程数是否所有建立,为了保证多线程的环境下不会建立多于corePoolSize个数的线程,因此须要使用同步锁,initThread(r)都要使用锁则会下降效率,尤为是当工做线程数已经到达核心线程数后,因此这一块代码使用到了双重判断,当加锁后在此判断工做线程是否已满。若是已满返回false。接下来使用threadFactory工厂建立线程,在线程中使用代码块,保证当前任务能够被新建立的工做线程执行。新的工做线程依然是从队列中获取任务并执行。线程开启后工做线程++,若是工做线程数等于核心线程数则改变fill标记。返回true,成功建立线程,不要忘记在finally中释放锁。
package com.datang.bingxiang.run; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.datang.bingxiang.run.intr.Executor; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; import com.datang.bingxiang.run.intr.ThreadFactory; public class CountCorePollThreadExecutor implements Executor { // 核心线程数 private Integer corePollSize; // 工做线程数,也就是线程实例的数量 private volatile Integer workCount = 0; // 线程是否已满 private volatile boolean fill = false; // 拒绝策略,由调用者传入,当队列满时,执行自定义策略 private RejectedExecutionHandler rejectedExecutionHandler; // 线程工厂,由调用者传入 private ThreadFactory threadFactory; public CountCorePollThreadExecutor(Integer corePollSize, RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) { this.corePollSize = corePollSize; this.rejectedExecutionHandler = rejectedExecutionHandler; this.threadFactory = threadFactory; } // 此次使用链表类型的单向队列 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1); @Override public void execute(Runnable r) { // 若是没有建立线程 if (!initThread(r)) { // offer和ArrayBlockingQueue的offer相同的做用 if (!queue.offer(r)) { rejectedExecutionHandler.rejectedExecution(); } } } // 同步锁,由于判断核心线程数和工做线程数的操做须要线程安全 Lock lock = new ReentrantLock(); public boolean initThread(Runnable r) { // 若是工做线程没有建立满则须要建立。 if (!fill) { try { lock.lock();// 把锁 加在判断里边是为了避免让每次initThread方法执行时都加锁 // 此处进行双重判断,由于可能由于多线程缘由多个线程都判断工做线程没有建立满,可是没关系 // 只有一个线程能够进来,若是后续线程二次判断已经满了就直接返回。 if (fill) { return false; } Thread newThread = threadFactory.newThread(() -> { // 由于线程是由任务触发建立的,因此先把触发线程建立的任务执行掉。 { r.run(); } while (true) { // 而后该线程则不停的从队列中获取任务 try { Runnable task = queue.take(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工做线程数+1 workCount++; // 若是工做线程数已经与核心线程数相等,则不可建立 if (workCount == corePollSize) { fill = true; } return true; } finally { lock.unlock();// 释放锁 } } else { // 工做线程已满则不建立 return false; } } }
最后考虑下,当工做线程数到达核心线程数后,队列也满了之后,任务就被拒绝了。能不能想个办法,当工做线程满后,多增长几个线程工做,当任务很少时在将扩展的线程销毁。ThreadPoolExecutor的构造函数中新增三个参数maximumPoolSize最大线程数keepAliveTime空闲时间,unit空闲时间的单位。
和CountCorePollThreadExecutor相比较在流程上讲咱们只须要在队列满时判断工做线程是否和最大线程数相等,若是不相等则建立备用线程,而且在备用线程长时间不工做时须要销毁掉工做线程。create()方法双重判断workCount==maximumPoolSize若是已经相等表示已经不能建立线程了,此时只能执行拒绝策略。不然建立备用线程,备用线程建立后自旋的执行poll(l,u)方法,该方法也是取出队列头元素,和take()不一样的是,poll若是一段时间后仍然从队列中拿不到元素(队列为空)则返回null,此时咱们须要将该备用线程销毁。在建立线程后将workCount++。此外须要注意,由于当前队列满了,因此才会建立备用线程因此不要将当前的任务给忘了,LinkedBlockingQueue的put(r)方法会阻塞的添加元素,直到添加成功。最后 stop()判读若是workCount>corePollSize则在线程安全的环境下将线程中止,而且将workCount--。
package com.datang.bingxiang.run; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.datang.bingxiang.run.intr.Executor; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; import com.datang.bingxiang.run.intr.ThreadFactory; public class ThreadPoolExecutor implements Executor { // 核心线程数 private Integer corePollSize; // 工做线程数,也就是线程实例的数量 private Integer workCount = 0; // 当队列满时,须要建立新的Thread,maximumPoolSize为最大线程数 private Integer maximumPoolSize; // 当任务很少时,须要删除多余的线程,keepAliveTime为空闲时间 private long keepAliveTime; // unit为空闲时间的单位 private TimeUnit unit; // 线程是否已满 private boolean fill = false; // 拒绝策略,由调用者传入,当队列满时,执行自定义策略 private RejectedExecutionHandler rejectedExecutionHandler; // 线程工厂,由调用者传入 private ThreadFactory threadFactory; // 此次使用链表类型的单向队列 BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(Integer corePollSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { this.corePollSize = corePollSize; this.rejectedExecutionHandler = rejectedExecutionHandler; this.threadFactory = threadFactory; this.workQueue = workQueue; this.maximumPoolSize = maximumPoolSize; this.keepAliveTime = keepAliveTime; this.unit = unit; } @Override public void execute(Runnable r) { // 若是没有建立线程 if (!initThread(r)) { // offer和ArrayBlockingQueue的offer相同的做用 if (!workQueue.offer(r)) { // 队列满了之后先不走拒绝策略而是查询线程数是否到达最大线程数 if (create()) { Thread newThread = threadFactory.newThread(() -> { while (true) { // 而后该线程则不停的从队列中获取任务 try { Runnable task = workQueue.poll(keepAliveTime, unit); if (task == null) { stop(); } else { task.run(); } } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工做线程数+1 workCount++; // 增长线程后,还须要将本应该被拒绝的任务添加到队列 try { // 这个put()方法会在队列满时阻塞添加,直到添加成功 workQueue.put(r); } catch (InterruptedException e) { e.printStackTrace(); } } else { rejectedExecutionHandler.rejectedExecution(); } } } } Lock clock = new ReentrantLock(); private boolean create() { //双重检查 if (workCount == maximumPoolSize) { return false; } try { clock.lock(); if (workCount < maximumPoolSize) { return true; } else { return false; } } finally { clock.unlock(); } } Lock slock = new ReentrantLock(); // 销毁线程 private void stop() { slock.lock(); try { if (workCount > corePollSize) { System.out.println(Thread.currentThread().getName() + "线程被销毁"); workCount--; Thread.currentThread().stop(); } } finally { slock.unlock(); } } // 获取当前的工做线程数 public Integer getworkCount() { return workCount; } // 同步锁,由于判断核心线程数和工做线程数的操做须要线程安全 Lock lock = new ReentrantLock(); public boolean initThread(Runnable r) { // 若是工做线程没有建立满则须要建立。 if (!fill) { try { lock.lock();// 把锁 加在判断里边是为了避免让每次initThread方法执行时都加锁 // 此处进行双重判断,由于可能由于多线程缘由多个线程都判断工做线程没有建立满,可是没关系 // 只有一个线程能够进来,若是后续线程二次判断已经满了就直接返回。 if (fill) { return false; } Thread newThread = threadFactory.newThread(() -> { // 由于线程是由任务触发建立的,因此先把触发线程建立的任务执行掉。 { r.run(); } while (true) { // 而后该线程则不停的从队列中获取任务 try { Runnable task = workQueue.take(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工做线程数+1 workCount++; // 若是工做线程数已经与核心线程数相等,则不可建立 if (workCount == corePollSize) { fill = true; } return true; } finally { lock.unlock();// 释放锁 } } else { // 工做线程已满则不建立 return false; } } }
package com.datang.bingxiang.run.test; import java.time.LocalDateTime; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import com.datang.bingxiang.run.CorePollThreadExecutor; import com.datang.bingxiang.run.CountCorePollThreadExecutor; import com.datang.bingxiang.run.CustomRejectedExecutionHandler; import com.datang.bingxiang.run.CustomThreadFactory; import com.datang.bingxiang.run.NewsThreadExecutor; import com.datang.bingxiang.run.SingThreadExecutor; import com.datang.bingxiang.run.ThreadPoolExecutor; import com.datang.bingxiang.run.intr.Executor; @RestController public class TestController { private int exe1Count = 1; Executor newsThreadExecutor = new NewsThreadExecutor(); // 每次都建立新的线程执行 @GetMapping(value = "exe1") public String exe1() { newsThreadExecutor.execute(() -> { System.out.println("正在执行" + exe1Count++); }); return "success"; } /* * 等待队列长度为1,三个线程加入,第一个加入后会迅速的出队列。剩下两个只有一个能够成功 加入,另外一个 则会被丢弃 */ private int exe2Count = 1; Executor singThreadExecutor = new SingThreadExecutor(); @GetMapping(value = "exe2") public String exe2() { singThreadExecutor.execute(() -> { System.out.println("正在执行" + exe2Count++); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }); return "success"; } private int exe3Count = 1; Executor corePollThreadExecutor = new CorePollThreadExecutor(2); @GetMapping(value = "exe3") public String exe3() { corePollThreadExecutor.execute(() -> { System.out.println("正在执行" + exe3Count++); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }); return "success"; } private int exe4Count = 1; Executor countCorePollThreadExecutor = new CountCorePollThreadExecutor(2, new CustomRejectedExecutionHandler(), new CustomThreadFactory()); @GetMapping(value = "exe4") public String exe4() { countCorePollThreadExecutor.execute(() -> { System.out.println("正在执行" + exe4Count++); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); return "success"; } // 第一次建立线程并执行 1 // 第二次进入队列 2 // 第三次建立线程取出队列中的2,将3添加到队列 // 第四次拒绝 // 等待3秒后只剩下一个队列 private int exe5Count = 1; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new CustomThreadFactory(), new CustomRejectedExecutionHandler()); @GetMapping(value = "exe5") public String exe5() { threadPoolExecutor.execute(() -> { System.out.println("正在执行" + exe5Count++); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }); return "success"; } @GetMapping(value = "workCount") public Integer getWorkCount() { return threadPoolExecutor.getworkCount(); } }