并发编程专题六-线程池的使用与分析

五一要结束了,是时候开始新的一波学习了~java

1、什么是线程池?为何要用线程池?

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和总体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短期任务时建立与销毁线程的代价。线程池不只可以保证内核的充分利用,还能防止过度调度。 数据库

优点:编程

  1. 下降资源的消耗。下降线程建立和销毁的资源消耗;
  2. 提升响应速度。例如:线程的建立时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
  3. 提升线程的可管理性。

2、如何实现一个线程池

根据线程池的概念,若是要本身建立线程池,应该知足一下条件。缓存

  1. 保存线程的容器。由于线程必须在池子已经建立好了,而且能够保持住,所以,须要一个容器去保存咱们的线程。
  2. 能够接受外部任务。线程还要可以接受外部的任务,冰并运行这个任务。
  3. 保存任务的容器,有些任务可能来不及执行,所以须要未来不及执行的任务经过容器保存起来。

根据以上的条件以及以前咱们学的并发编程知识,咱们先手动本身尝试写一个线程池服务器

Code:网络

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class MyThreadPool {
    // 线程池中默认线程的个数为5
    private static int WORK_NUM = 5;
    // 队列默认任务个数为100
    private static int TASK_COUNT = 100;  
    
    // 工做线程组
    private WorkThread[] workThreads;

    // 任务队列,做为一个缓冲
    private final BlockingQueue<Runnable> taskQueue;
    private final int worker_num;//用户在构造这个池,但愿的启动的线程数

    // 建立具备默认线程个数的线程池
    public MyThreadPool() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 建立线程池,worker_num为线程池中工做线程的个数
    public MyThreadPool(int worker_num,int taskCount) {
    	if (worker_num<=0) worker_num = WORK_NUM;
    	if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
        	workThreads[i] = new WorkThread();
        	workThreads[i].start();
        }
       
    }
    
    // 执行任务,其实只是把任务加入任务队列,何时执行有线程池管理器决定
    public void execute(Runnable task) {
    	try {
			taskQueue.put(task);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

    }


    // 销毁线程池,该方法保证在全部任务都完成的状况下才销毁全部线程,不然等待任务完成才销毁
    public void destroy() {
        // 工做线程中止工做,且置为null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
        	workThreads[i].stopWorker();
        	workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任务队列
    }

    // 覆盖toString方法,返回线程池信息:工做线程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 内部类,工做线程
     */
    private class WorkThread extends Thread{
    	
    	@Override
    	public void run(){
    		Runnable r = null;
    		try {
				while (!isInterrupted()) {
				    //监听阻塞队列,若是有任务,则执行相应的任务
					r = taskQueue.take();
					if(r!=null) {
						System.out.println(getId()+" ready exec :"+r);
						r.run();
					}
					r = null;//help gc;
				} 
			} catch (Exception e) {
				// TODO: handle exception
			}
    	}
    	
    	//中止线程
    	public void stopWorker() {
    		interrupt();
    	}
    	
    }
}

一、定义WorkThread类,用来表示执行的线程,用于监听阻塞队列任务。并发

二、建立构建函数,咱们将线程池进行初始化,并启动全部的工做线程。workThreads用来保存运行的线程,使用BlockingQueue<Runnable> taskQueue用来保存咱们的任务队列dom

三、建立提交任务方法execute,用于提交咱们的任务。异步

四、建立销毁线程池的方法destroy,用于销毁线程池。ide

以后咱们编写测试类

import java.util.Random;

/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 建立3个线程的线程池
        MyThreadPool t = new MyThreadPool(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 全部线程都执行完成才destory
        System.out.println(t);
    }

    // 任务类
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 执行任务
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任务 " + name + " 完成");
        }
    }
}

能按照线程池的方式进行执行。

咱们简单地手动写了一个线程池,但以上线程池有哪些问题呢?

一、启动的时候就将全部线程启动了,若是长时间不用比较消耗资源。

