【Java并发.6】结构化并发应用程序

   

6.1  在线程中执行任务java

  应用程序提供商但愿程序支持尽量多的用户,从而下降每一个用户的服务成本,而用户则但愿得到尽量快的响应。大多数服务器应用程序都提供了一种天然的任务边界选择方式:以独立的客户请求为边界。数据库

 

6.1.1  串行地执行任务浏览器

  在应用程序中能够经过多种策略来调度任务,而其中一些策略可以更好地利用潜在的并发性。最简单的策略就是在单个线程中串行地执行各项任务。缓存

  程序清单 6-1 :串行的 Web 服务器安全

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

  SingleThreadWebServer  很简单,且在理论上是正确的,但在实际生产环境中的执行性能却很糟糕,由于它每次只能处理一个请求。服务器

  在服务器应用程序中,串行处理机制一般都没法提供高吞吐率或快速响应性。也有一些例外,例如,当任务数量不多且执行时间长时,或者当服务器只为单个用户提供服务,而且该客户每次只发出一种请求。网络

 

6.1.2  显示地为任务建立线程并发

  经过为每个请求建立一个新的线程来提供服务,从而实现更高的响应性,如程序清单 6-2 中的 ThreadTaskWebWebServer 所示。框架

  程序清单 6-2:在 Web 服务器中为每一个请求启动一个型的线程。socket

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable bleck = new Runnable() {
                public void run() {
                    //handleRequest(connection);
         }
       };
     }
  }
}

  对比 ThreadPerTaskWebServer 和 SingleThreadWebServer 区别在于,对于每一个链接,主循环都将建立一个新线程来处理请求,而不是在主循环中进行处理。所以可得出三个结论:

  • 任务处理过程从主线程中分离出来,使得主循环可以更快地从新等待下一个到来的链接。这使得程序在完成前面的请求以前能够接受新的请求,从而提升响应性。
  • 任务能够并行处理,从而能同时服务多个请求。若是有多个处理器,或者任务因为某种缘由被阻塞,例如等待 I/O 完成、获取锁或者资源可用性等,程序的吞吐量将获得提升。
  • 任务处理代码必须是线程安全的,由于当有多个任务时会并发地调用这段代码。

 

6.1.3  无限制建立线程的不足

  在生产环境中,“为每一个任务分配一个线程” 这种方法存在一些缺陷,尤为是当须要建立大量的线程时:

  • 线程生命周期的开销很是高:线程的建立与销毁并非没有代价的。根据平台的不一样,实际的开销也有所不一样,但线程的建立过程都会须要时间,延迟处理的请求,而且须要 JVM 和操做系统提供一些辅助操做。若是请求的到达率很是高且请求的处理过程是轻量级的,例如大多数服务器应用程序就是这种状况,那么为每一个请求建立一个新线程将消耗大量的计算资源。
  • 资源消耗:活跃的线程会消耗系统资源,尤为是内存。若是你已经拥有足够多的线程使 CPU 保持忙碌状态,那么再建立更多的线程反而会下降性能。
  • 稳定性:在可建立线程的数量上存在一个限制。这个限制随着平台的不一样而不一样,而且受到多个限制约束,包括 SVM 的启动参数、Thread 构造函数中请求的栈大小,以及底层操做系统对线程的限制等。若是破坏这些限制,则抛出 OutOfMemoryError 异常。

 

6.2  Executor 框架

   咱们已经分析了两种经过线程来执行任务的策略,即把全部任务放在单个线程中串行执行,以及将每一个任务放在各自的线程中执行。这两种方式都存在一些严格的限制:串行执行的问题在于其糟糕的响应性和吞吐量,而 “为每一个任务分配一个线程” 的问题在于资源管理的复杂性。在第五章中,咱们介绍了如何经过有界队列来防止高负荷的应用程序耗尽内存。线程池简化了线程的管理工做,而且 java.util.concurrent 提供了一种灵活的线程池实现做为 Executor 框架的一部分。在Java 类库中,任务执行的主要抽象不是 Thread,而是 Executor,如程序清单6-3:Executor 接口

