从0学习java并发编程实战-读书笔记-结构化并发应用程序(6)

在线程中执行任务

在理想状况下,任务之间都是相互独立的:任务并不依赖于其余任务的状态,结果或边界效应。独立性有助于实现并发。java

大多数服务器的应用程序都提供了一个天然的任务边界:以独立的客户请求为边界。web

串行地执行任务

最简单的方式就是在单个线程中串行的执行各项任务。可是现实中的web服务器的状况却并不是如此。在web请求中包含了一组不一样的运算和I/O操做。服务器必须处理套接字I/O以读取请求和写回响应,这些操做一般会因为网络或者连通性问题而被阻塞。数据库

在服务器应用中,串行机制一般没法提供高吞吐率和快速响应性。缓存

显示地为任务建立线程

若是为每一个请求建立一个新的线程来提供服务,特色有:安全

  • 任务处理过程将主线程中分离出来,使主循环可以更快的接受下一个到来的链接,使得程序在完成前面的请求以前能够接受更多新的请求,从而提升响应性。
  • 任务能够并行处理,从而能够同时服务多个请求。程序的吞吐量将会提升。
  • 任务处理代码必须是线程安全的,由于将会有多个任务并发的调用这段代码。

只要请求的到达速率不超过服务器的请求处理能力,那么这种方法能够同时带来更快的响应性和更高的吞吐率。服务器

无限制创造线程的不足

在生产环境中,若是为每一个任务分配一个线程,这种方法有着一些缺陷,尤为是当须要建立大量线程的时候:网络

  • 线程生命周期的开销很是高:若是请求的到达率很是高,且处理过程是轻量级的,那么没建立和销毁一个新线程将消耗大量的计算资源。
  • 资源消耗:活跃的线程会消耗系统资源,尤为是内存。若是可运行的线程数量大于可用处理器的数量,那么有些线程将会闲置,并且大量线程在竞争cpu资源还会产生其余的性能开销。
  • 稳定性:在可建立线程的数量上存在一个限制。这个限制随着平台的不一样的,受到多个制约因素,包括:数据结构

    • JVM的启动参数。
    • Thread构造函数中请求栈的大小
    • 以及底层的操做系统对线程的限制等。若是破坏了这些限制,可能抛出OOM。

在必定范围内,增长线程能够提高系统的吞吐率,可是若是超过了这个范围,再建立线程只会下降系统的执行速度,而且若是过多地建立一个线程,那么整个应用也许都会崩溃。多线程

Executor框架

任务是一组逻辑工做单元,而线程是使任务异步执行的机制。并发

线程池优化了线程管理工做,而且java.util.concurrent提供了一种灵活的线程池实现做为Executor框架的一部分。在java类库中,任务执行的主要抽象不是Thread,而是Executor.

public interface Executor{
    void execute(Runnable command);
}

它提供了一种标准的方法将任务的提交过程与执行过程解耦,并用Runnable表示任务。

Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能坚实等机制。

Executor基于生产者和消费者模式,提交任务的操做至关于生产者(生产待完成的工做单元),执行任务的操做至关于消费者。

基于Executor的web服务器

标准的Executor实现:

// 建立线程池
static final Executor executor = Executors.newFixedThreadPool(num);

// 建立任务 
Runnable task = new Runnable(){
    public void run(){
        doSomething();
    }
}

// 执行线程
executor.execute(task);

经过使用Executor,将请求任务的提交与任务的实际行为解耦,并只须要采用另外一种不一样的Executor实现,就能够改变服务器行为。

执行策略

经过将任务的提交和执行解耦开,就能够无需太大的困难为某种类型的任务指定或修改执行策略。最佳策略取决于可用的计算资源以及对服务质量的需求。经过限制并发任务的数量,能够确保应用程序不会因为资源耗尽而失败,或者因为在稀缺资源的竞争而影响性能。

每当看到 new Thread(Runnable).start() 时,而且你但愿得到一个更加灵活的执行策略时,请使用Executor来代替Thread

线程池

线程池是指管理同一组同构工做线程的资源池。线程池是与工做队列密切相关的,工做队列中保存了全部等待执行的任务。工做者线程的任务很简单:从工做队列中获取一个任务,执行任务,而后回到线程池等待下一个任务。

