本次本身实现一个简单的线程池,主要是为了后续看ThreadPool
的源码作准备的,是从别人的代码中改进的,从看别人的源码中学到一些东西,因此特地把这篇文章写出来,方便之后本身去回顾本身是如何学习。固然也但愿分享出来能够对别人产生良好的影响!html
在本身实现一个线程池以前,首先要知道怎么用。由于知道怎么用以后才能去理解一些代码的编写。关于怎么用这里就再也不多加赘述了,百度或者谷歌一下就好,为了避免让读者花过多的时间去找,我找了一篇文章,说得比较清楚。java
咱们能够看到,除了Thread
和Runnable
,其余都是咱们本身定义的,下面咱们来逐一说明。git
在咱们开始分析以前,先说下线程池的工做流程,也方便你们后面看的时候心理有一个底。github
线程池顾名思义就是一个存放多个线程的池子。那么在计算机语言中,咱们就是用数据结构来存放线程,在本线程池中用的是一个队列来存放要处理任务的线程。因此在线程池一启动,线程池里面就应该有必定数量的线程数目了,那么这个线程的数目是多少咱们先不用管,只须要知道有一些线程在等待用户把所须要线程执行的任务放进池子里面。而后线程池里面的线程就会自动帮你执行任务啦。数据结构
固然有些人说,我执行一个任务就建立一个线程就行了呀,何须大费周章呢。咱们须要知道,来一个任务就建立一个线程,多线程
建立线程须要时间 ,影响响应速度。ide
系统资源有限,若是有数以万计的线程须要建立,会大大消耗系统资源,会下降系统的稳定性。函数
其实有不少任务的时候,有些线程只是处理一些很轻的任务,很快就完成了,那么若是下一个任务恰好到达的时候,以前的线程也恰好完成工做了,那么这个线程就顺便接下到来的任务,这样的话岂不是提升了响应速度,而后又重复利用了线程,下降系统资源的损耗。岂不是一箭双雕。post
以前都是恰巧,那么咱们稍微放宽一点条件。若是线程执行完任务了,就先别退出呗。而是在等待执行任务,这个线程就能够看作被赋予执行任务的命令!**就等着任务来,任务一来,我就去执行。任务执行结束,线程就等,直到下一个任务来。周而复始,直到手动关闭!**这就是线程池的本质。学习
那么问题来了,线程池里面只有5个线程在等待执行任务,但是同时来了10个任务须要执行,那么有5个任务被执行了,剩下那5个放哪里?难道被丢弃?这可不是咱们设计线程池的初衷!你确定能够想到,确定是拿同样数据结构去存储剩下的线程呀!(咱们用队列存储,而后称为工做队列。)由于线程处理任务的时间是不必定的,确定是有些线程处理的快,有些慢。因此谁先处理的快,谁就去处理剩下的任务。正所谓能者多劳!
再抛出一个问题,假如前面5个线程执行得很慢,那么后面那5个线程就须要等好久,这时候还不如直接建立线程去操做呢,没错,线程池在设计的时候也想到过这个问题,关于这个问题在后面咱们设计的时候会说道,这里就先往下看吧!
既然涉及到多线程,那么确定就涉及到同步的问题,对哪一个对象须要同步呢?固然是任务队列啦。咱们须要知道颇有可能同时会有不少个线程对同一个任务队列取任务和听任务的,因此为了实现同步,咱们这里用了synchronized
关键字实现同步,也就是对这个任务队列加一把锁,哪一个线程能够拿到操做任务队列的锁哪一个线程就能够领取任务。没拿到这把锁的线程就死等,除非被中断或者手动关闭。
这里须要注意的是挂起阻塞和等待拿锁的区别。
挂起阻塞是该线程拿到锁以后调用await
方法才会进入的状态,前提是先拿到锁。被通知以后就会被唤醒,而后从await
以后的代码执行。
等待拿锁是别的线程还在占有锁,此时的线程还没拿到锁,就会进入这个锁的entrySet序列等待,直到锁被释放而后再去抢,抢到为止!
通过上面的讲解,咱们能够基本了解了线程池的设计思想和原理,下面补充点内容。
线程池内部有两个数据结构(队列)分别存放须要执行任务的线程(也叫工做线程)和所须要被**执行的任务*。
线程池初始化的线程放在工做队列里面,用户想要执行的任务放在任务队列
在用户添加任务以后,会通知工做队列的线程去取任务啦!
工做队列的线程若是有空而且任务队列不为空,哪一个线程拿到锁哪一个线程就能够在任务队列取任务,而后任务队列的任务数就-1。
不少个线程去拿锁的时候,只能有一个线程拿到。其余没拿到锁的线程不是阻塞等待,而是等待拿锁!
若是拿到锁以后任务队列为空,就挂起阻塞。若是被通知唤醒,继续执行3 4 5 6操做。
先看看咱们这个整个线程池的流程图,这样设计的时候就知道怎么回事了!
BaseThreadPool
先看看这个类的基本属性
public class BaseThreadPool extends Thread implements ThreadPool {
/*初始化线程数*/
private int initSize;
/*最大工做线程数*/
private int maxSize;
/*核心线程数*/
private int coreSize;
/*当前活跃线程数*/
private int activityCount = 0;
/*指定任务队列的大小数*/
private int queueSize;
/*建立工做线程的工厂,在构造方法由线程池规定好*/
private ThreadFactory threadFactory;
/*1. 任务队列,在构造方法由线程池规定好*/
private RunnableQueue runnableQueue;
//2. 工做队列
private final static Queue<ThreadTask> threadQueue = new ArrayDeque<>();
//3. 本线程池默认的拒绝策略
private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.IgnoreDenyPolicy();
/*4. 默认的线程工厂*/
private final static ThreadFactory DEFAULT_THREAD_FACTORY =new DefaultThreadFactory();
/*线程池是否关闭,默认为false*/
boolean isShutdown = false;
private long keepAliveTime;
private TimeUnit timeUnit ;
复制代码
由上面的属性咱们知道,咱们自定义的线程池这个类是依赖于几个类的。
依次是 RunnableQueue
,DenyPolicy
,ThreadFactory
。
而且由总览图咱们知道,BaseThreadPool
是实现了咱们定义的ThreadPool
接口和继承了Thread类,而且重写了run方法
run 里面的逻辑到后面再分析,这里能够先跳过这里。
@Override
public void run() { // BaseThreadPool
while (!isShutdown && !isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
//到这里就是关闭线程池了
isShutdown = true;
continue;
}
// 这里同步代码块,保证了每次访问的时候都是最新的数据!
synchronized (this){
if(isShutdown) break;
// 任务队列不为空,而且当前能够工做的线程小于coreCount,那么说明工做线程数不够,就先增长到maxSize
// 好比说coreSize 为20,initSize为10,maxSize 为30,
// 忽然一会儿来了20分线程进来,可是工做线程只有15个,因为某种缘由可能那15个工做现场还没执行完,那么此时的任务队列确定还有剩余的,发现此时线程还没到coreSize
// 那么就直接开maxSize个线程先把
if(runnableQueue.size() > 0){
for (int i = runnableQueue.size(); i < maxSize; i++) {
newThread();
}
}
// 任务队列为空,而且当前能够工做的线程数大于coreCount,工做线程数太多啦!那么就减小到coreCount
if(runnableQueue.size() == 0 && activityCount > coreSize){
for (int i = coreSize; i < activityCount; i++) {
removeThread();
}
}
}
}
}
复制代码
咱们先来看下BaseThreadPool的构造方法
//1 用户传入初始化线程数,最大线程数,核心线程数,和任务队列的大小便可
public BaseThreadPool(int initSize, int maxSize, int coreSize,int queueSize) {
/*这里建立线程的工厂和拒绝策略都是用本身定义好的对象*/ this(initSize,maxSize,coreSize,queueSize,DEFAULT_THREAD_FACTORY,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
}
// 2
public BaseThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize; //初始化线程池的初始化线程数
this.maxSize = maxSize; // 初始化线程池能够拥有最大的线程数
this.coreSize = coreSize; // 这个值的意义后面说
this.threadFactory = threadFactory; //初始化建立线程池的工厂
//自定义存听任务的队列
this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this); //RunnableQueue的实现类,本身定义
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.init(); //初始化函数
}
// ---init()
public void init(){
/*启动本线程池*/
this.start();//BaseThreadPool 继承了 Thread,缘由后面说
/*初始化initSize个线程在线程池中*/
for (int i = 0; i < initSize; i++) {
newThread();
}
}
// newThread()
public void newThread(){
/*建立工做线程,而后让工做线程等待任务到来被唤醒*/
Woker woker = new Woker(runnableQueue);
Thread thread = threadFactory.createThread(woker);
/*将线程和任务包装在一块儿*/
ThreadTask threadTask = new ThreadTask(thread,woker);
threadQueue.offer(threadTask);
this.activityCount++;
/*启动刚才新建的线程*/
thread.start();
}
// 再看看DefaultThreadFactory,就是
/*工厂建立一个新的线程*/
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(0); //线程组号
//计数
private static AtomicInteger COUNTER = new AtomicInteger(1);
private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group,runnable,"threadPool-" + COUNTER.getAndIncrement());
}
}
复制代码
这里说明一下,咱们是能够这样new Thread(new Runnable(){....}).start
建立而且启动线程的。就是调用Thread
须要传入一个Runnable
实例的构造函数实例化Thread
类,经过重写Runnable
里面的run
方法就能够指定线程在启动的时候须要作的事。
咱们看到DefaultThreadFactory
就只有一个建立线程的方法,就是把线程启动后须要作的任务指定一下和重命名一下线程,就是用上面说明的方法。因此传给须要传给createThread
方法一个实现Runnable
的类。而这个类就是Woker
咱们看下Woker的代码
//------------Woker BaseThreadPool依赖的类
/*工做线程的任务*/
public class Woker implements Runnable{
/*任务队列,方便后面取出任务*/
private RunnableQueue runnableQueue;
/*方便判断该内部任务对应的线程是否运行,确保可见性!*/
private volatile boolean running = true;
public Woker(RunnableQueue runnableQueue) {
this.runnableQueue = runnableQueue;
}
@Override
public void run() {
/*当前对应的线程正在运行而且没有被中断*/
while (running && !Thread.currentThread().isInterrupted()){
//调用take的时候,若是任务队列没任务就会阻塞在这,直到拿到任务
Runnable task = runnableQueue.take();
task.run();
}
}
public void stop(){
running = false;
}
}
复制代码
咱们看到run
方法,这个任务就是去到任务队列里面取任务,而后执行。直到当前工做中止或者当前线程被中断。而这个任务队列就是咱们在调用构造函数的时候指定的对象,也就是这段代码
this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this);
接下来看下LinkRunnableQueue
是怎么实现的
public class LinkRunnableQueue implements RunnableQueue{//BaseThreadPool依赖的类
//指定任务队列的大小
private int limit;
//也是使用BaseThreadPool传进来的默认拒绝策略
private DenyPolicy denyPolicy;
//这里传进BaseThreadPool实例
private ThreadPool threadPool;
//这个就是真正存储Runnable实例对象的数据结构!也就是一个链表
private LinkedList<Runnable> queue = new LinkedList<>();
//构造函数,也就是初始化这个类的属性
public LinkRunnableQueue(int queueSize,DenyPolicy denyPolicy,ThreadPool pool) {
this.limit = queueSize;
this.denyPolicy = denyPolicy;
this.threadPool = pool;
}
//任务队列添加任务,这个方法通常由线程池的execute方法调用
@Override
public void offer(Runnable runnable) {
//由于任务队列只有一个,可能会有多个线程同时操做任务队列,因此要考虑同步问题
//取得queue的锁才能加入任务,拿不到所就进入queue的entrySet
synchronized (queue){
if(queue.size() > limit){
//若是此时任务队列超过限制的值,那么就拒绝!
denyPolicy.reject(runnable,threadPool);
}else{
//把任务加入到任务队列呗
queue.addLast(runnable);
//唤醒等待的线程,这些线程在queue的waitSet里面,要结合take方法
queue.notifyAll();
}
}
}
//线程从任务队列里面拿任务,若是拿不到就会阻塞,直到有任务来而且抢到
@Override
public Runnable take() {
//这里以前也说过了,要先拿到锁才能拿任务
synchronized (queue){
//若是任务队列为空,那么确定拿不了,因此就等待呗
while (queue.size() == 0){
try {
//这个线程在这里就等待让出锁,直到执行offer方法从而被唤醒,而后
//再从新抢到锁,这里是个循环,若是被唤醒后,也抢到锁了,可是队列
//仍是空的话,继续等待
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//到这里执行这个方法的线程就是抢到锁了,而后获得任务啦!
return queue.removeFirst();
}
}
//返回调用该方法时任务队列有多少个任务在等待
@Override
public int size() {
synchronized (queue){
return queue.size();
}
}
}
复制代码
代码的注释已经解释得很清楚了,这里主要是了解为何Work中的Runnable task = runnableQueue.take()
中没有任务会阻塞等待,本质就是
1 拿到queue对象锁以后,任务队列没任务,释放掉真正存储任务的对象的对象锁,从而进入该对象的waitSet队列里面等待被唤醒。
2 固然若是没拿到锁也会一直等待拿到锁,而后像1同样.
若是看到这里看不太明白的,你们能够先回去看一下java线程的基本知识和
synchronized
的详解,这样能够更好地把知识串联起来!
接下来咱们再看下 工做队列是什么样子。
ThreadTask在BaseThreadPool的一个内部类
//把工做线程和内部任务绑定在一块儿
class ThreadTask{
Thread thread;
Woker woker;
public ThreadTask(Thread thread, Woker woker) {
this.thread = thread;
this.woker = woker;
}
}
复制代码
从上面的代码咱们知道,ThreadTask就是把一个工做线程和一个工做线程的任务封装在一块儿而已,这里主要是为了后面线程池关闭的时候可让线程须要作的任务中止!
线程池关闭的操做 ,BaseThreadPool
类的方法
/*shutdown 就要把 Woker 给中止 和 对应的线程给中断*/
@Override
public void shutDown() {
synchronized (this){
if(isShutDown())
return;
//设置标志位,让线程池线程也执行完run方法,而后退出线程。
isShutdown = true;
/*所有线程中止工做*/
for (ThreadTask task: threadQueue
) {
//1 这里就是把Woker实例对象的running置为false
task.woker.stop();
//2 中断执行对应任务的线程
task.thread.interrupt();
}
}
}
复制代码
能够看到关闭线程池,就是遍历存放工做线程的队列,1和2都是破坏Woker对象的while循环条件,从而让Woker对象的run
方法执行结束。(这里你们能够看下Woker这个类的run
方法就明白我说的了)
咱们在开始的时候说过,BaseThreadPool
启动的时候其实也是一个线程,在它的init
方法中就调用了start
方法表示执行run
里面的逻辑,以前咱们看了run的代码,可是没分析,如今就来分析吧
@Override
public void run() { //BaseThreadPool类的方法
//还记得shutDown()方法里面的 isShutdown = true语句吗?
//做用就是为了让这里下一次判断while循环的时候退出,而后执行完run啦!
while (!isShutdown && !isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
//若是线程池这个线程被中断
//到这里就是关闭线程池了,也是把isShutdown设置为我true!
isShutdown = true;
continue;
}
// 这里同步代码块,保证了每次访问的时候都是最新的数据!
synchronized (this){
if(isShutdown) break;
//任务队列不为空,而且当前能够工做的线程小于coreCount,那么说明工做 //线程数不够,就先增长到maxSize.
//好比说coreSize 为20,initSize为10,maxSize 为30,
//忽然一会儿来了20分线程进来,可是工做线程只有15个,因为某种缘由可能
//那15个工做现场还没执行完,那么此时的任务队列确定还有剩余的,发现此
//时线程还没到coreSize
//那么就直接开maxSize个线程先把
//若是发现如今工做的的线程已通过了coreSize就先不增长线程数啦
if(runnableQueue.size() > 0 && activityCount < coreSize){
for (int i = runnableQueue.size(); i < maxSize; i++) {
newThread();
}
}
// 任务队列为空,而且当前能够工做的线程数大于coreCount,工做线程数太多啦!那么就减小到coreCount基本大小把
if(runnableQueue.size() == 0 && activityCount > coreSize){
for (int i = coreSize; i < activityCount; i++) {
removeThread();
}
}
}
}
}
//----------removeThread()
// 线程池中去掉某个工做线程,这里的操做是否是很相似shutDown的内容
public void removeThread(){
this.activityCount--;
ThreadTask task = threadQueue.remove();
task.woker.stop();//就是破坏Woker对象的while循环的条件
}
复制代码
上面的注释讲解的比较清楚,有啥不懂的多看几篇,本身模拟一下思路就好啦!
在run
方法中,重要的是关于线程池中的线程数量的动态变化的部分。
coreSize:线程池基本的大小,至关于一个分界线
initSize:线程池的初始化大小,这枚啥好说的
activityCount:当前工做线程的数量
maxSIze:线程池中最大的线程数目
说一下它们之间的关系
任务队列不为空的状况下
activityCount < coreSize的时候,就说明线程池的数量没到达基本大小,就新增线程,直接新增到最大!
activityCount >= coreSize的时候,说明当前线程池的工做线程数量已经到达基本大小,有任务来就须要等一下啦!
注意:这里的扩容机制只是简单地扩容,Java中实现的线程池并非像我说那样扩容的,这就解决了开头的问题啦,具体的到时候仍是分析源码的时候再说把!这里只是简单地实现一下!
测试代码
package blogDemo.ThreadDemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new BaseThreadPool(4,30,6,30);
for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running and done.");
});
}
}
}
复制代码
测试结果
本篇文章就写到这里啦,你们看文章的时候能够一边看代码一边看解释,这样会更加容易理解,但愿对读者后面理解java自带线程池有所帮助,下一篇文章就分析java自带的线程池的源码啦!