public interface Executor {
    void execute(Runnable command);
}

 

6.2.1  示例:基于 Executor  的 Web 服务器

  基于 Executor 来构建 Web 服务器是很是容易的。在程序清单 6-4 中用 Executor 代替了硬编码的线程建立。在这种状况下使用了一种标准的 Executor 实现,即一个固定长度的线程池,能够容纳 100 个线程。

public class TaskExecutingWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exe = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    //handleRequest(connection);
                }
            };
            exe.execute(task);
        }
    }
}

  咱们能够很容易地将 TaskExecutionWebServer 修改成相似 ThreadPerTaskWebServer 的行为,只需使用一个为每一个请求都建立新线程的 Executor。程序清单 6-5:为每一个请求启动一个新线程的 Executor 

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

  一样,咱们能够编写一个 Executor 使 TaskExecutionWebServer 的行为相似于单线程的行为,如程序清单 6-6:在调用线程中以同步方式执行全部任务的 Executor 

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
每当看到下面这种形式的代码时:
    new Thread(rennable).start();
而且你但愿得到一种更灵活的执行策略时,请考虑使用 Executor 来代替 Thread

 

6.2.3  线程池

  “在线程池中执行任务” 比 “为每一个任务分配一个线程” 优点更多。经过重用现有的线程而不是建立新线程,能够在处理多个请求时分摊在线程建立和销毁过程当中产生的巨大开销。另外一个额外的好处是,当请求到达时,工做线程一般已经存在,所以不会因为等待建立线程而延迟任务的执行,从而提升了响应性。

  类库提供了一个灵活的线程池以及一些有用的默认配置。能够经过调用 Executor 中的静态工厂方法之一来建立一个线程池:

  • newFixedThreadPool:将建立一个固定长度的线程池。(若是某个线程因为发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程)。
  • newCachedThreadPool:将建立一个可缓存的线程池,若是线程池当前规模超过了处理需求,那么回收空闲的线程,而当需求增长时,则能够添加新的线程,线程池规模不存在任何限制。
  • newSingleThreadExecutor:一个单线程的 Executor,它建立单个工做线程来执行任务,若是线程异常结束,会建立另外一个线程来替代。
  • newScheduledThreadPool:建立一个固定长度的线程池,并且延迟或定时的方式来执行任务,相似 Timer。

 

6.2.4  Executor 的生命周期

  咱们已经知道如何建立一个 Executor,但没有讨论如何关闭它。Executor 的实现一般会建立线程来执行任务。但 JVM 只有在全部线程所有终止后才会退出。所以,若是没法正确地关闭 Executor,那么 JVM 将没法关闭。

  当关闭应用程序时,可能采用最平缓的关闭形式(完成全部已经启动的任务,而且再也不接受任何新的任务),也可能采用最粗暴的关闭形式(直接关闭电脑),以及其余各类可能的形式。

  为了解决执行服务的生命周期问题,Executor 扩展了 ExecutorService 接口,添加了一些用于生命周期管理的方法。

  程序清单 6-7:ExecutorService 中的生命周期管理方法 

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
    // ... 其余用于任务提交的便利方法
}

  shutdown 方法将执行平缓的关闭过程:再也不接受新的任务,同时等待已经提交的任务执行完成 -- 包括那些还未开始执行的任务。shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消全部运行中的任务,而且再也不启动队列中还没有开始执行的任务。

  那么咱们尝试吧生命周期管理扩展到 Web服务器的功能。 程序清单 6-8:支持关闭操做的 Web 服务器

public class LifecycleWebServer {
    private final ExecutorService exe = ...;
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exe.isShutdown()) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    //handleRequest(connection);
                }
            };
            exe.execute(task);
        }
    }
    public void stop() {
        exe.shutdown();
    }
    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(connection)) {
            stop();
        } else {
            dispatchrequest(热情);
        }
    }
}

 

