JAVA并发之多线程基础(6)

在并发相关,不单单依靠以前介绍的各类锁或者队列操做--JAVA并发之多线程基础(5),同时咱们也须要考虑到资源的消耗状况(力扣上各类题目比消耗与时间。。)。这个时候咱们就引入了线程池。java

针对于你们熟悉的Executors进行入手,咱们常常性的使用里面的线程池。固然,根据阿里巴巴的规范手册上来讲,不建议咱们直接经过这个类去建立一个线程池,须要经过ThreadPoolExecutor自行去建立,这样会让咱们懂得线程池中的线程各个时间的状态变化,以防止线程池中的线程异常。缓存

Executors

  • newFixedThreadPool(int nThreads) 建立一个固定大小的线程池。
  • newSingleThreadExecutor()建立单一线程的线程池
  • newCachedThreadPool()建立一个带缓存的线程池,里面的线程会执行完成以后会存在一段时间,没有执行就释放。
  • newScheduledThreadPool(int corePoolSize)建立一个计划任务的线程池

前三个里面的实现都是利用了ThreadPoolExecutor,只不过传入的参数是不一样的,而后造就了不一样的线程池,接下来就看看ThreadPoolExecutor里面参数的各个含义。多线程

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
  • corePoolSize 核心线程池大小
  • maximumPoolSize 线程池最大容量
  • keepAliveTime 线程存活时间
  • unit 存活时间的单位
  • workQueue 阻塞队列存放里面执行任务

默认会帮咱们填充的两个参数:并发

  • threadFactory 线程工厂,用于产生线程放入到线程池中
  • handler 拒绝策略处理器,用于当前线程池中拒绝多余的任务等,默认是AbortPolicy拒绝策略

线程工厂里面就会帮咱们产生一个个线程放入到线程池中:jvm

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())//是否为守护线程
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)//是否为默认的优先级
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
复制代码

在线程池中的execute(Runnable command)方法是会去帮助咱们去执行咱们所须要的任务:ide

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//ctl是一个包装的原子类,里面包含了线程的数量以及状态
        if (workerCountOf(c) < corePoolSize) {//工做线程数量小于当前的核心线程数
            if (addWorker(command, true))//加入到队列中,而且true表明使用核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//判断线程池是否在运行,同时将任务加入到队列中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//若是线程池不是运行状态则进行拒绝任务操做
                reject(command);
            else if (workerCountOf(recheck) == 0)//若是线程数为0,则开辟一个线程去执行,不采用核心线程
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//加入队列失败则进行线程数增长,这里采用的线程是大于核心线程数小于最大线程数。
            reject(command);
    }
复制代码

ForkJoinTask

这是一个比较特殊的线程池,能够将一个很大的任务进行分解成为若干个小的任务去执行。执行完成以后再将每一个任务的结果进行整合返回。底下派生出两个抽象类:RecursiveAction是没有返回值的,只是将任务划分去执行。RecursiveTask是有返回值的,将任务执行完成以后结果整合进行返回。post

package com.montos.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long> {
	private static final long serialVersionUID = 1L;
	private static final int THRESHOLD = 10000;
	private long start;
	private long end;

	public CountTask(long start, long end) {
		super();
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;//阈值判断
		if (canCompute) {
			for (long i = start; i <= end; i++) {
				sum += i;
			}
		} else {
			long step = (start + end) / 100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for (int i = 0; i < 100; i++) {
				long lastOne = pos + step;
				if (lastOne > end)
					lastOne = end;
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step + 1;
				subTasks.add(subTask);
				subTask.fork();//子线程进行求解
			}
			for (CountTask t : subTasks) {
				sum += t.join();//返回全部的结果集进行求和返回
			}
		}
		return sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0, 200000l);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			Long res = result.get();
			System.out.println("result is :" + res);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}

	}
}

复制代码

上面是有返回值的demo,执行完以后控制台会会出现result is :20000100000这个返回结果。this

有些小伙伴可能会遇到两个错误(修改里面的参数值,而致使出现的问题,这里我就遇到了!!):spa

  • 1.java.util.concurrent.ExecutionException:java.lang.StackOverflowError。缘由是由于ForkJoin不会对堆栈进行控制,编写代码时注意方法递归不能超过jvm的内存,若是必要须要调整jvm的内存:在Eclipse中JDK的配置中加上 -XX:MaxDirectMemorySize=128(默认是64M)。改成128后不报栈溢出,可是报下一个错。
  • 2.java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node。这个致使的缘由是由于子任务的处理长度不平衡。咱们须要对原来的长度进行计算处理。

至此JDK中大部分的并发类都谈及到用法,对于底层代码的描述和处理,这块期待我以后的文章。线程

相关文章
相关标签/搜索