咱们要编写一个Socket应用,监听指定端口,实现数据包的接收和发送逻辑,这在早期系统间进行数据交互是常用的,这类接口一般须要考虑两个问题:一个是避免线程阻塞,保证接收的数据尽快处理;二是:接口的稳定性和可靠性问题,数据包很复杂,接口服务的系统也不少,一旦守候线程出现异常就会致使Socket中止,这是很是危险的,那咱们有什么办法避免吗?java
Java1.5版本之后在Thread类中增长了setUncaughtExceptionHandler方法,实现了线程异常的捕捉和处理。可能你们会有一个疑问:若是Socket应用出现了不可预测的异常是否能够自动重启呢?其实使用线程异常处理器很容易解决,咱们来看一个异常处理器应用实例,代码以下: 安全
class TcpServer implements Runnable { // 建立后即运行 public TcpServer() { Thread t = new Thread(this); t.setUncaughtExceptionHandler(new TcpServerExceptionHandler()); t.start(); } @Override public void run() { for (int i = 0; i < 3; i++) { try { Thread.sleep(1000); System.out.println("系统正常运行:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } // 抛出异常 throw new RuntimeException(); } // 异常处理器 private static class TcpServerExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { // 记录线程异常信息 System.out.println("线程" + t.getName() + " 出现异常,自行重启,请分析缘由。"); e.printStackTrace(); // 重启线程,保证业务不中断 new TcpServer(); } } }
这段代码的逻辑比较简单,在TcpServer类建立时即启动一个线程,提供TCP服务,例如接收和发送文件,具体逻辑在run方法中实现。同时,设置了该线程出现运行期异常(也就是Uncaught Exception)时,由TcpServerExceptionHandler异常处理器来处理异常。那么TcpServerExceptionHandler作什么事呢?两件事:服务器
有了这两点,TcpServer就能够稳定的运行了,即便出现异常也能自动重启,客户端代码比较简单,只须要new TcpServer()便可,运行结果以下:多线程
从运行结果上能够看出,当Thread-0出现异常时,系统自动重启了Thread-1线程,继续提供服务,大大提升了系统的性能。框架
这段程序只是一个示例程序,若要在实际环境中应用,则须要注意如下三个方面:异步
volatile关键字比较少用,缘由无外乎两点,一是在Java1.5以前该关键字在不一样的操做系统上有不一样的表现,所带来的问题就是移植性较差;并且比较难设计,并且误用较多,这也致使它的"名誉" 受损。ide
咱们知道,每一个线程都运行在栈内存中,每一个线程都有本身的工做内存(Working Memory,好比寄存器Register、高速缓冲存储器Cache等),线程的计算通常是经过工做内存进行交互的,其示意图以下图所示:工具
从示意图上咱们能够看到,线程在初始化时从主内存中加载所需的变量值到工做内存中,而后在线程运行时,若是是读取,则直接从工做内存中读取,如果写入则先写到工做内存中,以后刷新到主内存中,这是JVM的一个简答的内存模型,可是这样的结构在多线程的状况下有可能会出现问题,好比:A线程修改变量的值,也刷新到了主内存,但B、C线程在此时间内读取的仍是本线程的工做内存,也就是说它们读取的不是最"新鲜"的值,此时就出现了不一样线程持有的公共资源不一样步的状况。oop
对于此类问题有不少解决办法,好比使用synchronized同步代码块,或者使用Lock锁来解决该问题,不过,Java可使用volatile更简单地解决此类问题,好比在一个变量前加上volatile关键字,能够确保每一个线程对本地变量的访问和修改都是直接与内存交互的,而不是与本线程的工做内存交互的,保证每一个线程都能得到最"新鲜"的变量值,其示意图以下:性能
明白了volatile变量的原理,那咱们思考一下:volatile变量是否可以保证数据的同步性呢?两个线程同时修改一个volatile是否会产生脏数据呢?咱们看看下面代码:
class UnsafeThread implements Runnable { // 共享资源 private volatile int count = 0; @Override public void run() { // 增长CPU的繁忙程度,没必要关心其逻辑含义 for (int i = 0; i < 1000; i++) { Math.hypot(Math.pow(92456789, i), Math.cos(i)); } count++; } public int getCount() { return count; } }
上面的代码定义了一个多线程类,run方法的主要逻辑是共享资源count的自加运算,并且咱们还为count变量加上了volatile关键字,确保是从内存中读取和写入的,若是有多个线程运行,也就是多个线程执行count变量的自加操做,count变量会产生脏数据吗?想一想看,咱们已经为count加上了volatile关键字呀!模拟多线程的代码以下:
public static void main(String[] args) throws InterruptedException { // 理想值,并做为最大循环次数 int value = 1000; // 循环次数,防止形成无限循环或者死循环 int loops = 0; // 主线程组,用于估计活动线程数 ThreadGroup tg = Thread.currentThread().getThreadGroup(); while (loops++ < value) { // 共享资源清零 UnsafeThread ut = new UnsafeThread(); for (int i = 0; i < value; i++) { new Thread(ut).start(); } // 先等15毫秒,等待活动线程为1 do { Thread.sleep(15); } while (tg.activeCount() != 1); // 检查实际值与理论值是否一致 if (ut.getCount() != value) { // 出现线程不安全的状况 System.out.println("循环到:" + loops + " 遍,出现线程不安全的状况"); System.out.println("此时,count= " + ut.getCount()); System.exit(0); } } }
想让volatite变量"出点丑",仍是须要花点功夫的。此段程序的运行逻辑以下:
运行结果以下:
循环到:40 遍,出现线程不安全的状况
此时,count= 999
这只是一种可能的结果,每次执行都有可能产生不一样的结果。这也说明咱们的count变量没有实现数据同步,在多个线程修改的状况下,count的实际值与理论值产生了误差,直接说明了volatile关键字并不能保证线程的安全。
在解释缘由以前,咱们先说一下自加操做。count++表示的是先取出count的值而后再加1,也就是count=count+1,因此,在某个紧邻时间片断内会发生以下神奇的事情:
(1)、第一个时间片断
A线程得到执行机会,由于有关键字volatile修饰,因此它从主内存中得到count的最新值为998,接下来的事情又分为两种类型:
(2)、第二个片断
这两个时间片断执行完毕后,本来指望的结果为1000,单运行后的值为999,这表示出现了线程不安全的状况。这也是咱们要说明的:volatile关键字并不能保证线程安全,它只能保证当前线程须要该变量的值时可以得到最新的值,而不能保证线程修改的安全性。
顺便说一下,在上面的代码中,UnsafeThread类的消耗CPU计算是必须的,其目的是加剧线程的负荷,以便出现单个线程抢占整个CPU资源的情景,不然很难模拟出volatile线程不安全的状况,你们能够自行模拟测试。
多线程应用有两种实现方式,一种是实现Runnable接口,另外一种是继承Thread类,这两个方法都有缺点:run方法没有返回值,不能抛出异常(这两个缺点归根究竟是Runnable接口的缺陷,Thread类也实现了Runnable接口),若是须要知道一个线程的运行结果就须要用户自行设计,线程类自己也不能提供返回值和异常。可是从Java1.5开始引入了一个新的接口Callable,它相似于Runnable接口,实现它就能够实现多线程任务,Callable的接口定义以下:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
实现Callable接口的类,只是代表它是一个可调用的任务,并不表示它具备多线程运算能力,仍是须要执行器来执行的,咱们先编写一个任务类,代码以下:
//税款计算器 class TaxCalculator implements Callable<Integer> { // 本金 private int seedMoney; // 接收主线程提供的参数 public TaxCalculator(int _seedMoney) { seedMoney = _seedMoney; } @Override public Integer call() throws Exception { // 复杂计算,运行一次须要2秒 TimeUnit.MILLISECONDS.sleep(2000); return seedMoney / 10; } }
这里模拟了一个复杂运算:税款计算器,该运算可能要花费10秒钟的时间,此时不能让用户一直等着吧,须要给用户输出点什么,让用户知道系统还在运行,这也是系统友好性的体现:用户输入即有输出,若耗时较长,则显示运算进度。若是咱们直接计算,就只有一个main线程,是不可能有友好提示的,若是税金不计算完毕,也不会执行后续动做,因此此时最好的办法就是重启一个线程来运算,让main线程作进度提示,代码以下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // 生成一个单线程的异步执行器 ExecutorService es = Executors.newSingleThreadExecutor(); // 线程执行后的指望值 Future<Integer> future = es.submit(new TaxCalculator(100)); while (!future.isDone()) { // 尚未运算完成,等待200毫秒 TimeUnit.MICROSECONDS.sleep(200); // 输出进度符号 System.out.print("*"); } System.out.println("\n计算完成,税金是:" + future.get() + " 元 "); es.shutdown(); }
在这段代码中,Executors是一个静态工具类,提供了异步执行器的建立能力,如单线程异步执行器newSingleThreadExecutor、固定线程数量的执行器newFixedThreadPool等,通常它是异步计算的入口类。future关注的是线程执行后的结果,好比没有运行完毕,执行结果是多少等。此段代码的运行结果以下所示:
**********************************************......
计算完成,税金是:10 元
执行时,"*"会依次递增,表示系统正在运算,为用户提供了运算进度,此类异步计算的好处是:
在Java1.5以前,实现多线程比较麻烦,须要本身启动线程,并关注同步资源,防止出现线程死锁等问题,在1.5版本以后引入了并行计算框架,大大简化了多线程开发。咱们知道一个线程有五个状态:新建状态(NEW)、可运行状态(Runnable,也叫做运行状态)、阻塞状态(Blocked)、等待状态(Waiting)、结束状态(Terminated),线程的状态只能由新建转变为了运行状态后才能被阻塞或等待,最后终结,不可能产生本末倒置的状况,好比把一个结束状态的线程转变为新建状态,则会出现异常,例如以下代码会抛出异常:
public static void main(String[] args) throws InterruptedException { // 建立一个线程,新建状态 Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("线程正在运行"); } }); // 运行状态 t.start(); // 是不是运行状态,若不是则等待10毫秒 while (!t.getState().equals(Thread.State.TERMINATED)) { TimeUnit.MICROSECONDS.sleep(10); } // 直接由结束转变为云心态 t.start(); }
此段程序运行时会报java.lang.IllegalThreadStateException异常,缘由就是不能从结束状态直接转变为运行状态,咱们知道一个线程的运行时间分为3部分:T1为线程启动时间,T2为线程的运行时间,T3为线程销毁时间,若是一个线程不能被重复使用,每次建立一个线程都须要通过启动、运行、销毁时间,这势必增大系统的响应时间,有没有更好的办法下降线程的运行时间呢?
T2是没法避免的,只有经过优化代码来实现下降运行时间。T1和T2均可以经过线程池(Thread Pool)来缩减时间,好比在容器(或系统)启动时,建立足够多的线程,当容器(或系统)须要时直接从线程池中得到线程,运算出结果,再把线程返回到线程池中___ExecutorService就是实现了线程池的执行器,咱们来看一个示例代码:
public static void main(String[] args) throws InterruptedException { // 2个线程的线程池 ExecutorService es = Executors.newFixedThreadPool(2); // 屡次执行线程体 for (int i = 0; i < 4; i++) { es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } // 关闭执行器 es.shutdown(); }
此段代码首先建立了一个包含两个线程的线程池,而后在线程池中屡次运行线程体,输出运行时的线程名称,结果以下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
本次代码执行了4遍线程体,按照咱们以前阐述的" 一个线程不可能从结束状态转变为可运行状态 ",那为何此处的2个线程能够反复使用呢?这就是咱们要搞清楚的重点。
线程池涉及如下几个名词:
咱们首先从线程池的建立提及,Executors.newFixedThreadPool(2)表示建立一个具备两个线程的线程池,源代码以下:
public class Executors { //生成一个最大为nThreads的线程池执行器 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
这里使用了LinkedBlockingQueue做为队列任务管理器,全部等待处理的任务都会放在该对列中,须要注意的是,此队列是一个阻塞式的单端队列。线程池创建好了,那就须要线程在其中运行了,线程池中的线程是在submit第一次提交任务时创建的,代码以下:
public Future<?> submit(Runnable task) { //检查任务是否为null if (task == null) throw new NullPointerException(); //把Runnable任务包装成具备返回值的任务对象,不过此时并无执行,只是包装 RunnableFuture<Object> ftask = newTaskFor(task, null); //执行此任务 execute(ftask); //返回任务预期执行结果 return ftask; }
此处的代码关键是execute方法,它实现了三个职责。
其中此处的关键是工做线程的建立,它也是经过new Thread方式建立的一个线程,只是它建立的并非咱们的任务线程(虽然咱们的任务实现了Runnable接口,但它只是起了一个标志性的做用),而是通过包装的Worker线程,代码以下:
private final class Worker implements Runnable { // 运行一次任务 private void runTask(Runnable task) { /* 这里的task才是咱们自定义实现Runnable接口的任务 */ task.run(); /* 该方法其它代码略 */ } // 工做线程也是线程,必须实现run方法 public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } // 任务队列中得到任务 Runnable getTask() { /* 其它代码略 */ for (;;) { return r = workQueue.take(); } } }
此处为示意代码,删除了大量的判断条件和锁资源。execute方法是经过Worker类启动的一个工做线程,执行的是咱们的第一个任务,而后改线程经过getTask方法从任务队列中获取任务,以后再继续执行,但问题是任务队列是一个BlockingQuene,是阻塞式的,也就是说若是该队列的元素为0,则保持等待状态,直到有任务进入为止,咱们来看LinkedBlockingQuene的take方法,代码以下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { // 若是队列中的元素为0,则等待 while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; } // 等待状态结束,弹出头元素 x = extract(); c = count.getAndDecrement(); // 若是队列数量还多于一个,唤醒其它线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 返回头元素 return x; }
分析到这里,咱们就明白了线程池的建立过程:建立一个阻塞队列以容纳任务,在第一次执行任务时建立作够多的线程(不超过许可线程数),并处理任务,以后每一个工做线程自行从任务对列中得到任务,直到任务队列中的任务数量为0为止,此时,线程将处于等待状态,一旦有任务再加入到队列中,即召唤醒工做线程进行处理,实现线程的可复用性。
使用线程池减小的是线程的建立和销毁时间,这对于多线程应用来讲很是有帮助,好比咱们经常使用的Servlet容器,每次请求处理的都是一个线程,若是不采用线程池技术,每次请求都会从新建立一个新的线程,这会致使系统的性能符合加大,响应效率降低,下降了系统的友好性。