在并发相关,不单单依靠以前介绍的各类锁或者队列操做--JAVA并发之多线程基础(5),同时咱们也须要考虑到资源的消耗状况(力扣上各类题目比消耗与时间。。)。这个时候咱们就引入了线程池。java
针对于你们熟悉的Executors
进行入手,咱们常常性的使用里面的线程池。固然,根据阿里巴巴的规范手册上来讲,不建议咱们直接经过这个类去建立一个线程池,须要经过ThreadPoolExecutor
自行去建立,这样会让咱们懂得线程池中的线程各个时间的状态变化,以防止线程池中的线程异常。缓存
Executors
newFixedThreadPool(int nThreads)
建立一个固定大小的线程池。newSingleThreadExecutor()
建立单一线程的线程池newCachedThreadPool()
建立一个带缓存的线程池,里面的线程会执行完成以后会存在一段时间,没有执行就释放。newScheduledThreadPool(int corePoolSize)
建立一个计划任务的线程池前三个里面的实现都是利用了ThreadPoolExecutor
,只不过传入的参数是不一样的,而后造就了不一样的线程池,接下来就看看ThreadPoolExecutor
里面参数的各个含义。多线程
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
corePoolSize
核心线程池大小maximumPoolSize
线程池最大容量keepAliveTime
线程存活时间unit
存活时间的单位workQueue
阻塞队列存放里面执行任务默认会帮咱们填充的两个参数:并发
threadFactory
线程工厂,用于产生线程放入到线程池中handler
拒绝策略处理器,用于当前线程池中拒绝多余的任务等,默认是AbortPolicy
拒绝策略线程工厂里面就会帮咱们产生一个个线程放入到线程池中:jvm
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())//是否为守护线程
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)//是否为默认的优先级
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
复制代码
在线程池中的execute(Runnable command)
方法是会去帮助咱们去执行咱们所须要的任务:ide
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//ctl是一个包装的原子类,里面包含了线程的数量以及状态
if (workerCountOf(c) < corePoolSize) {//工做线程数量小于当前的核心线程数
if (addWorker(command, true))//加入到队列中,而且true表明使用核心线程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//判断线程池是否在运行,同时将任务加入到队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//若是线程池不是运行状态则进行拒绝任务操做
reject(command);
else if (workerCountOf(recheck) == 0)//若是线程数为0,则开辟一个线程去执行,不采用核心线程
addWorker(null, false);
}
else if (!addWorker(command, false))//加入队列失败则进行线程数增长,这里采用的线程是大于核心线程数小于最大线程数。
reject(command);
}
复制代码
ForkJoinTask
这是一个比较特殊的线程池,能够将一个很大的任务进行分解成为若干个小的任务去执行。执行完成以后再将每一个任务的结果进行整合返回。底下派生出两个抽象类:
RecursiveAction
是没有返回值的,只是将任务划分去执行。RecursiveTask
是有返回值的,将任务执行完成以后结果整合进行返回。post
package com.montos.lock;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;//阈值判断
if (canCompute) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end)
lastOne = end;
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();//子线程进行求解
}
for (CountTask t : subTasks) {
sum += t.join();//返回全部的结果集进行求和返回
}
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000l);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
Long res = result.get();
System.out.println("result is :" + res);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
复制代码
上面是有返回值的demo,执行完以后控制台会会出现result is :20000100000
这个返回结果。this
有些小伙伴可能会遇到两个错误(修改里面的参数值,而致使出现的问题,这里我就遇到了!!):spa
java.util.concurrent.ExecutionException:java.lang.StackOverflowError
。缘由是由于ForkJoin
不会对堆栈进行控制,编写代码时注意方法递归不能超过jvm的内存,若是必要须要调整jvm的内存:在Eclipse中JDK的配置中加上 -XX:MaxDirectMemorySize=128
(默认是64M)。改成128后不报栈溢出,可是报下一个错。java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node
。这个致使的缘由是由于子任务的处理长度不平衡。咱们须要对原来的长度进行计算处理。至此
JDK
中大部分的并发类都谈及到用法,对于底层代码的描述和处理,这块期待我以后的文章。线程