二、没有任务饱和的一个错略。

三、没有任务超时机制等等。

3、JDK中的线程池和工做机制

咱们大体了解了线程池的一个机制,那咱们看下JDK中,是如何实现线程池的吧。

一、线程池的建立

JAVA中,ThreadPoolExecutor,是全部线程池实现的父类,类结构图以下。

 

它的构造函数含有如下参数

参数 含义
int corePoolSize  线程池中核心线程数,< corePoolSize  ,就会建立新线程,= corePoolSize  ,这个任务就会保存到BlockingQueue,若是调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize  个数的线程。
int maximumPoolSize 容许的最大线程数,若是BlockingQueue也满了,而且线程数< maximumPoolSize时候就会再次建立新的线程
long keepAliveTime 线程空闲下来后,存活的时间,这个参数只在线程数> corePoolSize才有用
TimeUnit unit 存活时间的单位值
BlockingQueue<Runnable> workQueue 保存任务的阻塞队列
ThreadFactory threadFactory 建立线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler

饱和策略

AbortPolicy :直接抛出异常,默认;

CallerRunsPolicy:用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务

DiscardPolicy :当前任务直接丢弃

实现本身的饱和策略只要实现RejectedExecutionHandler接口便可

提交任务

execute(Runnable command)  不须要返回

Future<T> submit(Callable<T> task) 须要返回值

关闭线程池

shutdown(),shutdownNow();

shutdownNow():设置线程池的状态,还会尝试中止正在运行或者暂停任务的线程

shutdown()设置线程池的状态,只会中断全部没有执行任务的线程

二、线程池的工做机制

 

 

一、若是工做线程数小于核心线程数,则建立工做线程

二、若是工做线程数等于或者大于核心线程数,则将任务提交到阻塞队列中

三、若是阻塞队列也满了,但线程数小于最大线程数,则建立新的线程

四、若是建立新的线程也满了,则执行任务饱和策略。

源码以下

三、如何合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型

计算密集型:例如加密,大数分解,正则……等

推荐:机器的Cpu核心数+1,为何+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:读取文件,数据库链接,网络通信,

推荐:线程数适当大一点,机器的Cpu核心数*2,

混合型:尽可能拆分,若是IO密集型远远计算密集型,拆分意义不大。

在阻塞队列的选择上,应该使用有界,无界队列可能会致使内存溢出

四、预约义的线程池

Java中,帮咱们预约了5种线程池

一、FixedThreadPool

建立固定线程数量的线程池,适用于负载较重的服务器,使用了无界队列

二、SingleThreadExecutor

建立单个线程的线程池,适用于须要顺序保证执行任务,不会有多个线程业务,使用了无界队列

三、CachedThreadPool

会根据须要来建立新线程的,适用于执行不少短时间异步任务的程序,使用了SynchronousQueue

四、WorkStealingPool(JDK7之后)

工做密取线程池,基于ForkJoinPool实现。适用于大任务分解的线程池。

五、ScheduledThreadPoolExecutor

须要按期执行周期任务的线程池。有两种实现

newSingleThreadScheduledExecutor:只包含一个线程,只须要单个线程执行周期任务,保证顺序的执行各个任务

newScheduledThreadPool 能够包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候

方法说明:

schedule:只执行一次,任务还能够延时执行

scheduleAtFixedRate:提交固定时间间隔的任务

scheduleWithFixedDelay:提交固定延时间隔执行的任务

具体使用,你们能够自行百度,都比较多,通常若是并发比较高的业务中,以上线程池都不建议使用,由于他们采用的都是无界队列,任务量比较大的时候有可能致使内存溢出。通常正确用法是直接使用ThreadPoolExecutor进行建立,或根据自身需求进行自定义。

 

本章重点:线程池的建立,使用,以及运行原理。

其余阅读   并发编程专题

481021518c4b8fa00ba60ef9609c53b2b5f.jpg

相关文章
相关标签/搜索