/** * 串行处理请求: * 简单正确,但性能低下 */ public class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ Socket connection = server.accept(); //阻塞等待客户端链接请求 handlerRequest(connection); } server.close(); } ... }
/** * 为每个用户请求建立一个线程为其服务 */ public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ final Socket connection = server.accept(); //阻塞等待客户端链接请求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; new Thread(task).start(); } server.close(); } ... }上面的实现至少能给咱们一些暗示:
/** * 基于线程池的Web服务器 */ public class ThreadPerTaskWebServer { private static final int NTHREADS = 100; /** * 建立固定线程数量的线程池 */ private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ final Socket connection = server.accept(); //阻塞等待客户端链接请求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; exec.execute(task); } server.close(); } ... }
执行策略须要考虑的有: java
Executors提供了几种建立线程池的方法: 安全
//建立固定长度的线程池,每当提交一个任务时就建立一个线程,直到达到线程池的最大数量,如有线程发生异常,则会从新建立 public static ExecutorService newFixedThreadPool(int nThreads) {...} //建立单个线程来执行任务,若该线程发生异常,会建立一个新的线程。该池可按顺序执行队列中的任务(如FIFO,LIFO,优先级等) public static ExecutorService newSingleThreadExecutor() {...} //该线程池无长度限制,在线程过多时会回收,过少时会建立 public static ExecutorService newCachedThreadPool() {...} //建立一个固定长度的线程池,并以延迟或定时的方式执行任务 public static ScheduledExecutorService newScheduledThreadPool(...}
public interface ExecutorService extends Executor { void shutdown();//平缓关闭,不接受新任务,待提交的任务执行完毕后,再关闭 List<Runnable> shutdownNow();//粗暴关闭,尝试取消全部执行中的任务,再也不启动队列中还没有开始执行的任务 boolean isShutdown(); //是否已关闭 boolean isTerminated(); //是否已终止 boolean awaitTermination(long timeout, TimeUnit unit)//等待ExecutorService到达终止状态 throws InterruptedException; ... }
/** * 对线程池进行生命周期管理 */ public class LifecycleWebServer { private static final int NTHREADS = 100; private final ServerSocket server; public LifecycleWebServer() throws IOException{ server = new ServerSocket(80); } /** * 建立固定线程数量的线程池 */ private static final ExecutorService exec = Executors.newFixedThreadPool(NTHREADS); public void start() throws IOException{ while (!exec.isShutdown()){ try { final Socket connection = server.accept(); //阻塞等待客户端链接请求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; exec.execute(task); } catch (RejectedExecutionException e) { if (!exec.isShutdown()){ //task submission is rejected } } } } public void stap() throws IOException{ exec.shutdown(); //平缓关闭线程池 server.close(); } private static void handlerRequest(Socket connection) { // handle request } }
/** * 错误的Timer行为,Timer是脆弱的 */ public class OutOfTime { public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new ThrowTask(), 1); //第一个任务抛出异常 Thread.sleep(1000); timer.schedule(new ThrowTask(), 1); //第二个任务将不能再执行, 并抛出异常Timer already cancelled. Thread.sleep(5000); System.out.println("end."); } static class ThrowTask extends TimerTask{ @Override public void run() { throw new RuntimeException("test timer's error behaviour"); } } }
/** * 串行地渲染页面元素, 性能很低下 * 下载图片过程当中有可能IO时间长阻塞, * CPU没能有效利用 */ public class SingleThreadRenderer { void rendererPage(CharSequence source){ renderText(source); List<ImageData> imageDatas = new ArrayList<>(); //解析文本中的图片链接 for (ImageInfo imageInfo : scanForImage(source)){ imageDatas.add(imageInfo.downloadImageData()); //下载图片 } //渲染图片 for (ImageData data : imageDatas){ renderImage(data); } } ... }
/** * 使用Future等待图像下载 * 将渲染过程分为: * IO密集型(下载图像) * CPU密集型(渲染页面) * 但这里仍然必须图片下载完成了才能看到页面,只是缩短了总时间 */ public class FutureRenderer { private final ExecutorService exec = Executors.newFixedThreadPool(10); void rendererPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImage(source); //抽出图片连接信息 Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<>(); for (ImageInfo imageInfo : imageInfos){ result.add(imageInfo.downloadImageData()); //下载图片 } return result; } }; Future<List<ImageData>> future = exec.submit(task); //提交下载图片的任务 renderText(source); //渲染文本 try { List<ImageData> imageDatas = future.get();//阻塞获取下载的图片 for (ImageData data : imageDatas){ //渲染图片 renderImage(data); } } catch (InterruptedException e) { //从新设置线程的中断状态 Thread.currentThread().interrupt(); future.cancel(true); } catch (ExecutionException e) { // handle exception } } ... }
/** * 使用CompletionService, 使页面元素在下载完成后当即显示出来 * 相似Mobile中的新闻加载,图片时被异步加载的 */ public class Renderer { private final ExecutorService executor; public Renderer(ExecutorService executor){ this.executor = executor; } void rendererPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImage(source); //抽出图片连接信息 CompletionService<ImageData> completionService = new ExecutorCompletionService<>(this.executor); for (final ImageInfo imageInfo : imageInfos){ //提交下载图片的任务, 每下载一个图片就是一个任务,达到下载图片并行性 completionService.submit(new Callable<ImageData>() { //内部会将执行完后封装的Future对象放到一个BlockingQueue中 @Override public ImageData call() throws Exception { return imageInfo.downloadImageData(); } }); } renderText(source); //渲染文本 try { for (int i=0, n=imageInfos.size(); i<n; i++){ Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); //渲染图片 } } catch (InterruptedException e) { //从新设置线程的中断状态 Thread.currentThread().interrupt(); } catch (ExecutionException e) { // handle exception e.getCause() } } ... }
任务超时设置可经过Future的get超时版本: 服务器
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;你能够捕获其TimeoutException来作相应处理便可。
能够经过ExecutorService提交一组任务: 多线程
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;不吝指正。