要使任务和线程能安全、快速、可靠的停下来,并非一件容易的事。java没有提供任何机制来安全地终止线程(Thread.stop和suspend等方法提供了这样的功能,可是存在严重缺陷,应该避免使用)。
可是java提供了中断(Interruption)
,这是一种协做机制,可以使一个线程终止另外一个线程的当前工做。
咱们不多但愿某个任务、线程或服务当即中止,由于这种当即中止会使共享的数据结构处于不一致的状态。通常使用的协做的方式:当须要中止时,它们首先会清除当前正在执行的工做,而后再结束。由于任务自己的代码比发出取消请求的代码更清除如何执行清除工做。java
若是外部代码能够在某个操做正常完成以前,将其置入完成
状态,那么这个操做就能够称为可取消的
。取消某个操做的缘由有不少:编程
协做机制能设置某个已请求取消(Cancellation Request)
标志,而任务将按期查看这个标志。若是设置了这个标志,任务就提早结束。api
private volatile boolean cancelled; public void run(){ while(!cancelled){ doSomething(); } } public void cancel(){ cancelled = true; }
一个可取消的任务必须拥有取消策略(Cancellation Policy)
,这个策略中必须定义:其余代码如何How
请求该任务,任务在什么时候When
检查是否已经请求了取消,以及在响应取消时应该执行哪些What
操做.缓存
若是在使用中断某一个任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题:任务可能永远不会检查取消标志,所以永远也不会结束。安全
private final BlockingQueue<T> queue; private volatile boolean cancelled; public void run(){ while(!cancelled){ queue.put(something); } } public void cancel(){ cancelled = true; }
当前线程为生产者,当生产者在queue.put()
方法上阻塞了,而这时候消费者但愿取消生产者任务,执行了cancel()
方法,可是生产阻塞在put上,也许永远也没有机会检查cancelled标志
(若是消费者中止从队列中取数,put方法就会一直阻塞)。服务器
在java的api或语言规范中,并无将中断与任何取消语义关联起来,但实际上,若是在取消以外的其余操做中使用中断,那么都是不合适的,而且很难支撑起更大的应用。
// Thread中的中断方法 public class Thread{ public void interrupt(){} public void isInterrupted(){} public static boolean interrupted(){} }
阻塞库方法(例如Thread.sleep()和Object.wait())等,都会检查线程什么时候中断,而且在发现中断时提早返回。
它们在响应中断时执行的操做包括:数据结构
JVM并不能保证阻塞方法检测到中断的速度(实际上速度是很快的)。并发
调用interrupt并不意味着当即中止目标线程正在进行的工做,而只是传递了请求中断的消息。
对中断操做的正确理解是:它并不会真正的中断一个正在运行的线程,只是发出中断请求,而后等待线程在下一个合适的时刻中断本身(这些时刻也被称为取消点)。
一些自定义的取消机制没法与可阻塞的库函数实现良好交互。若是代码可以响应中断,那么能够用中断做为取消机制,而且利用许多类库提供的中断支持。
一般中断是实现取消的最合理方式。
private final BlockingQueue<T> queue; public void run(){ try{ while(!Thread.currentThread().isInterrupted()){ queue.put(something); } }catch(InterruptedException consumed){ // 我的理解:中断线程也许是靠抛出异常来退出的 } public void cancel(){ interrupted(); } }
中断策略规定线程如何解释中断请求:当发现中断时,应该作哪些工做,哪些工做单元对中断来讲是原子操做,以及以多快的速度来响应中断。
最合理的中断策略是某种形式的线程级(Thread-Level)
取消操做或是服务级(Service-Level)
取消操做:框架
一个中断请求可能有一个或者多个接受者,中断线程池中的某个工做者线程,同时意味着“取消当前任务”和“关闭工做者线程”。
阻塞的库函数都只是抛出InterruptedException做为中断响应,它们的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使使用栈中的上层代码能够采起进一步操做。异步
因为每一个线程拥有各自的中断策略,所以除非你知道中断对该线程的含义,不然就不该该中断该线程。
当调用可中断的阻塞函数时(例如Thread.sleep或BlockingQueue.put等),有两种实用策略可用于处理InterruptedException:
/** * 传递异常的方法,能够直接经过throws抛出 **/ BlockingQueue<Task> queue; …… public Task getNextTask() throws InterruptedException { return queue.take(); }
若是不想或者不能传递InterruptedException,那么要寻找另外一种方式来保存中断请求。一种标准的作法就是经过再次调用interrupt来恢复中断状态。除非在代码中实现了中断策略,不然不要无视InterruptedException。
只有实现线程中断策略的代码才能够屏蔽中断请求。在常规的任务和库代码中都不该该屏蔽中断请求。
对于一些不支持取消,可是仍然能够调用中断阻塞方法的操做,它们必须在循环中调用这些方法,并在发现中断后从新尝试。
boolean interrupted = false; try{ while(true){ try{ }catch(InterruptedException e){ interrupted = true; // 从新尝试 } } }finally{ if(interrupted){ Thread.currentThread().interrupt(); } }
若是代码不会调用可中断的阻塞方法,那么能够经过在任务代码中轮询当前线程的中断状态来响应中断。
中断能够用来获取线程的注意,而且由中断线程保存的信息,能够为中断的线程提供进一步的提示(访问这些信息的时候须要确保使用同步)。
private static final ScheduledExcutorService cancelExec = ... private static void timedRun(Runnable r, long timeout, TimeUnit unit){ final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable(){ public void run(){ taskThread.interrupt(); } },timeout unit); r.run(); }
这是一个在指定时间内运行一个任意的Runnable的示例。它在调用线程中的运行任务,并安排了一个取消任务,在运行了指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,由于这个异常会被timeRun()的调用者所捕获。
可是问题是:在中断以前,应该了解它的中断策略。
由于timeRun()能够从任意一个线程中调用,所以它没法得知这个调用线程的中断策略。若是任务在超时前完成,那么中断timeRun所在线程的取消任务将爱timedRun返回到调用者以后启动。并且若是任务不响应中断,那么timeRun将会在任务结束时才返回,有可能超过的调用者所指定的时限。
public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InerruptedException { class RethrowableTask implements Runnable { private volatile Throwable t; public void run() { try{ r.run(); }catch(Throwable t){ this.t = t; } } void rethrow(){ if(t != null){ throw launderThrowable(t); } } } RethrowableTask task = new RethrowableTask; final Thread taskThread = new Thread(task); taskThread.start(); cancelExec.schedule(new Runnable(){ public void run(){ taskThread.interrupt(); } }, timeout, unit); taskThread.join(unit.toMillis(timeout)); task.rethrow(); }
执行任务的线程拥有本身的执行策略,即便任务不响应中断,即时运行的方法仍能返回到它的调用者。
在启动任务线程以后,timeRun将执行一个限时的join方法。在join返回后,它将检查任务中是否有异常抛出,若是有的话,则会在调用timedRun的线程中再次抛出该异常。因为Throwable将在两个线程之间共享,所以设置为volatile类型,来保证安全发布。
可是该代码依赖一个限时的join,所以有着join的不足:没法知道执行控制是由于线程正常退出而返回,仍是由于join超时而返回。
ExecutorService.submit()
将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning
,表示取消操做是否成功(只是表示可否接受中断,而不是表示任务是否能检测并处理中断)。
mayInterruptIfRunning
为true且任务当前正在某个线程运行,那么这个线程能够被中断。mayInterruptIfRunning
为false,那么意味着若任务还没启动,则不要运行它。执行任务的线程是由标准的Executor建立的,它实现了一种中断策略使得任务能够经过中断被取消,若是任务在标准Executor中运行,并经过它们的Future来取消任务,那么能够设置mayInterruptIfRunning
。当尝试取消某个任务时,不宜直接中断线程池,由于你不知道当中断请求到达时线程正在运行什么任务,只能经过任务的Future来实现。
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try{ task.get(timeout,unit); }catch(TimeoutException e){ // 接下来任务将被取消 }catch(ExecutionException e){ // 若是任务中抛出了异常,则从新抛出该异常 throw launderThrowable(e.getCause()); }finally { // 若是任务已经结束,取消操做也不会带来任何影响 // 若是任务正在运行,那么将被中断 task.cancel(); } }
当Future.get
抛出InterruptedException
或TimeoutException
时,若是你知道再也不须要结果,那么就能够调用Future.cancel
来取消任务。
在java库,许多可阻塞的方法都是经过提早返回或者抛出InterruptedException来响应中断请求的。然而并不是全部的可阻塞方法或者阻塞机制都能响应中断。
若是一个线程因为执行同步的Socket I/O或者等待获取内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此以外没有其余做用。
java.io
包中的同步Socket I/O
:
InputStream
和OutputStream
的read和write方法
不会响应中断,但经过关闭底层的套接字,可使因为执行read和write等方法被阻塞的线程抛出一个SocketException
.java.io
包中的同步I/O
:
InterruptibaleChannel
上等待的线程时,将抛出ClosedByInterruptException
并关闭链路(会使得其余在这条链路上阻塞的线程通用抛出ClosedByInterruptException
)。InterruptibaleChannel
,将致使全部在链路操做上阻塞的线程都抛出AsynchronousCloseException
。大多数标准的Channel都实现了InterruptibaleChannel
。Selector的异步I/O
:
java.nio.channels中的Select.select
方法阻塞了,那么调用close或者wakeup方法会使线程抛出ClosedSelectorException
并提早返回。得到某个锁:
lockInterruptibly
方法,该方法容许在等待一个锁的同时能响应中断)。public class ReaderThread extends Thread{ private final Socket socket; private final InputStream inputStream; public ReaderThread(Socket socket) throws IOException{ this.socket = socket; this.inputStream = socket.getInputStream(); } public void interrupt(){ try { socket.close(); }catch (IOException ignored){ }finally { super.interrupt(); } } public void run(){ try { byte[] buf = new byte[1000]; while (true){ int count = inputStream.read(buf); if(count<0){ break; }else if (count>0){ doSomething(buf,count); } } }catch (IOException e){ } } }
经过改写interrupt方法,既能处理标准的中断,也能关闭底层套接字。不管线程是在read或write方法中阻塞仍是在某个可中断方法中阻塞,均可以被中断中止执行当前工做。
咱们能够经过newTaskFor
方法是ThreadPoolExecutor中的新增功能。当把一个Callable提交给ExecutorService
时,submit()
会返回一个Future
,咱们能够经过这个Future来取消任务。newTaskFor是一个工厂方法,它将建立Future来表明任务。newTaskFor还能返回一个RunnableFuture接口,该接口拓展了Future
和Runnable
(由FutureTask实现)。
经过定制表示任务的Future能够改变Future.cancel的行为。例如,定制的取消代码能够实现日志记录或者收集取消操做的统计信息,以及取消一些不响应中断的操做。
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } public class CancellingExecutor extends ThreadPoolExecutor{ ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask){ return ((CancellableTask<T>) callable).newTask(); }else { return super.newTaskFor(callable); } } public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s){ socket = s; } public synchronized void cancel(){ try{ if(socket != null){ socket.close(); } }catch(IOException ignored){} } public RunnableFuture<T> newTask(){ return new FutureTask<T>(this){ public boolean cancel(boolean mayInterruptIfRunning){ try{ SocketUsingTask.this.cancel(); }finally{ return super.cancel(mayInterruptIfRunning); } } } } } }
CancellableTask中定义了一个CancellableTask接口,该接口拓展了Callable,并增长了一个cancel方法和一个newTask工厂方法来构造RunnableFuture。CancellingExecutor拓展了ThreadPoolExecutor,并改写了newTaskFor使Cancellable能够建立本身的Future。
正确的封装原则:除非拥有某个线程,不然不能对该线程进行操控。
线程由Thread对象表示,而且像其余对象同样能够被自由的共享。可是线程有一个相应的全部者,即建立该线程的类。所以线程池是其工做线程的全部者,若是要中断这些线程,那么应该使用线程池。
与其余封装对象同样,线程的全部权是不可传递的:应用程序能够拥有服务,服务也能够拥有工做者线程,可是应用程序并不能拥有工做者线程,所以应用程序不能直接中止工做者线程。应用程序应该提供生命周期方法(Lifecycle Method)
来关闭它本身以及它所拥有的线程。在ExecutorService中提供了shutdown和shutdownNow等方法,在其余拥有线程的服务中也应该提供相似的关闭机制。
对于持有线程的服务,只要服务的存在时间大于建立线程的方法的存在时间,那就应该提供生命周期方法。
ExecutorService提供了两种关闭方法:
shutdown
正常关闭:速度相对shutdownNow来讲更慢,安全性高,会等待队列中全部的任务都执行完才关闭。shutdownNow
强行关闭:速度快,可是安全性低,首先关闭当前正在执行的任务,而后返回未启动的任务清单。一种关闭生产者-消费者服务的方式就是使用毒丸(Poison Pill)对象
:毒丸是指一种放在队列上的对象,其含义是:当获得这个对象,当即中止。在FIFO队列中,毒丸对象将会确保消费者在关闭以前首先完成队列中的全部工做,再提交毒丸。毒丸对象以前的全部工做都会获得处理,而生产者在提交毒丸对象之后,将不会提交任何的工做。
当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,毒丸对象才能可靠的工做。
当经过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务而且返回已提交可是还没有执 行的任务
(以便调用者线程把这些任务写入日志或者作其余后续处理);shutdownNow返回的List<Runnable>
可能与提交给ExecutorService的Runnable不一样,它们可能被封装或者修改过。
可是咱们没法经过普通方法找出哪些任务已经开始可是还没有结束
,咱们没法得知状态,除非执行线程中有某些检查。
致使线程提早死亡的最主要缘由就是RuntimeException,因为某些异常表示了某种编程错误或其余相似的不可修复的错误,所以它们不会被捕获。它们不会在调用栈中逐层传递,而是默认的在控制台输出栈追踪信息,并终止线程。
任何代码均可能抛出一个RuntimeException。每当调用另外一个方法时,都要对它的行为保持怀疑,不要盲目的认为它必定会正常返回,或者必定会抛出在方法原型中声明的异常。对调用的代码越不熟悉,越应该对其行为保持怀疑。
在任务处理线程的生命周期中,将经过某种抽象机制(如Runnable)来调用许多未知的代码,咱们应该对这些线程可否表现出正确的行为表示怀疑。所以,这些线程应该在try-catch代码块中调用这些任务,就能捕获未检测的异常了,或者也可使用try-finally代码块来确保框架可以知道线程非正常退出的状况。
public void run(){ Throwable thrown = null; try{ while(!isInterrupted){ runTask(getTaskFromWorkQueue()); } catch (Throwable e){ thrown = e; } finally { threadExited(this,thrown); } } }
在线程池内部构建一个工做者线程,若是任务抛出了一个未检查异常,那么它将使线程终结,可是会先同时框架它已经终结。而后框架可能会调用一个新的线程来代替这个工做线程,也可能不会,由于线程池正在关闭,或者已经有足够多的线程能知足须要。当使用这种方法,能够避免某个编写的糟糕的任务或插件时不会影响调用它的整个线程。
在Thread API中一样提供了UncaughtExceptionHandler
,它能检测某个线程因为捕获异常而终结的状况。这个与前面的工做者线程是互补的,经过将两者结合在一块儿,能够有效的防止线程泄露问题。
public interface UncaughtExceptionHandler { void uncaughtExceptionHandle(Thread t, Throwable e); }
异常处理器如何捕获异常,取决于对服务质量的需求。最多见的响应方式将一个错误信息以及相应的栈追踪信息写入应用程序日志中。
public class UEHLogger implements Thread.UncaughtExceptionHandler { public void uncaughtExceptionHandler(Thread t, Throwable e){ Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "Thread terminated with Exception:" + t.getName(), e); } }
能够经过Thread.setUncaughtExceptionHandler
为每一个线程设置一个UncaughtExceptionHandler,还可使用setDefaultUncaughtExceptionHandler
来设置默认的UncaughtExceptionHandler。
在运行时间较长的应用程序中,一般会为全部线程的未捕获异常指定同一个异常处理器,而且该处理器至少会将异常信息记录到日志中。
若是要为线程池中的全部线程设置一个UncaughtExceptionHandler,须要为ThreadPoolExecutor的构造函数提供一个ThreadFactory。标准线程池容许当发送未捕获异常时结束线程,但因为使用了一个try-finally代码块来接受其余通知,所以当线程结束时,将有新的线程来代替它。若是没有提供捕获异常处理器或其余的故障通知机制,那么任务会悄悄的失败,从而形成极大的混乱。若是你但愿任务因为发生异常而失败的时得到通知,而且执行一些特定于任务的恢复操做,那么能够将 任务封装在能捕获异常的Runnable或Callable中,或者 改写ThreadPoolExecutor的afterExecute
方法。
可是只有经过execute提交的任务,才能将异常交给捕获异常处理器,而经过submit提交的任务,不管是抛出的未检查异常仍是已检查异常,都将被认为是任务返回状态的一部分。若是一个由submit提交的任务抛出异常,那么将被Future.get封装在ExecutionException中从新抛出。
JVM既能够正常关闭,也能够强行关闭。
正常关闭的触发方式:
也能够经过调用Runtime.halt或者在操做系统中发送SIGKILL等
在正常关闭中,JVM首先调用全部已注册的关闭钩子(Shutdown Hook)
。关闭钩子是指经过Runtime.addShutdownHook注册的但还没有开始的线程。
runFinalizersOnExit
为true,那么JVM将运行终结器,而后中止。关闭钩子应该是线程安全的:
关闭钩子能够用于实现服务或应用程序的清理工做,例如删除临时文件,或者清除没法由系统自动清除的资源。
因为关闭钩子将并发执行,所以在关闭日志文件时可能致使其余须要日志服务的关闭钩子产生问题:所以,关闭钩子不该该依赖于那些可能被应用程序或其余的关闭钩子关闭的服务。实现这种功能的一种方式是对全部服务使用同一个关闭钩子,而且在该关闭钩子中执行一些列的关闭操做。这确保了关闭操做在单个线程中串行执行,从而避免了操做之间出现竞态条件或死锁等问题。
public void start(){ Runtime.getRuntime().addShutdownHook(new Thread(){ public void run(){ try{ LogService.this.stop(); }catch(InterruptedException ignored){} } }); }
有时候你但愿建立一个线程来执行一些辅助工做,可是又不但愿这个线程阻碍JVM的关闭,那么你可使用守护线程(Daemon Thread)
。
线程分为守护线程和普通线程。在JVM启动时建立的全部线程,除了主线程,其余都是守护线程(例如GC或其余辅助工做的线程)。
当建立一个新线程时,新线程将继承建立它的线程的守护状态。所以主线程建立的都是普通线程。
仅在于线程退出时发生的操做:当一个线程退出时,JVM会检查其余正在运行的线程,若是这些线程都是守护线程,那么JVM会正常的退出操做。当JVM中止时,全部仍然存在的守护线程将被直接抛弃:不会执行finally代码块,也不会执行回卷栈,JVM会直接退出。
咱们应该尽可能不使用守护线程 : 由于不多有操做可以在不进行清理的状况下被安全地抛弃。例如若是在守护线程中执行包含I/O操做的任务,那么将是一种危险的行为。守护线程最好用于执行“内部”任务,例如周期性的从缓存中移除无效数据。
此外,守护线程一般不能用来替代应用程序管理程序中的各个服务的生命周期。
当再也不须要内存资源的时候,能够经过GC自动回收它们。对于一些其它资源,如文件句柄或者套接字句柄,当再也不须要它们的时候,须要显式的还给操做系统。为了实现这个功能,垃圾回收期对定义了finalize方法的对象会进行特殊处理:在回收期释放它们之后,调用它们的finalize方法,从而保证一些持久化的资源被释放。
因为终结器能够在某个由jvm管理的线程中运行,所以终结器访问的任何状态均可能被多个线程访问,这样就必须对其访问操做进行同步。
终结器也没法保证它们在什么时候运行甚至是否运行,而且复杂的终结器将在对象上产生巨大的性能开销。
在大多数时候,使用finally代码块和显式的close方法,可以比使用终结器更好地管理资源。
惟一的例外状况是:当须要管理对象,而且该对象持有的资源是经过本地方法得到的。
避免使用终结器。
在任务,线程,服务以及应用程序等模块中的生命周期结束问题,可能会增长它们在设计和实现时的复杂性。java并无提供某种抢占式的机制来取消或者终结线程。相反,它提供了一种协做式的中断机制来实现取消操做,但这要依赖于如何构建取消操做的协议,以及可否始终遵循这些协议。经过FutureTask和Executor框架,能够帮助咱们构建可取消任务和服务。