经过重用现有线程而不是建立新线程,能够在处理多个请求的时候分摊掉建立和销毁线程的成本。并且在请求到达的时候,工做线程通常已经存在,就不须要等待线程建立的时间,提升了响应性。经过限制线程池大小,还能够避免多线程之间过分竞争资源,致使程序耗尽内存。

Executor的静态工厂方法

  • newFixedThreadPool:newFixedThreadPool将建立一个固定长度的线程池,每当提交一个任务时就建立一个线程,直到达到线程的最大数量,这时线程池的规模将再也不变化。
  • newCacheThreadPool:newCacheThreadPool将建立一个可缓存的线程池,若是线程池的当前规模超过了处理需求时,那么将回收空闲的线程。而当需求增长的时候,会添加新的线程,线程池规模不受限制。
  • newSingleThreadExecutor:newSingleThreadExecutor是一个单线程Executor,它建立单个工做线程来执行任务,若是这个线程异常结束,会建立另外一个线程来替代,能确保依照任务在队列中的顺序来串行执行。
  • newScheduledThreadPool:newScheduledThreadPool建立了一个固定长度的线程池,并且以延迟或者定时的方式来执行任务,相似Timer。

newFixedThreadPool和newCacheThreadPool这两个方法返回通用的ThreadPoolExecutor实例,能够用来构建专门用途的executor。

Executor的生命周期

Executor的实现一般会建立线程来执行任务。但JVM只有在全部的非守护线程所有终止后才会退出。若是没法正确的关闭Executor,那么JVM将没法结束。
为了解决执行服务生命周期的问题,ExecutorService拓展了Executor接口,添加了一些生命周期的管理方法。

public interface ExecutorService extends Executor{
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException;
}

ExecutorService的生命周期有3种状态:运行关闭终止

  • shutdown()将执行平缓的关闭过程:再也不接受新的任务,同时等待已提交的任务执行完成,包括那些还未开始执行的任务。
  • shutdownNow()将执行粗暴的关闭过程:它将尝试取消全部运行中的任务,而且再也不启动队列中还没有开始执行的任务。

在ExecutorService关闭后提交的任务将由拒绝执行处理器(Rejected Execution Handle)来处理,它会抛弃任务,或使execute方法抛出一个未检查的RejectedExecutionException。等全部任务完成后,ExecutorService将转入终止状态。

延迟任务与周期任务

Timer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“每100ms执行一次该任务”)。然而,Timer存在一些缺陷,所以应该考虑使用ScheduledThreadPoolExecutor来替代它(Timer支持的是绝对时间而不是相对时间的调度制度,所以任务的执行对系统时间很是敏感。而ScheduledThreadPoolExecutor只支持系统相对时间)。

Timer在执行全部定时任务时只会建立一个线程。若是某个任务的执行时间过长,那么会破坏其余timerTask的定时精准性。例如某个周期TimerTask须要每10ms执行一次,而另外一个Task执行了50ms,那么TimerTask会在50ms之后快速的连续调用5次,或者直接丢掉这5次执行。(取决于Timer是基于固定速率仍是说基于固定延时来调度)。

Timer的另外一个问题是,若是TimerTask抛出了一个未检查的异常,Timer线程并不捕获异常。所以当TimerTask抛出未检查的异常时,将终止定时线程。这种状况下,Timer不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。所以已经被调度可是未执行的Task不会被执行,新的任务也不会被调度。

如今基本不会使用Timer

找出可利用的并行性

Executor框架帮助指定执行策略,可是若是要使用Executor,必须将任务表述为一个Runnable。在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。可是不少边界并不是明显可见的,即便是服务器应用程序,在用户请求中仍存在可发掘的并行性,例如数据库服务器。

携带结果的任务Callable和Future

Executor框架使用Runnable做为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或将结果放入某个共享的数据结构,可是它不能返回一个值或抛出一个受检查的异常

对于某些存在延迟的计算,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。在Executor中包含了一些辅助方法,能将其余类型的任务封装为一个Callable,例如Runnable和java.security.PrivilegedAction。

Runnable和Callable描述的都是抽象的计算任务。这些任务一般是有范围的,即都有一个明确的起始点,而且最终都会结束。

Executor执行的任务有4个生命周期阶段:

  • 建立
  • 提交
  • 开始
  • 完成

