1. 用户请求取消。 java
2. 有时间限制的操做,如超时设定。 web
3. 应用程序事件。 安全
4. 错误。 异步
5. 关闭。 jvm
以下面这种取消操做实现: socket
/** * 一个可取消的素数生成器 * 使用volatile类型的域保存取消状态 * 经过循环来检测任务是否取消 */ @ThreadSafe public class PrimeGenerator implements Runnable { private final List<BigInteger> primes = new ArrayList<>(); private volatile boolean canceled; @Override public void run() { BigInteger p = BigInteger.ONE; while (!canceled){ p = p.nextProbablePrime(); synchronized (this) { //同步添加素数 primes.add(p); } } } /** * 取消生成素数 */ public void cancel(){ canceled = true; } /** * 同步获取素数 * @return 已经生成的素数 */ public synchronized List<BigInteger> get(){ return new ArrayList<>(primes); } }
其测试用例: ide
public class PrimeGeneratorTest { public static void main(String[] args) { PrimeGenerator pg = new PrimeGenerator(); new Thread(pg).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally{ pg.cancel(); //始终取消 } System.out.println("all primes: " + pg.get()); } }
下面经过中断实现取消功能: 测试
/** * 经过中断来实现取消 * 不采用boolean变量, * 防止在queue.put()时因为阻塞,不能检查到boolean变量而没法取消 * 但使用interrupt就能够, * 即便queue.put()阻塞, 也会检查到interrupt信号,从而抛出IntteruptedException * 从而达到取消的目的 */ public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; public PrimeProducer(BlockingQueue<BigInteger> queue){ this.queue = queue; } @Override public void run() { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()){ queue.put(p = p.nextProbablePrime()); } } catch (InterruptedException e) { // thread exit } } /** * 取消 */ public void cancel(){ interrupt(); //中断当前线程 } }
1. 传递异常。 this
2. 恢复中断状态,从而事调用栈的上层代码可以对其进行处理。 spa
public void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (ExecutionException e) { //任务执行中抛出异常 } catch (TimeoutException e) { //任务超时处理 } finally{ //任务执行完毕,没有影响; 任务执行中会中断任务 if (task != null) task.cancel(true); } }
1. java.io包中的同步Socket I/O。如套接字中进行读写操做read, write方法。
2. java.io包中的同步I/O。如当中断或关闭正在InterruptibleChannel上等待的线程时,会对应抛出ClosedByInterruptException或 AsynchronousCloseException。
3. Selector的异步I/O。若是一个线程在调用Selector.select时阻塞了,那么调用close, wakeup会使线程抛出ClosedSelectorException。
4. 获取某个锁。当一个线程等待某个锁而阻塞时,不会响应中断。但Lock类的lockInterruptibly容许在等待锁时响应中断。
/** * 经过改写interrupt方法将非标准的取消操做封装在Thread中 */ public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; private int bufferSize; public ReaderThread(Socket socket, InputStream in) { this(socket, in, 1024); } public ReaderThread(Socket socket, InputStream in, int bufferSize) { this.socket = socket; this.in = in; this.bufferSize = bufferSize; } @Override public void interrupt() { try { socket.close(); //中断前关闭socket } catch (IOException e) { } finally{ super.interrupt(); } } @Override public void run() { try { byte[] buf = new byte[bufferSize]; while (true) { int count = in.read(buf); if (count < 0) { break; } else if (count > 0) { processBuffer(buf, count); } } } catch (IOException e) { // 线程中断处理 } } ... }
/** * 可取消的任务接口 */ public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } /** * 使用了Socket的任务 * 在取消时须要关闭Socket */ public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; public void setSocket(Socket socket) { this.socket = socket; } @Override public T call() throws Exception { //do working ... } @Override public synchronized void cancel() { try { if (socket != null){ socket.close(); } } catch (IOException ignored) { } } @Override public RunnableFuture<T> newTask() { return new FutureTask<T>(this){ @Override public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } catch (Exception ignored) { } return super.cancel(mayInterruptIfRunning); } }; } } /** * 经过newTaskFor将非标准的取消操做封装在任务中 */ public class CancellingExecutor extends ThreadPoolExecutor { public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask){ //如果咱们定制的可取消任务 return ((CancellableTask<T>)callable).newTask(); } return super.newTaskFor(callable); } }
/** * 不支持关闭的生产者-消费者日志服务 */ public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer){ this.queue = new LinkedBlockingDeque<String>(); this.logger = new LoggerThread(writer); } public void start(){ logger.start(); } public void log(String msg) throws InterruptedException{ queue.put(msg); } private class LoggerThread extends Thread{ private final Writer writer; public LoggerThread(Writer writer) { this.writer = writer; } @Override public void run() { try { while(true){ writer.write(queue.take()); } } catch (IOException e) { // io exception handle } catch (InterruptedException e) { // interrupt exceptino handle } finally{ try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
/** * 为LoggerWriter添加可靠的取消操做 */ public class LogService { private final BlockingQueue<String> queue; private final LoggerThread logger; private final PrintWriter writer; private boolean isShutdown; //用于终止生产者 private int reservations; //队列中的消息数 public LogService(PrintWriter writer){ this.queue = new LinkedBlockingDeque<String>(); this.logger = new LoggerThread(); this.writer = writer; } /** * 产生日志 * @param msg 日志内容 * @throws InterruptedException */ public void log(String msg) throws InterruptedException{ synchronized (this) { if (isShutdown){ throw new IllegalStateException("can't log, service has stopped."); } ++reservations; } queue.put(msg); } /** * 启动日志香妃 */ public void start(){ logger.start(); } /** * 中止日志服务 */ public void stop(){ synchronized(this){ isShutdown = true; } logger.interrupt(); //中断日志线程 } /** * 消费日志线程 */ private class LoggerThread extends Thread{ @Override public void run() { try { while(true){ try { synchronized (LogService.this) { if (isShutdown) break; } String msg = queue.take(); synchronized (LogService.this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { // retry } } }finally{ writer.close(); } } } }
1. shutdown: 安全关闭。再也不接受新任务提交,待全部队列中的任务执行完成再关闭。
2. shutdownNow: 强行关闭。再也不接受新任务提交,中止正在执行的任务,并返回未开始执行的任务列表。
/** * 封装ExecutorService实现日志服务 */ public class LogService2 { private final ExecutorService exec = Executors.newSingleThreadExecutor(); private final PrintWriter writer; public LogService2(PrintWriter writer){ this.writer = writer; } /** * 产生日志 * @param msg 日志内容 * @throws InterruptedException */ public void log(String msg) throws InterruptedException{ exec.execute(new WriteTask(msg)); } /** * 中止日志服务 * @throws InterruptedException */ public void stop(long timeout, TimeUnit unit) throws InterruptedException{ try { exec.shutdown(); //平缓关闭服务 //关闭服务后, 阻塞到全部任务被执行完毕或者超时发生,或当前线程被中断 exec.awaitTermination(timeout, unit); } finally{ writer.close(); } } ... }
经过毒丸对象来关闭服务:
/** * 索引服务 * 经过一个毒丸对象来关闭服务 */ public class IndexingService { private static final File POISON = new File(""); //毒丸对象 private final IndexerThread consumer = new IndexerThread(); //消费者 private final CrawlerThread producer = new CrawlerThread(); //生产者 private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>(); private final File root; public IndexingService(File root) { this.root = root; } /** * 启动索引服务 */ public void start(){ producer.start(); consumer.start(); } public void stop(){ producer.interrupt(); //中断爬虫线程 } public void awaitTermination() throws InterruptedException{ consumer.join(); //等待消费者线程结束 } /** * 爬虫线程 */ private class CrawlerThread extends Thread{ @Override public void run() { try { crawl(root); } catch (InterruptedException e) { // handle the exception } try { while(true){ queue.put(POISON); break; } } catch (InterruptedException e) { // retry } } private void crawl(File root) throws InterruptedException{ // crawl from web } } /** * 创建索引的线程 */ private class IndexerThread extends Thread{ @Override public void run() { try { while (true){ File file = queue.take(); if (file == POISON){ //如果毒丸对象 break; } else{ indexFile(file); //创建索引文件 } } } catch (InterruptedException e) { // handle exception } } private void indexFile(File file) { } } }
/** * 在ExecutorService中跟踪在关闭以后被取消的任务 */ public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor(ExecutorService exec) { this.exec = exec; } /** * 获取关闭后取消的任务 */ public List<Runnable> getCancelledTasks(){ if (!exec.isTerminated()){ throw new IllegalStateException("service doesn't stop"); } return new ArrayList<>(tasksCancelledAtShutdown); } @Override public void execute(final Runnable command) { exec.execute(new Runnable() { @Override public void run() { try { command.run(); } finally{ //有可能出现误报: 任务执行完毕了, 线程池 if (isShutdown() && //若Executor已经关闭了 Thread.currentThread().isInterrupted()){ //且当前线程被中断了 tasksCancelledAtShutdown.add(command); } } } }); } }
/** * 将异常写入日志的UncaughtExceptionHandler */ public class UEHLogger implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "the thread with exceptoin: "+t.getName(), e); } }
终结器:
不吝指正。