用java自制简易线程池(不依赖concurrent包)

好久以前人们为了继续享用并行化带来的好处而不想使用进程,因而创造出了比进程更轻量级的线程。以linux为例,建立一个进程须要申请新的本身的内存空间,从父进程拷贝一些数据,因此开销是比较大的,线程(或称轻量级进程)能够和父进程共享内存空间,让建立线程的开销远小于建立进程,因而就有了如今多线程的繁荣。
可是即使建立线程的开销很小,但频繁建立删除也是很浪费性能的,因而人们又想到了线程池,线程池里的线程只会被建立一次,用完也不会销毁,而是在线程池里等待被重复利用。这种尤为适用于多而小的任务。举个极端点的例子,若是一个小任务的执行消耗不及建立和销毁一个线程的消耗,那么不使用线程池时一大半的性能消耗都会是线程建立和销毁。 最开始学java的时候,一直不理解线程池,尤为是理解不了线程是如何被复用的,以及线程池和我建立的Thread/Runnable对象有什么关系,今天咱们就来写一个建议的线程池来理解这一切。(不依赖java concurrent包)
首先纠正不少人的一个误解,咱们new一个Thread/Runnable对象的时候,并非建立出一个线程,而是建立了一个须要被线程执行的任务,当咱们调用Thread.start()方法的时候,jvm才会帮咱们建立一个线程。线程池只是帮你执行这些任务而已,你submit的时候只是把这个任务放到某个存储里,等待线程池里空闲的线程来执行,而不是建立线程。知道了这点,因此咱们首先得有个东西来存储任务,还要支持多线程下的存取,最好还支持阻塞以免无任务时的线程空转。
除了存储外,咱们还须要一些线程来消费这些任务,看到这你可能就很明白的知道了这实际上是个生产者消费者模型,Java有好多种生产者消费者的实现,能够参考我以前的博客Java生产者消费者的几种实现方式。若是实现线程池,咱们能够选择使用BlockingQueue来实现。虽然java concurrent包里已经实现了好多BlockingQueue,但为了让你们理解BlockingQueue作了啥,我这里用LinkedListQueue简单封装了一个简易BlockingQueue,代码以下。
java

package me.xindoo.concurrent;

import java.util.LinkedList;
import java.util.Queue;

public class LinkedBlockingQueue<E> {
    private Object lock;
    private Queue<E> queue;
    public LinkedBlockingQueue() {
        queue = new LinkedList<>();
        lock = new Object();
    }

    public boolean add(E e) {
        synchronized (lock) {
            queue.add(e);
            lock.notify();
            return true;
        }
    }

    public  E take() {
        synchronized (lock) {
            while (queue.isEmpty()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return queue.poll();
        }
    }

    public synchronized int size() {
        return queue.size();
    }
}
复制代码

我也只是简单在LinkedListQueue的基础上对其加了synchronized,以保证它在多线程环境下的正常运转。其次我在队列为空时经过wait()方法加了线程阻塞,以防止空队列时线程空转。既然加了阻塞也得加唤醒,每次在往队列里添加任务的时候,就会调用notify()来唤醒一个等待中的线程。
存储搞定了,咱们接下来须要实现的就是消费者。消费者就是线程池里的线程,由于任务队列里的任务都是实现了Runnable接口,因此咱们消费任务时均可以直接调用其run()方法来执行。当一个任务执行完成后在从队列里去取,知道整个线程池被shutdown。
linux

package me.xindoo.concurrent;

public class ThreadPool {
    private int coreSize;
    private boolean stop = false;
    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private Thread[] threads;

    public ThreadPool(int coreSize)  {
        this.coreSize = coreSize;
        threads = new Thread[coreSize];
        for (int i = 0; i < coreSize; i++) {
            threads[i] = new Thread(new Worker("thread"+i));
            threads[i].start();
        }
    }

    public boolean submit(Runnable command) {
        return tasks.add(command);
    }

    public void shutdown() {
        stop = true;
    }

    private class Worker implements Runnable {
        public String name;

        public Worker(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            while (!stop) {
                Runnable command = tasks.take();
                System.out.println(name + " start run, starttime " + System.currentTimeMillis()/1000);  //方便观察线程的执行状况
                command.run();
                System.out.println(name + " finished, endtime " + System.currentTimeMillis()/1000);
            }
        }
    }
}
复制代码

上面就是一个线程池的实现,是否是很简单,在构造函数里初始化固定数目的线程,每一个线程作的只是从队列里取任务,执行……一直循环。
没错,一个简易的线程池就经过上面几十行的代码实现了,已经能够拿去用了,甚至用在生产环境都没啥问题(后果自负,哈哈)。固然这不是一个相似于concurrent包中功能完善、各类参数可自定义的线程池,但确确实实它实现了一个线程池的基本功能——线程的复用。 接下来写个建议的测试代码,若是线程池生产者消费者模型中的消费者,那这个测试代码就是生产者,代码以下。
多线程

package me.xindoo.concurrent;

public class Test {
    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(5);

        for (int i = 0; i < 30; i++) {
            pool.submit(new Task());
        }

        System.out.println("add task finished");

        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}复制代码

执行结果以下jvm

thread0 start run, starttime 1566724202
thread2 start run, starttime 1566724202
thread1 start run, starttime 1566724202
thread3 start run, starttime 1566724202
thread4 start run, starttime 1566724202
add task finished
thread2 finished, endtime 1566724207
thread2 start run, starttime 1566724207
thread1 finished, endtime 1566724207
thread4 finished, endtime 1566724207
thread3 finished, endtime 1566724207
thread0 finished, endtime 1566724207
thread3 start run, starttime 1566724207
thread4 start run, starttime 1566724207
thread1 start run, starttime 1566724207
thread0 start run, starttime 1566724207
thread3 finished, endtime 1566724212
thread0 finished, endtime 1566724212
thread1 finished, endtime 1566724212
thread4 finished, endtime 1566724212
thread2 finished, endtime 1566724212复制代码

测试代码也很是简单,建立一个5个线程,而后提交30个任务,从输出也能够看到的确是5个线程分批次执行完了30个任务。备注:虽然我测试代码里的任务很是简单,其实复杂的任务也是能够的。ide

总结

实时上如上文中好几回提到,java.util.concurrent包里已经帮你们实现了一个很健壮、功能强大的线程池,你们没必要再去造轮子了,使用不一样的BlockingQueue就能够实现不一样功能的线程池。举个栗子,好比使用DelayedWorkQueue就能够实现能够按期执行的线程池了。 甚至Executors为你们封装了更为简易的线程池建立接口,可是《Alibaba Java开发手册》强制不容许使用 Executors 去建立线程池,而是经过 ThreadPoolExecutor 的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。
函数

  1. FixedThreadPool 和 SingleThreadPool: 容许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而致使 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool: 容许的建立线程数量为 Integer.MAX_VALUE,可能会建立大量的线程,从而致使 OOM。

最后说点题外话,以前咱们一个服务启动的时候触发了一个jdk未修复的bug bugs.java.com/bugdatabase…,致使线程池里全部的任务都被阻塞,但其余工做线程一直在往里提交任务,由于咱们直接使用了Executors.FixedThreadPool 因此最后内存爆了..... 后来咱们的就结局方案就是直接使用ThreadPoolExecutor,限制了BlockingQueue的大小。
性能

版权声明:本文为博主原创文章,转载请注明出处。 博客地址:xindoo.blog.csdn.net/测试

相关文章
相关标签/搜索