并发编程使人困惑的一个主要缘由:使用并发时须要解决的问题有多个,而实现并发的方法也有多种,而且在这二者之间没有明显的映射关系。java
速度问题初听起来很简单:若是你须要一个程序运行得更快,那么能够将起断开为多个片断,在单个处理器上运行每一个片断。
并发一般是提升运行在单个处理器上的程序的性能,但在单个处理器上运行的并发程序开销确实应该比该程序全部部分都顺序执行开销大,由于其中增长了所谓的上下文切换的代价。
若是没有任务会阻塞,那么在单处理器上使用并发就没有任何意义。
在单处理器系统中的性能提升常见示例是事件驱动的编程。
Java采起的是在顺序语言的基础上提供对线程的支持。与在多任务操做系统中分叉进程不一样,线程机制是在由执行程序表示的单一进程中建立任务。程序员
协做多线程:Java的线程机制是抢占式的,这表示调度机制周期性的中断线程,将上下文切换到另外一个线程,从而为每一个线程都提供时间片,使得每一个线程都会分配到数量合理得时间去驱动它得任务。在协做式系统中,每一个任务都会自动得放弃控制,这要求程序员要有意识得插入某种类型得让步语句。协做式系统得优点是双重得:上下文切换的开销一般比抢占式要少得多,而且对能够同时执行的线程数量在理论上没有任何限制。npm
经过使用多线程机制,这些独立任务中的每个将由执行线程来驱动,一个线程就是在进程中的一个单一顺序控制流,当个进程能够拥有多个并发执行的任务。编程
线程能够驱动任务,所以你须要一种描述任务的方式,这能够由Runnable接口来提供。要想定义任务,只需实现Runnable接口并编写run(0方法,使得该任务能够执行你的命令。设计模式
public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } } ///:~
Thread.yield()的调用是对线程调度器的之后在哪一个建议,它声明:我已经执行完生命周期中最重要的部分了,此刻正是切换给其余任务执行一段时间的时机了。安全
public class MainThread { public static void main(String[] args) throws InterruptedException { LiftOff launch = new LiftOff(); launch.run(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
当从Runnable导出一个类时,它必须具备run()方法,可是这个方法并没有特殊之处——它不会产生内在的线程能力。要实现线程行为,你必须显示的将一个任务附着在线程上。多线程
将Runnable对象转变为工做任务的传统方式是把它提交给一个Thread构造器:并发
public class BasicThreads { public static void main(String[] args) { Thread t = new Thread(new LiftOff()); t.start(); System.out.println("Waiting for LiftOff"); } } /* Output: (90% match) Waiting for LiftOff #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
能够添加更多的线程去驱动更多的任务。app
public class MoreBasicThreads { public static void main(String[] args) { for(int i = 0; i < 5; i++) new Thread(new LiftOff()).start(); System.out.println("Waiting for LiftOff"); } } /* Output: (Sample) Waiting for LiftOff #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
当main()建立Thread对象时,它并无捕获任何对这些对象的引用。每一个Thread都注册了它本身,所以确实有一个对它的引用,并且在它的任务推出其run()并死亡以前,垃圾回收期没法清除它。dom
Java SE5的jav.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不一样,这个中介对象将执行任务。Executor容许你管理异步任务的执行,而无须显示的管理线程的声明周期。
咱们可使用Executor来代替Thread对象。
import java.util.concurrent.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: (Sample) #0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
单个的Executor被用来建立和管理系统中全部任务。
对shutdown()方法的调用能够防止新任务被提交给这个Executor,当前线程将继续运行在shutdown()被调用以前提交全部任务。
FixedThreadPool使用了有限的线程集来执行所提交的任务:
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
有了FixedThreadPool,就能够一次性预先执行代价高昂的线程分配,于是也就能够限制线程的数量了。
CachedThreadPool在程序执行过程当中一般会建立于所徐数量相同的线程,而后再它回收旧线程时中止建立新的线程,所以它是合理的Executor首选。只有当这种方式会引起问题时,才须要切换到FixedThreadPool。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool。
SingleThreadExecutor会序列化全部提交给它的任务,并会维护它本身的悬挂任务队列。
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
Runnable是执行工做的独立任务,可是它不返回任何值。若是你但愿任务再完成时可以返回一个值,能够实现Callable接口而不是Runnable接口。
//: concurrency/CallableDemo.java import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i)));//将产生Future对象 for(Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch(InterruptedException e) { System.out.println(e); return; } catch(ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /* Output: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 *///:~
还可使用isDone判断是否执行完成,若是不调用isDone,那个若是没有完成,get会被阻塞。
影响任务行为的一种简单方式是调用sleep(),这将使任务停止执行给定的时间。
//: concurrency/SleepingTask.java // Calling sleep() to pause for a while. import java.util.concurrent.*; public class SleepingTask extends LiftOff { public void run() { try { while(countDown-- > 0) { System.out.print(status()); // Old-style: Thread.sleep(100); // Java SE5/6-style: //TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { System.err.println("Interrupted"); } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new SleepingTask()); exec.shutdown(); } } /* Output: #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
异常不能跨线程传播回main(),因此你必须在本地处理全部在任务内部产生的异常。
线程的优先级将该线程的重要性传递给调度器。
优先级较低的线程仅仅是执行的频率较低。
在对大多数时间里,全部线程都应该以默认的优先级运行,试图操做线程的优先级一般是一种错误。
//: concurrency/SimplePriorities.java // Shows the use of thread priorities. import java.util.concurrent.*; public class SimplePriorities implements Runnable { private int countDown = 5; private volatile double d; // No optimization private int priority; public SimplePriorities(int priority) { this.priority = priority; } public String toString() { return Thread.currentThread() + ": " + countDown; } public void run() { Thread.currentThread().setPriority(priority);//设置当前线程优先级,使用getPriority获取当前优先级 while(true) { // An expensive, interruptable operation: for(int i = 1; i < 100000; i++) { d += (Math.PI + Math.E) / (double)i; if(i % 1000 == 0) Thread.yield(); } System.out.println(this); if(--countDown == 0) return; } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute( new SimplePriorities(Thread.MIN_PRIORITY)); exec.execute( new SimplePriorities(Thread.MAX_PRIORITY)); exec.shutdown(); } } /* Output: (70% match) Thread[pool-1-thread-6,10,main]: 5 Thread[pool-1-thread-6,10,main]: 4 Thread[pool-1-thread-6,10,main]: 3 Thread[pool-1-thread-6,10,main]: 2 Thread[pool-1-thread-6,10,main]: 1 Thread[pool-1-thread-3,1,main]: 5 Thread[pool-1-thread-2,1,main]: 5 Thread[pool-1-thread-1,1,main]: 5 Thread[pool-1-thread-5,1,main]: 5 Thread[pool-1-thread-4,1,main]: 5 ... *///:~
当调用yieId()时,你也是在建议具备相同优先级的其余线程能够运行。
大致上,对于任何重要的控制或在调用整个应用时,都不能依赖yieId(),实际上,yieId()常常被误用。
所谓后台线程,是指在程序运行的时候在后台提供一种通用服务的线程,而且这种线程并不属于程序中不可或缺的部分。当全部非后台线程结束时,程序也就终止了,同时会杀死进程中的全部后台线程。
//: concurrency/SimpleDaemons.java // Daemon threads don't prevent the program from ending. import java.util.concurrent.*; import static net.mindview.util.Print.*; public class SimpleDaemons implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("sleep() interrupted"); } } public static void main(String[] args) throws Exception { for(int i = 0; i < 10; i++) { Thread daemon = new Thread(new SimpleDaemons()); daemon.setDaemon(true); // 必须在线程被调用以前设置setDaemon daemon.start(); } print("All daemons started"); TimeUnit.MILLISECONDS.sleep(175); } } /* Output: (Sample) All daemons started Thread[Thread-0,5,main] SimpleDaemons@530daa Thread[Thread-1,5,main] SimpleDaemons@a62fc3 Thread[Thread-2,5,main] SimpleDaemons@89ae9e Thread[Thread-3,5,main] SimpleDaemons@1270b73 Thread[Thread-4,5,main] SimpleDaemons@60aeb0 Thread[Thread-5,5,main] SimpleDaemons@16caf43 Thread[Thread-6,5,main] SimpleDaemons@66848c Thread[Thread-7,5,main] SimpleDaemons@8813f2 Thread[Thread-8,5,main] SimpleDaemons@1d58aae Thread[Thread-9,5,main] SimpleDaemons@83cc67 ... *///:~
必须在线程启动以前调用setDaemom()方法,才能把它设置为后台线程。
经过编写定制的ThreadFactory能够定制由Executor建立的线程的属性:
package net.mindview.util; import java.util.concurrent.ThreadFactory; public class DaemonThreadFactory implements ThreadFactory { public DaemonThreadFactory() { } public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }
//: concurrency/DaemonFromFactory.java // Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
能够经过调用isDaemon()方法来肯定线程是不是一个后台线程。若是是一个后台线程,那么它建立的任何线程将被自动设置成后台线程:
// Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
后台进程在不执行finaiiy子句的状况下就会终止其run()方法:
//: concurrency/DaemonsDontRunFinally.java // Daemon threads don't run the finally clause import java.util.concurrent.*; import static net.mindview.util.Print.*; class ADaemon implements Runnable { public void run() { try { print("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e) { print("Exiting via InterruptedException"); } finally { print("This should always run?"); } } } public class DaemonsDontRunFinally { public static void main(String[] args) throws Exception { Thread t = new Thread(new ADaemon()); t.setDaemon(true); t.start(); } } /* Output: Starting ADaemon *///:~
若是你注释调对setDaemon()的调用,就会看到finally子句将会执行。
当最后一个非后台线程终止时,后台线程会忽然终止。所以一旦main()退出,JVM就会当即关闭全部后台线程。由于不能以优雅的方式来关闭后台线程,因此它们几乎不是一种好的思想。非后台的Executor一般是一种更好的方法,它控制的全部任务均可以同时被关闭,关闭将以有序的方式执行。
使用直接从Thread继承这种可替代的方式:
//: concurrency/SimpleThread.java // Inheriting directly from the Thread class. public class SimpleThread extends Thread { private int countDown = 5; private static int threadCount = 0; public SimpleThread() { // Store the thread name: super(Integer.toString(++threadCount)); start(); } public String toString() { return "#" + getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SimpleThread(); } } /* Output: #1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5), #3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4), #5(3), #5(2), #5(1), *///:~
经过调用适当的Thread构造器为Thread对象赋予具体的名称,这个名称能够经过使用GetName()的toString()中得到。
惯用法是自管理的Runnable:
public class SelfManaged implements Runnable { private int countDown = 5; private Thread t = new Thread(this);//传入当前对象 public SelfManaged() { t.start(); } public String toString() { return Thread.currentThread().getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SelfManaged(); } } /* Output: Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4), Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2), Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5), Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1), *///:~
这里实现接口使得你能够继承另外一个不一样的类。
经过使用内部类来将线程代码隐藏在类中:
//: concurrency/ThreadVariations.java // Creating threads with inner classes. import java.util.concurrent.*; import static net.mindview.util.Print.*; // Using a named inner class: class InnerThread1 {//建立一个扩展自Thread的匿名内部类 private int countDown = 5; private Inner inner; private class Inner extends Thread { Inner(String name) { super(name); start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("interrupted"); } } public String toString() { return getName() + ": " + countDown; } } public InnerThread1(String name) {//建立这个内部类的实例 inner = new Inner(name); } } // Using an anonymous inner class: class InnerThread2 { private int countDown = 5; private Thread t; public InnerThread2(String name) {//可替换方式:在构造器中建立了一个匿名的Thread子类,而且将其向上转型为Thread引用t。 t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } // Using a named Runnable implementation: class InnerRunnable1 { private int countDown = 5; private Inner inner; private class Inner implements Runnable { Thread t; Inner(String name) { t = new Thread(this, name); t.start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return t.getName() + ": " + countDown; } } public InnerRunnable1(String name) { inner = new Inner(name); } } // Using an anonymous Runnable implementation: class InnerRunnable2 { private int countDown = 5; private Thread t; public InnerRunnable2(String name) { t = new Thread(new Runnable() { public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return Thread.currentThread().getName() + ": " + countDown; } }, name); t.start(); } } // A separate method to run some code as a task: class ThreadMethod {//在方法内部建立线程 private int countDown = 5; private Thread t; private String name; public ThreadMethod(String name) { this.name = name; } public void runTask() { if (t == null) { t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } } public class ThreadVariations { public static void main(String[] args) { new InnerThread1("InnerThread1"); new InnerThread2("InnerThread2"); new InnerRunnable1("InnerRunnable1"); new InnerRunnable2("InnerRunnable2"); new ThreadMethod("ThreadMethod").runTask(); } } /* (Execute to see output) *///:~
你对Thread类实际没有任何控制权。你建立任务,并经过某种方式将一个线程附着到任务上,以使得这个线程能够驱动任务。
Java的线程机制基于来自C的低级的p线程方式,这是一种你必须深刻研究,而且须要彻底理解其全部细节的方式。
一个线程能够在其余线程上调用join()方法,其效果是等待一段时间知道第二个线程结束才继续执行。
若是某个线程在另外一个线程t上调用t.join(),此线程将被挂起,知道目标线程t结束才恢复。
也能够调用join()时带上一个超时参数,这样若是目标线程在这段时间到期时尚未结束的话,join()方式总能返回。
对join()方法的调用能够被中断,作法时在调用线程上调用interrupt()方法。
//: concurrency/Joining.java // Understanding join(). import static net.mindview.util.Print.*; class Sleeper extends Thread { private int duration; public Sleeper(String name, int sleepTime) { super(name); duration = sleepTime; start(); System.out.println(name); } public void run() { try { sleep(duration); } catch(InterruptedException e) { print(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted()); return; } print(getName() + " has awakened"); } } class Joiner extends Thread { private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); System.out.println(name); } public void run() { try { sleeper.join(); } catch(InterruptedException e) { print("Interrupted"); } print(getName() + " join completed"); } } public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy); grumpy.interrupt(); } } /* Output: Grumpy was interrupted. isInterrupted(): false Doc join completed Sleepy has awakened Dopey join completed *///:~
Joiner线程将经过在Sleeper对象上调用join()方法来等待Sleeper醒来。在main()里面,每一个Sleeper都有一个Joiner,这个能够在输出中发现,若是Sleeper被中断或者是正常结束,Joiner将和Sleeper一同结束。
使用线程的动机之一就是创建有响应的用户界面:
//: concurrency/ResponsiveUI.java // User interface responsiveness. // {RunByHand} class UnresponsiveUI { private volatile double d = 1; public UnresponsiveUI() throws Exception { while(d > 0) d = d + (Math.PI + Math.E) / d; System.in.read(); // Never gets here } } public class ResponsiveUI extends Thread { private static volatile double d = 1; public ResponsiveUI() { setDaemon(true); start(); } public void run() { while(true) { d = d + (Math.PI + Math.E) / d; } } public static void main(String[] args) throws Exception { //new UnresponsiveUI(); // Must kill this process new ResponsiveUI();//做为后台运行的同时,还在等待用户的输入 System.in.read(); System.out.println("aaaaaa"); System.out.println(d); // Shows progress } } ///:~
线程组持有一个线程集合
因为线程的本质特性,使得你不能捕获从线程中逃逸的异常。一旦异常逃出任务的run()方法,它就会向外传播到控制台,除非你采起特殊的步骤捕获这种错误的异常。
下面的程序老是会抛出异常:
//: concurrency/ExceptionThread.java // {ThrowsException} import java.util.concurrent.*; public class ExceptionThread implements Runnable { public void run() { throw new RuntimeException(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } ///:~
在main中放入try carth并不能抓住异常:
import java.util.concurrent.*; public class NaiveExceptionHandling { public static void main(String[] args) { try { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } catch(RuntimeException ue) { // This statement will NOT execute! System.out.println("Exception has been handled!"); } } } ///:~
咱们须要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler是Java SE5中的新接口,它容许在每一个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程由于捕获的异常而临近死亡时被调用,为了使用它,咱们建立一个新类型ThreadFactory,它将在每一个新建立的Thread对象上附着一个Thread.UncaughtExceptionHandler:
//: concurrency/CaptureUncaughtException.java import java.util.concurrent.*; class ExceptionThread2 implements Runnable { public void run() { Thread t = Thread.currentThread(); System.out.println("run() by " + t); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { System.out.println("caught " + e); } } class HandlerThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); System.out.println("created " + t); t.setUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); return t; } } public class CaptureUncaughtException { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool( new HandlerThreadFactory()); exec.execute(new ExceptionThread2()); } } /* Output: (90% match) HandlerThreadFactory@de6ced creating new Thread created Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 run() by Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 caught java.lang.RuntimeException *///:~
在Thread类中设置一个静态域,并将这个处理器设置为默认的为捕获异常处理器:
import java.util.concurrent.*; public class SettingDefaultHandler { public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } /* Output: caught java.lang.RuntimeException *///:~
下面的任务产生一个偶数,而其余任何消费这些数字。消费者任何惟一工做就是检查偶数的有效性。
public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next(); // Allow this to be canceled: public void cancel() { canceled = true; }//修改canceled标识 public boolean isCanceled() { return canceled; }//查看该对象是否被取消 } ///:~
import java.util.concurrent.*; public class EvenChecker implements Runnable {//消费者任务 private IntGenerator generator; private final int id; public EvenChecker(IntGenerator g, int ident) { generator = g; id = ident; } public void run() { while(!generator.isCanceled()) { int val = generator.next(); if(val % 2 != 0) {//程序将检查是不是偶数,若是是奇数,那么就是另外一个线程尚未执行完next()就调用了检查判断。 System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers } } } // Test any type of IntGenerator: public static void test(IntGenerator gp, int count) { System.out.println("Press Control-C to exit"); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < count; i++) exec.execute(new EvenChecker(gp, i)); exec.shutdown(); } // Default value for count: public static void test(IntGenerator gp) { test(gp, 10); } } ///:~
共享公共资源的任务能够观察该资源的终止信号。这能够消除所谓竞争条件,即两个或更多的任务竞争响应某个条件,所以产生的冲突或以一致结果:
public class EvenGenerator extends IntGenerator { private volatile int currentEvenValue = 0; public int next() {//一个任务可能在另外一个任务执行第一个对currentEvenValue递增操做以后,但没有执行第二个操做以前,调用next()方法。 ++currentEvenValue; // Danger point here! ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } /* Output: (Sample) Press Control-C to exit 89476993 not even! 89476993 not even! *///:~
递增也不是原子操做,因此,必需要保护任务。
使用线程时的一个基本问题:你永远都不知道一个线程什么时候在运行。
对于并发,你须要某种方式来防止两个任务访问相同的资源。
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。
基本全部并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。这意味着在给定时刻只容许一个任务访问共享资源。一般这是经过在代码前面加上一条锁语句来实现的,这就使得在一段时间内只有一个任务能够运行这段代码。由于锁语句产生了一种互相排斥的效果,全部这种机制被称为互斥量。
Java提供关键字synchronized的形式,为防止资源冲突提供了内置支持。当任务要执行被synchronized关键字保护的代码片断的时候,它将检查锁是否可用,而后获取锁,执行代码,释放锁。
要控制对共享资源的访问,得先把它包装进一个对象,而后把全部要访问这个资源的方法标记为synchronized。
synchronized void f(){}
全部对象都自动含有单一的锁(也称为监视器)。当对象上调用其任意synchronized方法的时候,此对象都被加锁。
在使用并发时,将域设置为private是很是重要的,不然,synchronized关键字就不能防止其余任务直接访问域,这样就会产生冲突。
一个任务能够获取多个锁。
JVM负责跟踪对象被加锁的次数。若是一个对象被解锁,其计数变为0。在任务第一个给对象加锁的时候,计数变为1.每当这个相同的任务在这个对象上获取锁,计数都会递增。只有首先得到锁的任务才能容许继续获取多个锁。每当任务离开一个synchronized方法,计数递减,当计数为0的时候,锁被彻底释放,此时别的任务就可使用此资源。
针对每一个类,也有一个锁,全部synchronized static方法能够在类的范围内防止对static数据的并发访问。
你应该在何时同步,能够运用Brian的同步规则:
若是你在写一个变量,它接下来将被另外一个线程读取,或者正在读取一个上一次已经被另外一个线程写过的变量,那么你必须使用同步,而且,读写线程都必须使用相同的监视器同步。
每一个访问临界共享资源的方法都必须被同步,不然它们就不会正确的工做。
public class SynchronizedEvenGenerator extends IntGenerator { private int currentEvenValue = 0; public synchronized int next() { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new SynchronizedEvenGenerator()); } } ///:~
第一个进入next()的任务将得到锁,任何试图获取锁的任务都将从其开始尝试之时被组赛,直到第一个任务释放锁。经过这种方式,任什么时候刻只有一个任务能够经过由互斥量看护的代码。
Lock对象必须被显示的建立,锁定和释放。
对于解决某些类型的问题来讲,它更灵活。
import java.util.concurrent.locks.*; public class MutexEvenGenerator extends IntGenerator { private int currentEvenValue = 0; private Lock lock = new ReentrantLock(); public int next() { lock.lock(); try { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } finally { lock.unlock(); } } public static void main(String[] args) { EvenChecker.test(new MutexEvenGenerator()); } } ///:~
添加一个被互斥调用的锁,并使用lock和unlock方法在next()内建立临界资源
当你使用synchronized关键字时,须要写的代码量更少,而且用户错误出现的可能性也会下降,所以一般只有在解决特殊问题时,才能显示使用Lock对象。例如:使用synchronized关键字不能尝试获取锁且最终获取锁会失败,或者尝试着获取锁一段时间,而后放弃它,要实现这些,你必须使用concurrent类库:
//: concurrency/AttemptLocking.java // Locks in the concurrent library allow you // to give up on trying to acquire a lock. import java.util.concurrent.*; import java.util.concurrent.locks.*; public class AttemptLocking { private ReentrantLock lock = new ReentrantLock();//ReentrantLock可让你尝试获取锁,但最终没有获取到锁 public void untimed() { boolean captured = lock.tryLock(); try { System.out.println("tryLock(): " + captured); } finally { if(captured) lock.unlock(); } } public void timed() { boolean captured = false; try { captured = lock.tryLock(2, TimeUnit.SECONDS);//尝试获取锁,在2秒后失败 } catch(InterruptedException e) { throw new RuntimeException(e); } try { System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured); } finally { if(captured) lock.unlock(); } } public static void main(String[] args) { final AttemptLocking al = new AttemptLocking(); al.untimed(); // True -- lock is available al.timed(); // True -- lock is available // Now create a separate task to grab the lock: new Thread() { { setDaemon(true); } public void run() { al.lock.lock(); System.out.println("acquired"); } }.start(); Thread.yield(); // Give the 2nd task a chance al.untimed(); // False -- lock grabbed by task al.timed(); // False -- lock grabbed by task } } /* Output: tryLock(): true tryLock(2, TimeUnit.SECONDS): true acquired tryLock(): false tryLock(2, TimeUnit.SECONDS): false *///:~
一个常不正确的知识是“原子操做不须要进行同步控制”
经过Goetz测试你就可使用原子性:若是你能够编写用于现代微处理器的高性能JVM,那么就有资格去考虑是否能够避免同步。
使用volatile关键字,就会得到(简单的赋值和返回操做)原子性。
在多处理器系统上,相对于单处理系统而言,可视性问题远比原子性问题多得多。一个任务作出的修改,即便在不中断的意义上讲是原子性的,对其余任务也多是不可视的,所以不一样的任务对应用的状态有不一样的视图。另外一方面,同步机制强制在处理系统中,一个任务作出的修改必须在应用中是可视的。若是没有同步机制,那么修改时可视将没法肯定。
volatile关键字还确保了应用中的可视性。若是你将一个域声明为volatile的,那么只要对这个域产生了写操做,那么全部的读操做就均可以看到这个修改。
原子性和易变性是不一样的概念。在非volatile域上的原子操做没必要刷新到主存中去,所以其它读取该域的任务也没必要看到这个新值。若是多个任务在同时访问某个域,那么这个域就应该是volatile的,不然,这个域应该只能经由同步来访问。同步也会致使向主存中刷新,所以若是一个域彻底由syschronized方法或语句来防御,那就没必要将其设置为是volatile的。
一个任务所做的任务写入操做对这个任务来讲都是可视的,所以若是它只须要在这个任务内部可视,那么你就不须要将其设置为volatile的。
当一个域的值依赖于它以前的值时,volatile就没法工做了。
使用volatile而不是synchronized的惟一彻底状况是类中只有一个可变域。全部,通常,你的第一选择应该是synchronized。
不要盲目的应用原子性:
import java.util.concurrent.*; public class AtomicityTest implements Runnable { private int i = 0; public int getValue() { return i; } private synchronized void evenIncrement() { i++; i++; } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); AtomicityTest at = new AtomicityTest(); exec.execute(at); while(true) { int val = at.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
尽管return i确实是原子性操做,可是缺乏同步使得其数值能够在处于不稳定的中间状态被读取。除此以外,因为i也不是volatile的,所以还存在可视性问题。
一个产生序列数的类每当nextSerialNumber()被调用时,它必须调用者返回惟一的值:
public class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; // Not thread-safe } }
若是一个域可能会被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么你就应该将这个域设置为volatile。将一个域定义为volatile,那么它就会告诉编译器不要执行任务移除读取和写入操做的优化,这些操做的目的是用线程中的局部变量维护对这个域的精确同步。
import java.util.concurrent.*; // Reuses storage so we don't run out of memory: class CircularSet { private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for(int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for(int i = 0; i < len; i++) if(array[i] == val) return true; return false; } } public class SerialNumberChecker { private static final int SIZE = 10; private static CircularSet serials = new CircularSet(1000); private static ExecutorService exec = Executors.newCachedThreadPool(); static class SerialChecker implements Runnable { public void run() { while(true) { int serial = SerialNumberGenerator.nextSerialNumber(); if(serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) throws Exception { for(int i = 0; i < SIZE; i++) exec.execute(new SerialChecker()); // Stop after n seconds if there's an argument: if(args.length > 0) { TimeUnit.SECONDS.sleep(new Integer(args[0])); System.out.println("No duplicates detected"); System.exit(0); } } }
上面这个程序,最终会获得重复的序列数。若是要解决这个问题,须要在nextSerialNumber()前面加上synchronized关键字。
Java SE5引入了诸如AtomicIntger,AtomicLong,AtomicReference等特殊的原子性变量类,它们提供下面形式的原子性条件更新操做:
booleean compareAndSet(expectedValue,updateValue)
咱们可使用AtomicInteger来重写:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // Terminate after 5 seconds ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
只是但愿防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。经过这种方式分离出来的代码段被称为临界区,它也使用synchronized关键字创建,synchronized被用来指定某个对象,此对象的锁被用来对花括号内代码进行同步控制:
synchronized(syncObject){}
这也被称为同步控制块;在进入此段代码前,必须获得syncObject对象的锁。若是其余线程已经获得这个锁,那么就得等到锁被释放之后,才能进入临界区。
若是把一个非保护类型的类,在其余类的保护和控制下,应用于多线程环境:
//: concurrency/CriticalSection.java // Synchronizing blocks instead of entire methods. Also // demonstrates protection of a non-thread-safe class // with a thread-safe one. import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; class Pair { // Not thread-safe private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if(x != y) throw new PairValuesNotEqualException(); } } // Protect a Pair inside a thread-safe class: abstract class PairManager {//持有一个Pair对象,并控制一切对它的访问 AtomicInteger checkCounter = new AtomicInteger(0); protected Pair p = new Pair(); private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } // Assume this is a time consuming operation protected void store(Pair p) { storage.add(p); try { TimeUnit.MILLISECONDS.sleep(50); } catch(InterruptedException ignore) {} } public abstract void increment(); } // Synchronize the entire method: class PairManager1 extends PairManager { public synchronized void increment() { p.incrementX(); p.incrementY(); store(getPair()); } } // Use a critical section: class PairManager2 extends PairManager { public void increment() { Pair temp; synchronized(this) { p.incrementX(); p.incrementY(); temp = getPair(); } store(temp); } } class PairManipulator implements Runnable { private PairManager pm; public PairManipulator(PairManager pm) { this.pm = pm; } public void run() { while(true) pm.increment(); } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get(); } } class PairChecker implements Runnable { private PairManager pm; public PairChecker(PairManager pm) { this.pm = pm; } public void run() { while(true) { pm.checkCounter.incrementAndGet(); pm.getPair().checkState(); } } } public class CriticalSection { // Test the two different approaches: static void testApproaches(PairManager pman1, PairManager pman2) { ExecutorService exec = Executors.newCachedThreadPool(); PairManipulator pm1 = new PairManipulator(pman1), pm2 = new PairManipulator(pman2); PairChecker pcheck1 = new PairChecker(pman1), pcheck2 = new PairChecker(pman2); exec.execute(pm1); exec.execute(pm2); exec.execute(pcheck1); exec.execute(pcheck2); try { TimeUnit.MILLISECONDS.sleep(500); } catch(InterruptedException e) { System.out.println("Sleep interrupted"); } System.out.println("pm1: " + pm1 + "\npm2: " + pm2); System.exit(0); } public static void main(String[] args) { PairManager pman1 = new PairManager1(), pman2 = new PairManager2(); testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 272565 pm2: Pair: x: 16, y: 16 checkCounter = 3956974 *///:~
交给你的一个非线程安全的Pair类,你须要在一个线程环境中使用它。经过建立PairManager类就能够实现,PairManager类持有一个Pair对象并控制对它的一切访问。
PairManager类结构,它的一些功能在基类中实现,而且一个或多个抽象方法在派生类中定义,这种结构在设计模式中称为模板方法。
对于PairChecker的检查频率,PairManager1.increment()不容许有PairManager2.increment()那样多。后者采用同步控制块进行控制的典型缘由:使得其余线程能更多的访问。
使用显示的Lock对象来建立临界区:
//: concurrency/ExplicitCriticalSection.java // Using explicit Lock objects to create critical sections. import java.util.concurrent.locks.*; // Synchronize the entire method: class ExplicitPairManager1 extends PairManager { private Lock lock = new ReentrantLock(); public synchronized void increment() { lock.lock(); try { p.incrementX(); p.incrementY(); store(getPair()); } finally { lock.unlock(); } } } // Use a critical section: class ExplicitPairManager2 extends PairManager { private Lock lock = new ReentrantLock(); public void increment() { Pair temp; lock.lock(); try { p.incrementX(); p.incrementY(); temp = getPair(); } finally { lock.unlock(); } store(temp); } } public class ExplicitCriticalSection { public static void main(String[] args) throws Exception { PairManager pman1 = new ExplicitPairManager1(), pman2 = new ExplicitPairManager2(); CriticalSection.testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 174035 pm2: Pair: x: 16, y: 16 checkCounter = 2608588 *///:~
synchronized块必须给定一个在其上进行同步的对象,而且最合理的方式是,使用其方法正在被调用的当前对象:synchronized(this)。
若是得到了synchronized块上的锁,那么改对象其余的synchronized方法和临界区就不能被调用了,所以,若是在this上同步,临界区的效果就会直接缩小到同步的范围内。
两个任务能够同时进入同一个对象,只要这个对象上的方法是在不一样的锁上同步的便可:
//: concurrency/SyncObject.java // Synchronizing on another object. import javax.xml.crypto.Data; import java.text.SimpleDateFormat; import java.util.Date; import static net.mindview.util.Print.*; class DualSynch { private Object syncObject = new Object(); public synchronized void f() {//同步整个方法,在this上同步。 for(int i = 0; i < 5; i++) { print("f():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } public void g() {//在syncObject对象上同步 synchronized(syncObject) { for(int i = 0; i < 5; i++) { print("g():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } } } public class SyncObject { public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); } } /* Output: (Sample) g():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:638 g():2018/05/31-10:36:32:638 *///:~
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,能够为使用相同变量的每一个不一样的线程都建立不一样的存储。
//: concurrency/ThreadLocalVariableHolder.java // Automatically giving each thread its own storage. import java.util.concurrent.*; import java.util.*; class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while(!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.SECONDS.sleep(3); // Run for a while exec.shutdownNow(); // All Accessors will quit } } /* Output: (Sample) #0: 9259 #1: 556 #2: 6694 #3: 1862 #4: 962 #0: 9260 #1: 557 #2: 6695 #3: 1863 #4: 963 ... *///:~
ThreadLoca;对象一般看成静态域存储。
每一个单独的线程都被分配了本身的存储,由于它们每一个都须要跟踪本身的计数值。
下面演示一个终止问题,并且仍是一个资源共享的示例
获取天天进入公园的总人数。在公园的任何一个门口都有计数器能够递增。
//: concurrency/OrnamentalGarden.java import java.util.concurrent.*; import java.util.*; import static net.mindview.util.Print.*; class Count { private int count = 0; private Random rand = new Random(47); // Remove the synchronized keyword to see counting fail: public synchronized int increment() { int temp = count; if(rand.nextBoolean()) // Yield half the time Thread.yield(); return (count = ++temp); } public synchronized int value() { return count; } } class Entrance implements Runnable { private static Count count = new Count(); private static List<Entrance> entrances = new ArrayList<Entrance>(); private int number = 0; // Doesn't need synchronization to read: private final int id; private static volatile boolean canceled = false; // Atomic operation on a volatile field: public static void cancel() { canceled = true; } public Entrance(int id) { this.id = id; // Keep this task in a list. Also prevents // garbage collection of dead tasks: entrances.add(this); } public void run() { while(!canceled) { synchronized(this) { ++number; } print(this + " Total: " + count.increment()); try { TimeUnit.MILLISECONDS.sleep(100); } catch(InterruptedException e) { print("sleep interrupted"); } } print("Stopping " + this); } public synchronized int getValue() { return number; } public String toString() { return "Entrance " + id + ": " + getValue(); } public static int getTotalCount() { return count.value(); } public static int sumEntrances() { int sum = 0; for(Entrance entrance : entrances) sum += entrance.getValue(); return sum; } } public class OrnamentalGarden { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Entrance(i)); // Run for a while, then stop and collect the data: TimeUnit.SECONDS.sleep(3); Entrance.cancel(); exec.shutdown(); if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) print("Some tasks were not terminated!"); print("Total: " + Entrance.getTotalCount()); print("Sum of Entrances: " + Entrance.sumEntrances()); } } /* Output: (Sample) Entrance 0: 1 Total: 1 Entrance 2: 1 Total: 3 Entrance 1: 1 Total: 2 Entrance 4: 1 Total: 5 Entrance 3: 1 Total: 4 Entrance 2: 2 Total: 6 Entrance 4: 2 Total: 7 Entrance 0: 2 Total: 8 ... Entrance 3: 29 Total: 143 Entrance 0: 29 Total: 144 Entrance 4: 29 Total: 145 Entrance 2: 30 Total: 147 Entrance 1: 30 Total: 146 Entrance 0: 30 Total: 149 Entrance 3: 30 Total: 148 Entrance 4: 30 Total: 150 Stopping Entrance 2: 30 Stopping Entrance 1: 30 Stopping Entrance 0: 30 Stopping Entrance 3: 30 Stopping Entrance 4: 30 Total: 150 Sum of Entrances: 150 *///:~
sleep()的一种状况,它使任务从执行状态变为被阻塞状态,而有时你必须终止被阻塞的任务。
一个线程能够处于如下四种状态之一:
一个任务进入阻塞状态,可能有以下缘由:
查看的问题:但愿可以终止处于阻塞状态的任务。
在任务的run()方法中间打断,更像是抛出的异常,所以在Java线程种的这种类型的异常中断种用到了异常。
Thread类包含interrupt()方法,所以你能够终止被阻塞的任务,这个方法将设置线程的中断状态。若是一个线程已经被阻塞,或者试图执行一个阻塞操做,那么设置这个线程的中断状态将抛出InterruptedException。
在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的全部线程。
使用Executor,那么经过调用submin()而不是executor()来启动任务,就能够持有改任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,由于你永远不会调用上面的get()——持有这种Future的关键在于你能够在其上调用cancel(),并所以可使用它来中断某个特定的任务。
Executor展现了基本的interrupt()用法:
//: concurrency/Interrupting.java // Interrupting a blocked thread. import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class SleepBlocked implements Runnable { public void run() { try { TimeUnit.SECONDS.sleep(10000); } catch (InterruptedException e) {//被中断时抛出异常 print("InterruptedException"); } print("Exiting SleepBlocked.run()"); } } class IOBlocked implements Runnable { private InputStream in; public IOBlocked(InputStream is) { in = is; } public void run() { try { print("Waiting for read():"); in.read();//不可被中断 } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { print("Interrupted from blocked I/O"); } else { throw new RuntimeException(e); } } print("Exiting IOBlocked.run()"); } } class SynchronizedBlocked implements Runnable { public synchronized void f() {//不可被中断 while (true) // Never releases lock Thread.yield(); } public SynchronizedBlocked() { new Thread() { public void run() { f(); // Lock acquired by this thread } }.start(); } public void run() { print("Trying to call f()"); f(); print("Exiting SynchronizedBlocked.run()"); } } public class Interrupting { private static ExecutorService exec = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future<?> f = exec.submit(r); TimeUnit.MILLISECONDS.sleep(100); print("Interrupting " + r.getClass().getName()); f.cancel(true); // Interrupts if running print("Interrupt sent to " + r.getClass().getName()); } public static void main(String[] args) throws Exception { //test(new SleepBlocked()); //test(new IOBlocked(System.in)); test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3); print("Aborting with System.exit(0)"); System.exit(0); // ... since last 2 interrupts failed } } /* Output: (95% match) Interrupting SleepBlocked InterruptedException Exiting SleepBlocked.run() Interrupt sent to SleepBlocked Waiting for read(): Interrupting IOBlocked Interrupt sent to IOBlocked Trying to call f() Interrupting SynchronizedBlocked Interrupt sent to SynchronizedBlocked Aborting with System.exit(0) *///:~
你可以中断对sleep()的调用,可是不能中断正在试图获取synchronized锁或者试图执行I/O操做的线程。
对于这类问题,有一个笨拙的解决方案,即关闭任务在其上发生阻塞的底层资源:
//: concurrency/CloseResource.java // Interrupting a blocked task by // closing the underlying resource. // {RunByHand} import java.net.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InputStream socketInput = new Socket("localhost", 8080).getInputStream(); exec.execute(new IOBlocked(socketInput));//线程中断在关闭socket的时候 exec.execute(new IOBlocked(System.in));//线程没有中断 TimeUnit.MILLISECONDS.sleep(100); print("Shutting down all threads"); exec.shutdownNow(); TimeUnit.SECONDS.sleep(1); print("Closing " + socketInput.getClass().getName()); socketInput.close(); // Releases blocked thread TimeUnit.SECONDS.sleep(1); print("Closing " + System.in.getClass().getName()); System.in.close(); // Releases blocked thread } } /* Output: (85% match) Waiting for read(): Waiting for read(): Shutting down all threads Closing java.net.SocketInputStream Interrupted from blocked I/O Exiting IOBlocked.run() Closing java.io.BufferedInputStream Exiting IOBlocked.run() *///:~
可是各类nio类提供了更人性化的I/O中断。被阻塞的nio通道回自动的响应中断:
//: concurrency/NIOInterruption.java // Interrupting a blocked NIO channel. import java.net.*; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class NIOBlocked implements Runnable { private final SocketChannel sc; public NIOBlocked(SocketChannel sc) { this.sc = sc; } public void run() { try { print("Waiting for read() in " + this); sc.read(ByteBuffer.allocate(1)); } catch(ClosedByInterruptException e) { print("ClosedByInterruptException"); } catch(AsynchronousCloseException e) { print("AsynchronousCloseException"); } catch(IOException e) { throw new RuntimeException(e); } print("Exiting NIOBlocked.run() " + this); } } public class NIOInterruption { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InetSocketAddress isa = new InetSocketAddress("localhost", 8080); SocketChannel sc1 = SocketChannel.open(isa); SocketChannel sc2 = SocketChannel.open(isa); Future<?> f = exec.submit(new NIOBlocked(sc1)); exec.execute(new NIOBlocked(sc2)); exec.shutdown(); TimeUnit.SECONDS.sleep(1); // Produce an interrupt via cancel: f.cancel(true); TimeUnit.SECONDS.sleep(1); // Release the block by closing the channel: sc2.close(); } } /* Output: (Sample) Waiting for read() in NIOBlocked@7a84e4 Waiting for read() in NIOBlocked@15c7850 ClosedByInterruptException Exiting NIOBlocked.run() NIOBlocked@15c7850 AsynchronousCloseException Exiting NIOBlocked.run() NIOBlocked@7a84e4 *///:~
若是你尝试着在一个对象上调用其synchronized方法,而这个对象的所已经被其余任务得到,那么调用任务将被挂起,直至这个锁可得到。
示例说明了同一个互斥能够如何能被同一个任务屡次得到:
import static net.mindview.util.Print.*; public class MultiLock { public synchronized void f1(int count) { if(count-- > 0) { print("f1() calling f2() with count " + count); f2(count); } } public synchronized void f2(int count) { if(count-- > 0) { print("f2() calling f1() with count " + count); f1(count); } } public static void main(String[] args) throws Exception { final MultiLock multiLock = new MultiLock(); new Thread() { public void run() { multiLock.f1(10); } }.start(); } } /* Output: f1() calling f2() with count 9 f2() calling f1() with count 8 f1() calling f2() with count 7 f2() calling f1() with count 6 f1() calling f2() with count 5 f2() calling f1() with count 4 f1() calling f2() with count 3 f2() calling f1() with count 2 f1() calling f2() with count 1 f2() calling f1() with count 0 *///:~
一个任务应该可以调用在同一个对象种的其余synchronized方法,而这个任务已经持有锁。
Java SE5并发类种添加了一个特性,即在ReentrantLock上阻塞的任务具有能够被中断的能力,这与在synchronized方法或临界区上阻塞的任务不一样:
//: concurrency/Interrupting2.java // Interrupting a task blocked with a ReentrantLock. import java.util.concurrent.*; import java.util.concurrent.locks.*; import static net.mindview.util.Print.*; class BlockedMutex { private Lock lock = new ReentrantLock(); public BlockedMutex() { // Acquire it right away, to demonstrate interruption // of a task blocked on a ReentrantLock: lock.lock();//获取锁 } public void f() { try { // This will never be available to a second task lock.lockInterruptibly(); // Special call print("lock acquired in f()"); } catch(InterruptedException e) { print("Interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable { BlockedMutex blocked = new BlockedMutex(); public void run() { print("Waiting for f() in BlockedMutex"); blocked.f(); print("Broken out of blocked call"); } } public class Interrupting2 { public static void main(String[] args) throws Exception { Thread t = new Thread(new Blocked2()); t.start(); TimeUnit.SECONDS.sleep(1); System.out.println("Issuing t.interrupt()"); t.interrupt(); } } /* Output: Waiting for f() in BlockedMutex Issuing t.interrupt() Interrupted from lock acquisition in f() Broken out of blocked call *///:~