死磕 java线程系列之本身动手写一个线程池

mythreadpool

(手机横屏看源码更方便)java


问题

(1)本身动手写一个线程池须要考虑哪些因素?编程

(2)本身动手写的线程池如何测试?安全

简介

线程池是Java并发编程中常用到的技术,那么本身如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。并发

属性分析

线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。ide

首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,因此咱们须要两个变量标识核心线程数量coreSize和最大线程数量maxSize。测试

为何要区分是否为核心线程呢?这是为了控制系统中线程的数量。this

当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是能够的,也能够提升任务执行的效率。线程

当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,若是任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,彻底没有新增线程的必要。code

当队列中任务也满了,这时候光靠核心线程就没法及时处理任务了,因此这时候就须要增长新的线程了,可是线程也不能无限制地增长,因此须要控制其最大线程数量maxSize。接口

其次,咱们须要一个任务队列来存听任务,这个队列必须是线程安全的,咱们通常使用BlockingQueue阻塞队列来充当,固然使用ConcurrentLinkedQueue也是能够的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

最后,当任务愈来愈多而线程处理却不及时,早晚会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就须要走拒绝策略了,也就是这些没法及时处理的任务怎么办的一种策略,经常使用的策略有丢弃当前任务、丢弃最老的任务、调用者本身处理、抛出异常等。

根据上面的描述,咱们定义一个线程池一共须要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

另外,为了便于给线程池一个名称,咱们再加一个变量:线程池的名称name。

因此咱们得出了线程池的属性及构造方法大概以下:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

任务流向分析

根据上面的属性分析,基本上咱们已经获得了任务流向的完整逻辑:

首先,若是运行的线程数小于核心线程数,直接建立一个新的核心线程来运行新的任务。

其次,若是运行的线程数达到了核心线程数,则把新任务入队列。

而后,若是队列也满了,则建立新的非核心线程来运行新的任务。

最后,若是非核心线程数也达到最大了,那就执行拒绝策略。

mythreadpool

代码逻辑大体以下:

@Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 若是正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不必定添加成功,addWorker()方法里面还要判断一次是否是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 若是添加核心线程失败,进入下面的逻辑
        }

        // 若是达到了核心线程数,先尝试让任务入队
        // 这里之因此使用offer(),是由于若是队列满了offer()会当即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
            // 本文由公从号“彤哥读源码”原创
        } else {
            // 若是入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 若是添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

建立线程逻辑分析

首先,建立线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,因此咱们还须要一个变量runningCount用来记录正在运行的线程数。

其次,这个变量runningCount须要在并发环境下加加减减,因此这里须要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便咱们这里直接使用AtomicInteger来做为这个变量的类型。

而后,由于是并发环境中,因此须要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示能够增长一个线程,若是条件一失败则表示不能再增长线程了直接返回false,若是条件二失败则表示其它线程先修改了runningCount的值,则重试。

最后,建立一个线程并运行新任务,且不断从队列中拿任务来运行,本文由公从号“彤哥读源码”原创。

mythreadpool

代码逻辑以下:

private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是否是真的能够建立一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程仍是非核心线程
            int max = core ? coreSize : maxSize;
            // 不知足建立线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,能够建立线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,若是取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

取任务逻辑分析

从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,若是中断了就返回null,这样当前线程也就能够安静地结束了,另外还要注意中断了记得把runningCount减一。

private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null能够结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

好了,到这里咱们本身的线程池就写完了,下面咱们一块儿来想一想怎么测试呢?

测试逻辑分析

咱们再来回顾下本身的写的线程池的构造方法:

public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

name,这个随便传;

coreSize,咱们假设为5;

maxSize,咱们假设为10;

taskQueue,任务队列,既然咱们设置的是有边界的,咱们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多能够存储15条任务;

rejectPolicy,拒绝策略,咱们假设使用丢弃当前任务的策略,OK,咱们来实现一个。

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

OK,这样一个线程池就建立完成了,下面就是执行任务了,咱们假设经过for循环接二连三地添加100个任务好很差。

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

咱们分析下这段程序:

(1)先连续建立了5个核心线程,并执行了新任务;

(2)后面的15个任务进了队列;

(3)队列满了,又连续建立了5个线程,并执行了新任务;

(4)后面的任务就没得执行了,所有走了丢弃策略;

(5)因此真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

运行之:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒绝的任务
本文由公从号“彤哥读源码”原创
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

能够看到,建立了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

总结

(1)本身动手写一个线程池须要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

(2)建立线程的时候要时刻警戒并发的陷阱;

彩蛋

咱们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

答:它们是用来控制什么时候销毁非核心线程的,固然也能够销毁核心线程,具体的分析请期待下一章吧。

完整源码

Executor接口

public interface Executor {
    void execute(Runnable command);
}

MyThreadPoolExecutor线程池实现类

/**
 * 自动动手写一个线程池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 线程序列号
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 当前正在运行的线程数,本文由公从号“彤哥读源码”原创
     * 须要修改时线程间当即感知,因此使用AtomicInteger
     * 或者也可使用volatile并结合Unsafe作CAS操做(参考Unsafe篇章讲解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 若是正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不必定添加成功,addWorker()方法里面还要判断一次是否是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 若是添加核心线程失败,进入下面的逻辑
        }

        // 若是达到了核心线程数,先尝试让任务入队
        // 这里之因此使用offer(),是由于若是队列满了offer()会当即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
        } else {
            // 若是入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 若是添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是否是真的能够建立一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程仍是非核心线程
            int max = core ? coreSize : maxSize;
            // 不知足建立线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,能够建立线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务,本文由公从号“彤哥读源码”原创
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,若是取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null能够结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

RejectPolicy拒绝策略接口

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy丢弃策略实现类

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

测试类

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}
相关文章
相关标签/搜索