6.3  找出可利用的并行性

  本节咱们将开发一些不一样版本的组件,该示例实现浏览器程序中的页面渲染(Page-Rendering)功能,它的做用是将 HTML 页面绘制到图像缓存中。为了简便,假设 HTML 页面只包含标签文本,以及预约义大小的图片和 URL。

 

6.3.1  示例:串行的页面渲染器

  最简单的方式是对 HTML 文档进行串行处理,但这种方法可能会令用户感到烦恼,它们必须等待很长时间。另外一种串行执行方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。

  程序清单 6-10:串行地渲染页面元素 

public class SingleThreadRender {
    void rederPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageDataList = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanFoeImageInfo(source)) {
            imageDataList.add(imageInfo.downloadImage());
        }
        for (ImageData image : imageDataList) {
            rederImage(image);
        }
    }
}

 

6.3.2  携带结果的任务 Callable 与 Future

  许多任务实际上都是存在延迟的计算----执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable 是一种更好的抽象:它认为主入口点(即 call)将返回一个值,并可能抛出异常。在Executor 中包含了一些辅助方法能将其余类型的任务封装为一个 Callable ,例如 Runable 和 java.security.privilegedAction。

  程序清单 6-11:Callable 与 Future 接口

public interface Callable<V> {
    V call() throws Exception;
}
public interface Future<V> {
    boolean cancel(boolean var1);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

  

6.3.3  示例:使用 Future 实现页面渲染器

  为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染全部的文本,另外一个是下载全部的图像。(由于其中一个任务时 CPU 密集型,一个是 IO 密集型,所以即便在单 CPU 系统上也能提高性能)

  程序清单 6-13:使用 Future 等待图像下载

public class FutureRender {
    private final ExecutorService executor = ...;
    void rederPage(CharSequence source) throws Exception{
        final List<ImageInfo> imageInfoList = scanFoeImageInfo(source);
        Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
            public List<ImageData> call() {
                List<ImageData> imageDataList = new ArrayList<ImageData>();
                for (ImageInfo imageInfo : imageInfoList) {
                    imageDataList.add(imageInfo.downloadImage());
                }
                return imageDataList
            }
        };
        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);
        
        List<ImageData> imagedata = future.get();
        for (ImageData image : imagedata) {
            rederImage(image);
        }
    }
}

 

6.3.6  示例:使用 CompletionService 实现页面渲染器

  能够经过 CompletionService 从两个方面来提升页面渲染器的性能:缩短总运行时间以及提升响应性。为每一幅图像的下载都建立一个独立任务,并在线程池中实行它们。

  程序清单 6-15:使用 CompletionService ,使页面元素在下载完成后当即显示出来

public class Render {
    private final ExecutorService executor = ...;
    Render(ExecutorService exe) {
        this.executor = exe;
    }
    void rederPage(CharSequence source) throws Exception{
        final List<ImageInfo> imageInfoList = scanFoeImageInfo(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageDara>(executor);
        for (final ImageInfo info: imageInfoList) {
            completionService.submit(new Callable<ImageData>() {
                public List<ImageData> call() {
                    return info.downloadImage();
                }
            });
        }
        renderText(source);
        
        for (int i = 0, n = imageInfoList.size(); i < n; i++) {
            Future<ImageData> f = completionService.take();
            ImageData imageData = f.get();
            rederImage(imageData);
        }
    }
}

 

6.3.7  为任务设置时限

  程序清单 6-16:在指定时间内获取广告信息

Page RenderPageWithAd() throws Exception {
        long endNanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = exe.submit(new FetchAdTask());
        //在等待广告的同时显示页面
        Page page = renderPageBody();
        Ad ad;
        //指等待指定的时间长度
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    }

 

6.3.8  示例:批量 为任务设置时限

List<Future<Integer>> futures = exec.invokeAll(tasks, time, unit);

  ExecutorService 中 invokeAll 方法参数为一组任务,并返回一组 Future。

相关文章
相关标签/搜索