提供Executor的工厂类 java
忽略了自定义的ThreadFactory、callable和unconfigurable相关的方法bash
newFixedxxx:在任意时刻,最多有nThreads个线程在处理task;若是全部线程都在运行时来了新的任务,它会被扔入队列;若是有线程在执行期间因某种缘由终止了运行,若是须要执行后续任务,新的线程将取代它多线程
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
复制代码
newCachedxxx:新任务到来若是线程池中有空闲的线程就复用,不然新建一个线程。若是一个线程超过60秒没有使用,它就会被关闭移除线程池并发
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
复制代码
newSingleThreadExecutor:仅使用一个线程来处理任务,若是这线程挂了,会产生一个新的线程来代替它。每个任务被保证按照顺序执行,并且一次只执行一个框架
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
使用newFixedxxx方法也能实现相似的做用,可是ThreadPoolExecutor会提供修改线程数的方法,FinalizableDelegatedExecutorService则没有修改的途径,它在DelegatedExecutorService的基础 上仅提供了执行finalize时候去关闭线程,而DelegatedExecutorService仅暴漏ExecutorService自身的方法socket
newScheduledThreadPool:提供一个线程池来延迟或者按期执行任务性能
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
newSingleThreadScheduledExecutor:提供单个线程来延迟或者按期执行任务,若是执行的线程挂了,会生成新的。网站
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
复制代码
一样,它保证返回的Executor自身的线程数不可修改ui
从上述的实现能够看出,核心在于三个部分this
对应的,相应也有不一样的队列去实现不一样的场景
它仅仅是包装了ExecutorService的方法,交由传入的ExecutorService来执行,所谓的UnConfigurable实际也就是它没有暴漏配置各类参数调整的方法
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
复制代码
提供一系列的schedule方法,使得任务能够延迟或者周期性的执行,对应schedule方法会返回ScheduledFuture以供确认是否执行以及是否要取消。它的实现ScheduledThreadPoolExecutor也支持当即执行由submit提交的任务
仅支持相对延迟时间,好比距离如今5分钟后执行。相似Timer也能够管理延迟任务和周期任务,可是存在一些缺陷:
- 全部的定时任务只有一个线程,若是某个任务执行时间长,将影响其它TimerTask的精确性。
ScheduledExecutorService的多线程机制可弥补
- TimerTask抛出未检查的异常,将终止线程执行,此时会错误的认为任务都取消了。
1:可使用try-catch-finally对相应执行快处理;2:经过execute执行的方法能够设置UncaughtExceptionHandler来接收未捕获的异常,并做出处理;3:经过submit执行的,将被封装层ExecutionException从新抛出
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
默认状况下,只有新任务到达的时候才会启动线程,可经过prestartCoreThread方法实现事先启动
- corePoolSize:默认线程池所须要维护的最小的worker的数量,就算是worker过时了也会保留。若是想要不保留,则须要设置allowCoreThreadTimeOut,此时最小的就是0
- maximumPoolSize:线程池最大的线程数。java限制最多为 2^29-1,大约5亿个
Executors.defaultThreadFactory
ThreadPoolExecutor可自定义beforeExecutor、afterExecutor能够用来添加日志统计、计时、件事或统计信息收集功能,不管run是正常返回仍是抛出异常,afterExecutor都会被执行。若是beforeExecutor抛出RuntimeException,任务和afterExecutor都不会被执行。terminated在全部任务都已经完成,而且全部工做者线程关闭后会调用,此时也能够用来执行发送通知、记录日志等等。
cpu的个数
将一个网站的业务抽象成以下几块
理论上,服务端经过实现约定的接口就能够实现接收请求和处理接二连三的请求过来
ServerSocket socket = new ServerSocket(80);
while(true){
Socket conn = socket.accept();
handleRequest(conn)
}
复制代码
缺点:每次只能处理一个请求,新请求到来时,必须等到正在处理的请求处理完成,才能接收新的请求
为每一个请求建立新的线程提供服务
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket conn = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(conn);
}
}
new Thread(task).start();
}
复制代码
缺点:
使用java自带的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100);
...
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket conn = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(conn);
}
}
exec.execute(task);
}
...
复制代码
线程池策略经过实现预估好的线程需求,限制并发任务的数量,重用现有的线程,解决每次建立线程的资源耗尽、竞争过于激烈和频繁建立的问题,也囊括了线程的优点,解耦了任务提交和任务执行。
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for(ImageInfo info:scaForImageInfo(source)){
imageData.add(info.downloadImage());
}
for(ImageData data:imageData){
renderImage(data);
}
复制代码
缺点:图像的下载大部分时间在等待I/O操做执行完成,这期间CPU几乎不作任何工做,使得用户看到最终页面以前要等待过长的时间
渲染过程能够分红两个部分,1是渲染文本,1是下载图像
private static final ExecutorService exec = Executors.newFixedThreadPool(100);
...
final List<ImageInfo> infos=scaForImageInfo(source);
Callable<List<ImageData>> task=new Callable<List<ImageData>>(){
public List<ImageData> call(){
List<ImageData> r = new ArrayList<ImageData>();
for(ImageInfo info:infos){
r.add(info.downloadImage());
}
return r;
}
};
Future<List<ImageData>> future = exec.submit(task);
renderText(source);
try{
List<ImageData> imageData = future.get();
for(ImageData data:imageData){
renderImage(data);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
future.cancel(true);
}catche(ExecutionException e){
throw launderThrowable(e.getCause());
}
复制代码
使用Callable来返回下载的图片结果,使用future来得到下载的图片,这样将减小用户所须要的等待时间。
缺点:图片的下载很明显时间要比文本要慢,这样的并行化极可能速度可能只提高了1%
使用CompletionService。
private static final ExecutorService exec;
...
final List<ImageInfo> infos=scaForImageInfo(source);
CompletionService<ImageData> cService = new ExecutorCompletionService<ImageData>(exec);
for(final ImageInfo info:infos){
cService.submit(new Callable<ImageData>(){
public ImageData call(){
return info.downloadImage();
}
});
}
renderText(source);
try{
for(int i=0,n=info.size();t<n;t++){
Future<ImageData> f = cService.take();
ImageData imageData=f.get();
renderImage(imageData)
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}catche(ExecutionException e){
throw launderThrowable(e.getCause());
}
复制代码
核心思路为为每一幅图像下载都建立一个独立的任务,并在线程池中执行他们,从而将串行的下载过程转换为并行的过程
广告展现若是在必定的时间之内没有获取,能够再也不展现,并取消超时的任务。
ExecutorService exe = Executors.newFixedThreadPool(3);
List<MyTask> myTasks = new ArrayList<>();
for (int i=0;i<3;i++){
myTasks.add(new MyTask(3-i));
}
try {
List<Future<String>> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS);
for (int i=0;i<futures.size();i++){
try {
String s = futures.get(i).get();
System.out.println("task execut "+myTasks.get(i).getSleepSeconds()+" s");
} catch (ExecutionException e) {
System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ");
}catch (CancellationException e){
System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ,because "+e);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
exe.shutdown();
复制代码
invokeAll方法对于没有完成的任务会被取消,经过CancellationException能够捕获,invokeAll返回的序列顺序和传入的task保持一致。结果以下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException
task sleep 2 not execute ,because java.util.concurrent.CancellationException
task execut 1 s
复制代码