在Executor框架中,已提交但还没有开始的任务能够取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何的影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成取消,以及获取任务的结果和取消任务。
Future规范中包含的隐含含义是:任务的生命周期只能前进,不能后退,当某个任务完成后,它将永远的停留在完成状态上。
get方法的行为取决于任务的状态(还没有开始,正在运行,已完成)。

  • 若是任务已经完成,那么get会当即返回或者抛出一个Exception。
  • 若是任务没有完成,那么get将阻塞并直到任务完成。
  • 若是任务抛出了异常,那么get将该异常封装为ExecutionException并从新抛出。
  • 若是任务被取消,那么get将抛出CancellationException。
  • 若是get抛出了ExecutorException,那么能够经过getCause获取被封装的初始异常。
public interface Callable<V>{
    V call() throws Exception;
}

public interface Future<V>{
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCanceled();
    boolean isDone();
    V get() throws InterruptedException, ExcutionException, CancellationException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExcutionException,
                 CancellationException, TimeoutException;
}

能够经过不少方法建立一个Future来描述任务。ExecutorService中的全部submit方法均可以返回一个Future,从而将一个Runnable和Callable提交给Executor,并获得一个Future用来得到任务的执行结果或取消任务。还能够显式的为某个指定的Runnable或者Callable实例化一个FutureTask。

Future和Callable例子:

Callable<List<A>> task = new Callable<List<A>>(){
    public List<A> call(){
        List<A> list  = new ArrayList();
        return list;
    }
}

Future<List<A>> future = executor.submit(task);

List<A> list = future.get();

get方法拥有状态依赖的内在特性,于是调用者不须要知道任务的状态,此外在任务提交和得到结果中包含的安全发布属性也确保了这个方法是线程安全的。

在异构任务并行化中存在的局限

经过对异构任务进行并行化来得到重大的性能提高是很困难的。若是没有在类似的任务之间找出细粒度的并行性,那么这种方法带来的好处就回减小。只有当大量相互独立且同构的任务能够并发进行处理时,才能体现出将程序的工做负载分配到多个任务中带来的真正的性能提高。

CompletionService:Executor与BlockingQueue

CompletionService将Executor和BlockingQueue的功能融合在一块儿。能够将Callable任务提交给他来执行,而后使用相似于队列操做的take和poll等方法来得到已完成的结果,这个结果将会封装为Future。

ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。

ExecutorCompletionService的实现

  • 在构造函数中建立一个BlockingQueue来保存计算完成的结果。
  • 当计算完成时,调用Future-Task中的done方法。
  • 当提交某个任务时,该任务首先包装为一个QueueingFuture,是FutureTask的一个子类。
  • 改写子类的done()方法,并将结果放入BlockingQueue中。take和poll方法委托BlockingQueue,这些方法将会在出结果以前阻塞。
private class QueueingFuture<V> extends FutureTask<V> {
    QueueingFuture(Callable<V> c) { super(c); }
    QueueingFuture(Runnable t , V r) { super(t,r); }

    protected void done() {
        completionQueue.add(this);
    }
}

为任务设置时限

若是某个任务没法在指定时间内完成,若是将再也不须要它的结果,此时能够放弃这个任务。但可能只会在指定的时间内等待数据,若是超出了时间,那么只显示已经得到的数据。
要实现这个功能,能够由任务自己来管理它的限定时间,而且在超时之后停止执行或取消任务。
此时可再次使用Future,若是一个限时的get方法抛出了TimeoutException,那么能够提早停止它,避免消耗更多资源。

long endNanos = System.nanoTime() + TIME_BUDGET;
Future<A> f = exec.submit(task);
long timeLeft = endNanos - System.nanoTime();
try{
    A a = f.get(timeLeft,NANOSECONDS);// timeLeft若是<=0,就会中断
} catch(ExecutionException e){

} catch(TimeoutException){
    f.cancel(true);
}

小结

经过围绕任务执行来设计应用程序,能够简化开发过程,并有助于实现并发。Executor框架将任务提交与执行策略解耦开,还支持许多不一样类型的执行策略。当须要建立线程来执行任务时,能够考虑使用Executor。要想在将应用程序分解为不一样的任务时得到最大的好处,必须定义清晰的任务边界。某些应用程序有比较明显的边界,而在其余一些程序中则须要进一步分析才能揭示出粒度更细的并行性。

相关文章
相关标签/搜索