在大多数状况下,咱们建立一个任务,都会让它运行直到结束。但有时,又须要在某种状况下取消任务,好比用户请求取消,有时间限制的任务,任务运行时出现错误等等。在Java中,没有一种安全的抢占式方式来中止线程(什么意思?),所以也没有安全的抢占式方法来中止任务。html
###标识 在前面的例子中,咱们曾使用volatile来修饰一个变量做为方法退出的一种标识,而在任务中,咱们一样可使用它来使得任务在须要的状况下退出。在下面的例子中,PrimeGenerator每次在生成素数以前都会检查canceled标识,若是为true,则当即退出任务。java
public class PrimeGenerator implements Runnable{ private final List<BigInteger> primes=new ArrayList<>(); private volatile boolean cancelled; public void run(){ BigInteger p=BigInteger.ONE; while(!cancelled){ p=p.nextProbablePrime(); synchronized(primes){ primes.add(p); } } } public void cancel(){ cancelled=true; } public synchronized List<BigInteger> get(){ return new ArrayList<BigInteger>(primes); } public static void main(String []args) throws Exception{ PrimeGenerator generator=new PrimeGenerator(); new Thread(generator).start(); try{ Thread.sleep(100); }finally{ generator.cancel(); } List<BigInteger> results=generator.get(); for(BigInteger i:results){ System.out.println(i); } } }
但这种方式只能知足一部分需求,若是在任务执行代码中存在线程阻塞的方法(sleep(),wait()...),那么就可能存在一个很严重的问题,任务不能如指望的那样及时退出,甚至可能永远不会退出。安全
public class BrokenPrimeProducer extends Thread{ private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled=false; public BrokenPrimeProducer(BlockingQueue<BigInteger> queue){ this.queue=queue; } public void run(){ try{ BigInteger p=BigInteger.ONE; while(!cancelled){ queue.put(p=p.nextProbablePrime()); } }catch(InterruptedException e){} } public void cancel(){ cancelled=true; } public static void main(String []args) throws Exception{ BlockingQueue<BigInteger> primes=new LinkedBlockingQueue<BigInteger>(10); BrokenPrimeProducer producer=new BrokenPrimeProducer(primes); producer.start(); int count=0; try{ while(count<10){ count++; Thread.sleep(1000); System.out.println(primes.take()); } }finally{ producer.cancel(); } System.out.println("over.."); } }
在上面的例子中,BrokenPrimeProducer用于生产素数,并将结果保存在阻塞队列中,而main方法则在不断的从队列中读取素数。可是程序在运行到producer.cancel()以后,生产者线程并无如期的中止下来。这是由于,当队列已满时,queue.put()将会阻塞,而此时count>=10,再也不执行primes.take(),那么在调用producer.cancel()时,因为producer一直阻塞在queue.put方法处,使得线程不能检查到cancelled标识,致使线程永远不会结束。服务器
###线程中断app
对于须要取消 存在阻塞操做的任务,则不能使用检查标识的方式,而是经过线程中断机制。每一个线程都有一个boolean类型的中断状态,当中断线程时,该状态会被设置为true。在Thread类中,定义interrupt方法来中断线程目标,而isInterrupted方法能返回中断状态。静态的interrupted方法将清除当前线程的中断状态并返回它以前的值。异步
须要注意的是:socket
线程中断是一种协做机制,线程能够经过这种机制来通知另外一个线程,告诉它在合适的或者可能的状况下中止当前的工做。this
线程中断并不表明线程终止,线程的中断只是改变了线程的中断状态,这个中断状态改变后带来的结果取决于这个程序自己.net
调用interrupt方法并非当即中止线程,而是发出了一个中断请求,而后有线程自己在某个合适的时间点中断本身。对于wait(),sleep()等阻塞方法来讲,将严格处理这种请求,当他们收到中断请求或者开始执行发现中断状态被设置了,将抛出一个异常并将中断状态复位。线程
一般,中断是实现取消的最合理的方式
经过中断实现取消操做
public class PrimeProducer extends Thread{ private final BlockingQueue<BigInteger> queue; public PrimeProducer(BlockingQueue<BigInteger> queue){ this.queue=queue; } public void run(){ try{ BigInteger p=BigInteger.ONE; while(!Thread.currentThread().isInterrupted()){ queue.put(p=p.nextProbablePrime()); } }catch(InterruptedException e){ } } public void cancel(){ interrupt(); } }
###响应中断 当调用可阻塞的方法时,例如Thread.sleep()或BlockingQueue.put等,有两种实用策略可用于处理InterruptedException:
传递异常(抛出异常),从而使你的方法也成为可中断的阻塞方法
恢复中断,从而使调用栈中的上层代码可以对其进行处理
####传递异常 传递InterruptedException的方法包括根本就不捕获该异常,直接向上抛出,与将InterruptedException添加到throws子句中同样(如程序清单getNextTask所示)。或者捕获异常,在执行完某些操做后(清理),再抛出该异常。
BlockingQueue<Task> queue; ... public Task getNextTask() throws InterruptedException{ return queue.take(); }
####恢复中断状态
有时候不能抛出InterruptedException,例如在Runnable的run方法中,则必须捕获该异常,并经过调用当前线程的interrupt方法恢复中断状态。这样,在调用栈中更高层的代码将看到引起了一个中断。
public void run() { while (!Thread.currentThread().isInterrupted()) { try { ... sleep(delay); } catch (InterruptedException e) { //抛出InterruptedException异常后,中断标示位会自动清除 Thread.currentThread().interrupt();//从新设置中断标示 } } }
另一种状况是
public void mySubTask() { ... try { sleep(delay); } catch (InterruptedException e) { //抛出InterruptedException异常后,中断标示位会自动清除 Thread.currentThread().interrupted(); } } public void test(){ while(!Thread.currentThread().isInterrupted()){ mySubTask(); } }
抛出InterruptedException异常后,中断标示位会自动清除,须要恢复中断状态,这样,在test方法中才能看到mySubTask引起了一个中断。不然,test将继续执行while。
###Future
在concurrent包中,ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning。若是为true而且这个线程正在运行,则线程能被中断。若是为false而且任务没有运行,则该任务不会被启动。
public static void timeOut(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{ Future<?> task=executorService.submit(r); try{ task.get(timeout,unit); }catch(TimeoutException e){ }catch(ExecutionException e){ }finally{ //若是任务已经结束,那么执行取消操做也不会有任何影响 task.cancel(true);//若是任务正在运行,则将被中断 } }
###不可中断的阻塞
在前面的例子中,只是针对某些可阻塞的方法作中断请求,在Java库中,并不是全部的可阻塞方法或者阻塞机制都能响应中断;若是一个线程因为执行同步的Socket I/O或者等待得到内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此以外没有任何其它用处。那些因为执行不可中断操做操做而被阻塞的线程,可使用相似于中断的手段来中止这些线程,但这要求咱们必须知道线程阻塞的缘由;
Socket I/O
:在服务器应用程序中,常见的阻塞IO形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但经过关闭底层套接字,可使得因为执行read或write等方法而被阻塞的线程抛出一个SocketException
同步 I/O
:当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路。当关闭一个InterruptibleChannel时,将致使全部在链路操做上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel
Selector异步I/O
:若是一个线程在调用Selector.select方法时阻塞,那么调用close或者wakeup方法会使线程抛出CloseSelectorException并提早返回
获取某个锁
:若是一个线程因为等待某个内置锁而阻塞,那么将没法响应中断。但可使用Lock类作替代
下面给出一个中断套接字阻塞的例子
import java.io.IOException; import java.net.ServerSocket; public class ThreadTest extends Thread { volatile ServerSocket socket; public static void main(String args[]) throws Exception { ThreadTest1 thread = new ThreadTest1(); System.out.println("Starting thread..."); thread.start(); Thread.sleep(3000); System.out.println("Asking thread to stop..."); thread.socket.close();// 再调用close方法,此句去掉将则不会 System.out.println("Stopping application..."); } public void run() { try { socket = new ServerSocket(3036); } catch (IOException e) { System.out.println("Could not create the socket..."); return; } while (!Thread.currentThread().isInterrupted()) { System.out.println("Waiting for connection..."); try { socket.accept(); } catch (IOException e) { System.out.println("accept() failed or interrupted..."); Thread.currentThread().interrupt();// 从新设置中断标示位 } } //判断线程是否被阻塞,若是被阻塞则没法打印此句 System.out.println("Thread exiting under request..."); } }
###日志服务 在代码清单LogWriter中给出了一个简单的日志服务示例,其中日志操做在单独的日志线程中执行。产生日志消息的线程并不会将消息直接输出,而是将其保存在一个阻塞队列中。这是一种多生产者单消费者的设计方式。
public class LogWriter{ private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer){ this.queue=new LinkedBlockingQueue<String>(10); this.logger=new LoggerThread(writer); } public void start(){ logger.start(); } public void put(String msg) throws InterruptedException{ queue.put(msg); } private class LoggerThread extends Thread{ private final Writer writer; public void run(){ try{ while(true){ writer.println(queue.take()); } }catch(InterruptedException e){ }finally{ writer.close(); } } } }
当咱们想要中止日志服务时,只须要在queue.take()方法捕获抛出的InterruptedException异常,退出日志服务便可。但这种退出方式是不完备的,首先,对于正在等待被写入到日志的信息将会丢失,其次,因为队列已满时put操做会阻塞,因此等待put的线程也会被阻塞。这种状态下,生产者和消费者须要同时被取消。因为生产者不是专门的线程,所以要取消他们将很是困难。
另外一种关闭LogWriter的方式是设置一个“已请求关闭”的标识,以免进一步提交日志。
public class LogService{ private final BlockingQueue<String> queue; private fina LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutDown; private int reservations; public void start(){ loggerThread.start(); } public void stop(){ synchronized(this){ isShutDown=true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException{ synchronized(this){ if(isShutDown){ throw new InterruptedException(""); } ++reservations; } //由于put操做原本就是同步的,因此不须要再加内置锁 queue.put(msg); } private class LoggerThread extends Thread{ public void run(){ try{ while(true){ try{ //若是处理完了阻塞队列中的日志则退出 synchronized(LogService.this){ if(isShutdown&&reservations==0){ break; } } String msg=queue.take(); synchronized(LogService.this){ --reservations; } writer.println(msg); }catch(InterruptedException e){ } } }finally{ writer.close(); } } } }
更简单的使用ExecutorService来管理
public class LogService{ private final ExecutorService executorService=... ... public void start(){ } public void stop() throws InterruptedException{ try{ executorService.shutdown();//再也不接收新的任务 executorService.awaitTermination(TIMEOUT,UNIT);//等待关闭时间 }finally{ writer.close(); } } public void log(String msg){ try{ exectorService.submit(new Task(msg)); }catch(RejectedExecutionException e){ } } }