java从诞生开始就明智的选择了内置对多线程的支持,这使得java语言相比同一时期的其余语言具备明显的优点。线程做为操做系统调度的最小单元,多个线程可以同时执行,这将显著提高程序的性能,在多核环境中表现的更加明显。可是,过多的建立线程和对线程的不当管理也容易形成问题。本章将着重介绍java并发编程的基础知识,从启动一个线程到线程间不一样的通讯方式,最后经过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容。html
1.线程简介java
1.1 什么是线程web
现代操做系统中在运行一个程序时,会为其建立一个进程。例如,启动一个java程序,操做系统就会建立一个java进程。如今操做系统调度的最小单元就是线程,也叫轻量级进程(Light Weight Process),在一个进程中能够建立多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,而且可以访问共享的内存变量。处理器在这些线程上高速切换,让使用者感受到这些线程在同时进行。算法
一个java程序从main()方法开始执行,而后按照既定的代码逻辑执行,看似没有其余线程参与,但实际上java程序天生就是多线程程序,由于执行main()方法的是一个名称为main的线程,下面使用JMX来查看一个普通的java程序包含哪些线程sql
public static void main(String[] args) { //使用java线程管理MXbean ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); //不须要获取同步的monitor和synchronizer信息,进获取线程和线程堆栈信息 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for (ThreadInfo threadInfo : threadInfos) { System.out.println("["+threadInfo.getThreadId()+"]"+threadInfo.getThreadName()); } } // [7]JDWP Command Reader // [6]JDWP Event Helper Thread // [5]JDWP Transport Listener: dt_socket // [4]Signal Dispatcher 分发处理发送给jvm信号的线程 // [3]Finalizer 调用对象finalize方法的线程 // [2]Reference Handler 清除reference 的线程 // [1]main main线程,用户程序入口
能够看到,一个java程序的运行不只仅是面()方法的运行,而是main线程和多个其余线程的同时执行。数据库
1.2 为何要使用多线程编程
执行一个简单的“Hello word!”,却启动了那么多“无关”线程,是否是把简单的问题复杂化了?固然不是,由于真确的使用多线程,总可以给开发人员带来显著的好处,而使用多线程的缘由主要有如下几点:浏览器
1.更多的处理器核心tomcat
随着处理器上的核心数量愈来愈多,以及超线程技术的普遍运用,如今大多数计算机都比以往更加擅长并行计算,而处理器性能的提高方式,也从更高的主频向更多的核心发展。如何利用好处理器上的核心也成了如今的主要问题安全
线程是大多数操做系统调度的基本单元,一个程序做为一个进程来运行,程序运行过程当中可以建立多个线程,而一个线程在一个时刻只能运行在一个处理器核心上。试想一下,一个单线程程序在运行是只能使用一个处理器核心,那么再多的处理器核心加入也没法显著提高该程序的执行效率。相反,若是改程序使用多线程技术,将计算逻辑分配到多个处理器核心上,就会显著减小程序的处理时间,而且随着更多处理器核心的加入而变得更有效率。
2.更快的响应时间
有事咱们会编写一些较为复杂的代码(这的复杂代码不是说复杂的算法,而是复杂的业务逻辑)。例如,一笔订单的建立,他包括插入订单数据、生成订单快照、发送邮件通知买家和记录货品销售数量等。用户从单击“订购”按钮开始,就要等待这些操做所有完成才能看到定购成功的结果。可是这么多业务操做,如何可以让其跟快的完成呢?
在上面的场景中,可使用多线程技术,即将数据一致性不强的操做派发个其余线程处理(也可使用消息队列),如生成订单快照、发送邮件等这样作的好处就是想用用户请求的线程可以尽量的处理完成,缩短了响应时间,提高了用户体验。
3.更好的编程模型
java为多线程模型提供了良好、考究而且一致的编程模型,是开发人员可以更加专一于问题的解决,即为所遇到的问题创建合适的模型,而不是绞尽脑汁的考虑如何将其多线程化。一旦开发人员创建好模型,稍作修改老是可以方便的映射到java提供的多线程编程模型上。
1.3 线程优先级
如今操做系统基本采用时分的形式调度运行的线程,操做系统会分出一个时间片,线程会分配到若干时间片,当线程的时间片用完了就发生线程的调度,并等待下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程须要多或者少分配一些处理资源的线程属性。
在java线程中,经过一个整型成员变量priority来控制优先级,优先级的范围1~10,默认是5,在线程构建的时候能够经过setPriority(int)方法来修改优先级,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级是,针对频繁阻塞(休眠或者I/O操做)的现场须要设置较高的优先级,而偏重计算(须要较多cpu时间或者片预算)的线程则设置较低的优先级,确保处理器不会被独占。在不一样jvm以及操做系统上,线程规划存在差别,有些操做系统甚至会忽略线程优先级的设定。
public class Priority { private static volatile boolean notStart = true; private static volatile boolean notEnd = true; public static void main(String[] args) throws InterruptedException { ArrayList<Job> jobs = new ArrayList<>(); for (int i = 0; i < 10; i++) { int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY; Job job = new Job(priority); jobs.add(job); Thread thread = new Thread(job,"thread:"+i); thread.setPriority(priority); thread.start(); } notStart = false; TimeUnit.SECONDS.sleep(10); notEnd = false; for (Job job : jobs) { System.out.println("job priority :"+job.priority+",count : "+job.jobCount); } } static class Job implements Runnable{ private int priority; private long jobCount; public Job(int priority) { this.priority = priority; } @Override public void run() { while (notStart){ Thread.yield(); } while (notEnd){ Thread.yield(); jobCount++; } } } }
job priority :1,count : 9225351
job priority :1,count : 9103654
job priority :1,count : 9276681
job priority :1,count : 9197398
job priority :1,count : 9292870
job priority :10,count : 9330619
job priority :10,count : 9238517
job priority :10,count : 9389986
job priority :10,count : 9340567
job priority :10,count : 8984817
从输出能够看到线程的优先级没有生效,优先级1和优先级10的Job计数器的结果很是先进,没有明显差距,这表示程序正确性不能依赖优先级高低。
1.4 线程的状态
java线程在运行的生命周期可能处于的6中状态,在给定的一个时刻,线程只能处于其中的一个状态
public class ThreadState { public static void main(String[] args) { new Thread(new TIMEWaiting(),"timeWatingThread").start(); new Thread(new Waiting(),"watingThread").start(); //使用两个block线程,一个获取锁成功,另外一个阻塞 new Thread(new Blocked(),"blockedThread-1").start(); new Thread(new Blocked(),"blockedThread-2").start(); } //该线程不断地进行睡 static class TIMEWaiting implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(100); } } } // 该线程在 waiting。class实例上等代 static class Waiting implements Runnable{ @Override public void run() { while (true){ synchronized (Waiting.class){ try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } // 改线成在Blocked。class 实例上加锁后,不会释放锁 static class Blocked implements Runnable{ @Override public void run() { synchronized (Blocked.class){ while (true){ SleepUtils.second(100); } } } } static class SleepUtils{ public static final void second(long seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行改实例代开中断或者命令提示符 输入 jps
4067 Launcher
4068 ThreadState
4103 Jps
3983
jstack PID
线程建立以后,调用start()方法开始执行。当线程执行wait()方法以后,线程进入等待状态。进入等待状态的线程须要依靠其余线程的通知才可以返回到运行状态,而超时等待状态至关于在等待方法的基础上增长了超时限制,也就是超时时间到达将会返回运行状态。当线程调用同步方法时,在没有获取到锁的状况下,线程将会进入到阻塞状态,线程在执行Runnable的run() 方法以后将进入到终止状态,
1.5 Daemon线程
Daemon线程是一种支持线程,由于它主要被用做程序中后台调度以及支持性工做,这意味着,当一个java虚拟机中不存在非Daemon线程的时候,java虚拟机将会推出,能够经过Thread.setDeamon(true)将线程设置为Daemon线程,当java虚拟机退出时,Daemon线程中的finally块并不必定执行
public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner(), "daemonRunner"); thread.setDaemon(true); thread.start(); } static class DaemonRunner implements Runnable{ @Override public void run() { try { ThreadState.SleepUtils.second(10); }catch (Exception e){ System.out.println("daemonThread finally run ."); } } } }
运行Daemon程序,能够看到在终端或者命令符上没有任何的输出。main线程(非Daemon线程)在启动了DaemonRunner以后随着main方法执行完毕而终止,而此时java虚拟机中已经没有非Daemon线程,虚拟机须要退出。java虚拟机中的全部Daemon线程都须要当即终止,所以DaemonRunner当即终止,可是DaemonRunner中的finally块并无执行。
在建立Daemon线程时,不能依靠Finally快中的内容来确保执行关闭或清理资源的逻辑
2.启动和终止线程
2.1 构造线程
在运行线程以前手下要构造一个线程对象,线程对象在构造的时候须要提供线程所须要的属性,如线程所属的线程组,线程优先级,是不是Daemon线程等信息
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { throw new NullPointerException("name cannot be null"); } this.name = name; //当前线程就是该线程的父线程 Thread parent = currentThread(); this.group = g; //将Daemon。priority属性设置为父线程的对应属性 this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); this.target = target; setPriority(priority); // 将父线程的inheritThreadLocal复制过来 if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* 分配一个线程id */ tid = nextThreadID(); }
一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon、优先级和加载资源的contenxtClassLoader以及可继承的ThreadLocal,同时还会分配一个惟一id来标识这个Child线程。至此一个可以运行的线程对象就初始化好了,在堆内存中等待运行。
2.2 启动线程
线程对象在初始化完成以后,调用start()方法就能够启动这个线程。线程Start()方法的含义是:当前线程(即parent线程同步告知java虚拟机,只要线程划器空闲,应当即启动调用start()方法的线程)
2.3 理解中断
中断能够理解为线程的一个标识位属性,他表示一个运行中的线程是否被其余线程进行了中断,中断比如其余线程对该线程打了一个招呼,其余线程经过调用该线程的interupt()方法对其进行了中断操做。
线程经过检查自身时候被中断来进行响应,线程经过方法isInterrupted()来进行判断是否被中断,也能够调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位,若是该线程已经处于终结状态,即便线程被中断过,即便该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返回false。
从java的API中能够看到,许多声明抛出InterruptedException的方法(例如Thread。sleep(Long millis))这些方法在抛出InterruptedException以前,java虚拟机会想讲线程的中断标识位清除,而后抛出InterruptedException,此时调用IsIterrupted()方法将会返回false。
public class Interrupted { public static void main(String[] args) throws InterruptedException { //SleepThread不停的尝试失眠 Thread sleepThread = new Thread(new SleepRunner(), "SleepThread"); sleepThread.setDaemon(true); //busyThread 不停的运行 Thread busyThread = new Thread(new BusyRunner(), "busyThread"); busyThread.setDaemon(true); sleepThread.start(); busyThread.start(); //休眠5秒,让sleepThread和busyThread充分运行 TimeUnit.SECONDS.sleep(5); sleepThread.interrupt(); busyThread.interrupt(); System.out.println("sleepThread interrupted is:"+sleepThread.isInterrupted()); System.out.println("busyThread interrupted is:"+busyThread.isInterrupted()); SleepUtils.second(2); } static class SleepRunner implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(10); } } } static class BusyRunner implements Runnable{ @Override public void run() { while (true){} } } } //输出结果 sleepThread interrupted is:false busyThread interrupted is:true java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at 多线程并发的艺术.并发编程的挑战1.SleepUtils.second(SleepUtils.java:25) at 多线程并发的艺术.并发编程的挑战1.Interrupted$SleepRunner.run(Interrupted.java:45) at java.lang.Thread.run(Thread.java:748)
从结果能够看出,抛出InterruptedException 的线程SleepThread,其中断标识位被清除了,而一直忙碌运做的线程BusyThread,中断标识位没有被清除
2.4 过时的suspend()、resume()、stop()
你们对于CD机确定不陌生,若是把它播放音乐比做一个线程运做,那么对音乐的播放作出暂停、恢复和中止操做对应的线程Thread的API的supend()/resume()/和stop()。
public class Deprecated { public static void main(String[] args) throws InterruptedException { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); Thread thread = new Thread(new Runner(), "printThread"); thread.setDaemon(true); thread.start(); TimeUnit.SECONDS.sleep(3); //将thread 进行暂停,输出内容工做中止 thread.suspend(); System.out.println("main supend thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.resume(); System.out.println("main resume thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.stop(); System.out.println("main stop thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); } static class Runner implements Runnable{ @Override public void run() { DateFormat format = new SimpleDateFormat("HH:mm:ss"); while (true){ System.out.println(Thread.currentThread().getName()+"run at"+format.format(new Date())); SleepUtils.second(1); } } } } printThreadrun at22:17:37 printThreadrun at22:17:38 printThreadrun at22:17:39 main supend thread at 22:17:40 main resume thread at 22:17:43 printThreadrun at22:17:43 printThreadrun at22:17:44 printThreadrun at22:17:45 main stop thread at 22:17:46
不建议使用的缘由有:以suspend()方法为例,在调用后,线程不会释放已经占有的资源(好比锁),而是占有着资源进入睡眠状态,这样容易已发死锁问题。一样,stop()方法在终结一个线程时不会保证线程的资源正常释放,一般是没有给予线程完成资源释放工做的机会,所以会致使程序可能在不肯定状态下。
2.5 安全的终止线程
2.3中提到的中断状态是线程的一个标识位,而中断操做是一种简便的线程间交互方式,而这种交互方式最适合用来取消或中止任务。除了中断之外,还能够利用一个Boolean变量来控制是否须要中止任务并终止任务
public class Shutdown {
public static void main(String[] args) throws InterruptedException { Runner one = new Runner(); Thread countThread = new Thread(one, "countThread"); countThread.start(); //睡眠一秒 mian 线程对CountThread 进行中断,使countThread可以感知中断而结束 TimeUnit.SECONDS.sleep(1); countThread.interrupt(); Runner two = new Runner(); countThread = new Thread(two, "countThread"); countThread.start(); TimeUnit.SECONDS.sleep(1); two.cancel(); } private static class Runner implements Runnable{ private long i; private volatile boolean on = true; @Override public void run() { while(on && !Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i ="+i); } public void cancel(){ on = false; } } } Count i =753158205
Count i =769424811
示例在执行过程当中,main线程经过中断操做和cancel()方法都可使CountThread线程终止,这种经过标识或者中断操做的方式可以使线程在终止时有机会去清理资源,而不是武断色将线程中止,所以这种终止线程的作法显得更加安全优雅。
3.线程间通讯
线程开始运行,拥有本身的栈空间,就如同一个脚本同样,按照既定的代码一步一步的执行,直至终止,可是每个运行的线程,若是多个线程可以相互配合完成工做,这将会带来巨大价值。
3.1 volatile和synchronized关键字
java支持多个线程同时访问一个对象或者对象的成员变量,因为每一个线程能够拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,可是每一个执行的线程仍是能够拥有一份拷贝,这样作的目的是加速程序的执行,这是现代多核处理器的一个显著特性),因此程序在执行过程当中,一个线程看到的变量并不必定是最新的。
关键字volatile能够用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需哟啊从共享内存中获取,而对他的改变必须同步刷新回共享内存,他能保证全部线程对变量访问的可见性。
举个列子,定义一个表示程序是否运行的成员变量Boolean on= true,那么另外一线程能对他执行关闭动做(on = false),这里涉及多个线程对变量的访问,所以须要将其定义成为 volatile Boolean on= true,这样其余线程对他进行改变时,可让全部线程感知到变化,由于全部对on变量的访问和修改都须要以共享内存为准,可是,过多的使用volatile是没必要要的,由于他会下降程序的执行效率。
关键字synchronized能够修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
public class Synchronized { public static void main(String[] args) { //对Synchronized class对象进行加锁 synchronized (Synchronized.class){} m(); } private static synchronized void m() { } }
public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=3, args_size=1 0: ldc #2 // class 多线程并发的艺术/并发编程的挑战1/Synchronized 2: dup 3: astore_1 4: monitorenter //监视器进入,获取锁 5: aload_1 6: monitorexit //监视器退出,释放锁 7: goto 15 10: astore_2 11: aload_1 12: monitorexit 13: aload_2 14: athrow 15: invokestatic #3 // Method m:()V 18: return public static synchronized void m(); descriptor: ()V
//方法修饰符 表示 public static synchronized void m()
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=0, locals=0, args_size=0 0: return
上面class信息中,对于同步代码块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。不管采用哪一种方式,其本质是对一个对象的监视器(moniter)进行获取,而这个获取过程实排他的,也就是同一时刻只能有一个线程获取到由synchronized锁保护对象的监视器。
任意一个对象都拥有本身的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞再同步代码块和同步方法的入口处,进入BLOCKED状态。
该图能够看出,任意线程对Object的访问,首先要得到Object的监视器,若是获取失败,该线程就进入同步状态,线程状态变为BLOCKED,当Object的监视器占有者释放后,在同步队列中得线程就会有机会从新获取该监视器。
3.2 等待/通知机制
一个线程修改了一个对象的值,而另外一线程感知到了变化,而后进行相应的操做,整个过程开始于一个线程,而最终执行又是另外一个线程。前者是生产者,后者就是消费者,这种模式隔离了“作什么(what)”和“怎么作(How)”,在功能层面上实现了解耦,体系结构上具有了良好的伸缩性,可是在java语言中如何实现相似的功能呢?
简单的方法是让消费者线程不断地循环检查变量是否符合预期,以下面代码所示,在while循环中设置不知足的条件,若是条件知足则退出while循环,从而完成消费者的工做.
while (value != desire){ Thread.sleep(1000); } doSomething();
上面这段伪代码在条件不知足时就睡眠一段时间,这样作的目的是为了防止过快的"无效"尝试,这种方式看似可以解实现所需的功能,可是存在的以下问题。
1)难以确保及时性。在睡眠时,基本不消耗处理器资源,可是若是睡的太久,就不能及时发现条件已经变化,也就是及时性难以保证。
2)难以下降开销。若是下降睡眠时间,好比休眠1毫秒,这样消费者能更加迅速地发现条件的变化,可是却可能消耗更多的处理器资源,形成了无故的浪费。
以上两个问题,看似矛盾难以调和,可是java经过内置的等待/通知机制可以很好地解决这个矛盾并实现了所需的工能。
等待/通知的相关方法是任意java对象都具有的,由于这些方法被定义在全部对象的超类java.lang.object上。
等待/通知机制,是指一个线程A调用对象O的wait()方法进入了等待状态,而另外一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操做。上述两个线程经过对象O来完成交互,而对象的wait()和notify()/notifyAll()的关系就如同开关信号同样,用来完成等待方和通知方之间的交互工做
穿建了两个线程——WaiThread和NotifyThread,前者检查flag值是否为false,若是符合要求,进行后续操做,不然在lock上等待,后者在睡眠了一段时间后对lock进行通知
public class WaitNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) { Thread waitThread = new Thread(new Wait(), "waitThread"); waitThread.start(); SleepUtils.second(1); Thread notifyThread = new Thread(new Notify(), "notifyThread"); notifyThread.start(); } static class Wait implements Runnable{ @Override public void run() { //加锁 拥有lock的 Moniter synchronized (lock){ //当条件不知足时,继续wait同时释放lock锁 while (flag){ try { System.out.println(Thread.currentThread()+ "flag is true. wait@"+new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //条件知足时,完成工做 System.out.println(Thread.currentThread()+"flag is false. running @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable{ @Override public void run() { //加锁 拥有lock的 Moniter synchronized (lock){ //获取lock的锁,而后进行通知,通知时不会释放lock的锁 //直到当前线程释放了lock后,waitThread才能从wait方法中返回 System.out.println(Thread.currentThread()+"hold lock .notify @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; SleepUtils.second(5); } //再次加锁 synchronized (lock){ System.out.println(Thread.currentThread()+"hold lock again. sleep@"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); SleepUtils.second(5); } } } }
Thread[waitThread,5,main]flag is true. wait@20:38:13
Thread[notifyThread,5,main]hold lock .notify @20:38:14
Thread[notifyThread,5,main]hold lock again. sleep@20:38:19
Thread[waitThread,5,main]flag is false. running @20:38:24
上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait(),notify()以及notifyAll()是须要注意的细节,以下:
1)使用wait(),notify()和notifyAll()时须要先对调用对象加锁。
2)使用wait()方法后,线程的状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
3)notify()和notifuAll()方法调用后,等待线程依旧不会从wait()返回,须要调用notify()或notifyAll()的线程释放锁以后,等待线程才有机会wait()返回。
4)notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而totifyAll()方法则是将等待队列中全部线程所有移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
5)从wait()方法返回的前提是得到了调用对象的锁。
从上述细节中能够看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时可以感知到通知线程对变量作出的修改、
WaitThread首先获取了对象锁,而后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。因为WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用了对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁以后,WaitThread再次获取到锁并从wait()方法返回继续执行。
3.3 等待/通知经典案例
从3.2中的WaitNotify实例中能够提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知这(生产者)
等待者遵循以下原则
1)获取对象锁
2)若是条件不知足,那么调用对象的wait()方法,被通知后仍要检查条件
3)条件知足则执行对应的逻辑
对应的伪代码
synchronized(对象){
while(条件不知足){
对象.wait();
}
对应的处理逻辑
}
通知方遵循以下原则
1)获取对象锁
2)改变条件
3)通知全部等待的对象的线程
synchronized(对象){
改变条件
对象.notifyAll();
}
3.4 管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不一样之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了以下4种具体的实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面相字节,然后两中面相字符
建立了printThread,它用来接受main线程的输入,任何main下城的输入都可经过pipedWriter写入,而printThread在另外一端经过oioedReader将内容读出并打印。
public class Piped { public static void main(String[] args) throws IOException { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); //将输出流和输入流进行链接,不然在使用时会抛出 IOException out.connect(in); Thread printThread = new Thread(new Print(in), "printThread"); printThread.start(); int receive = 0; try { while ((receive = System.in.read())!= -1){ out.write(receive); } }finally { out.close(); } } static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive = 0; try { while ((receive = in.read() )!= -1){ System.out.println((char) receive); } } catch (IOException e) { e.printStackTrace(); } } } }
运行该示例,输入一组字符串,能够看到被printThread进行了原样输出
Repeat my words
Repeat my words
对于piped类型的流,必须先要进行绑定,也就是调用connect()方法,若是没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。
3.5 Thread.jion()
若是线程A 执行了thread.jion()语句,其含义是:当前线程A等待thread线程终止以后才从thread.jion() 返回。线程Thread除了提供jion()方法以外,还提供了jion(long millis)和jion(long millis,int nanos)两个具有超时时间里没有终止,那么将会从该超时方法中返回。
在代码示例中,建立了10个线程,编号0~9,每一个线程调用前一个线程的join()方法,也就是线程0结束了,线程1才能从jion()方法中返回,而线程0须要等待main线程结束
public class Jion { public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { //每一个线程拥有前一个线程的引用,须要等待前一个线程终止,才能从等待中返回 Thread thread = new Thread(new Domino(previous), String.valueOf(i)); thread.start(); previous = thread; } TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+"terminate"); } static class Domino implements Runnable{ private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"terminate."); } } }
mainterminate
0terminate.
1terminate.
2terminate.
3terminate.
4terminate.
5terminate.
6terminate.
7terminate.
8terminate.
9terminate.
从上述输出能够看到,每个线程终止的前提是前驱线程的终止,每一个线程等待前驱线程终止后,才从join()方法返回,这里涉及到了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
当线程终止时,会调用线程自身的notifyAll()方法,会通知全部等待在该线程对象上的线程。能够看到join()方法的逻辑结构与3.3节中描述的等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤。
3.6 ThreadLocal的使用
ThreadLocal,即线程变量,是一个以threadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程能够根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
能够经过set(T)方法来设置一个值,在当前线程下再经过get()方法获取到原先设置的值。
在代码清单4-15所示的例子中,构建一个经常使用的profiler类。它具备begin()和end()两个方法,而end()方法返回begin()方法调用开始到end()方法调用湿的时间差,单位是毫秒。
public class Profiler { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){ @Override protected Long initialValue(){ return System.currentTimeMillis(); } }; public static final void begin(){ TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final Long end(){ return System.currentTimeMillis()-TIME_THREADLOCAL.get(); } public static void main(String[] args) throws Exception { begin(); TimeUnit.SECONDS.sleep(1); System.out.println("cost: "+end()+"mills"); } }
cost: 1004mills
Profiler能够被复用在方法调用耗时统计的功能上,再方法的入口前执行begin()方法,在方法调用后end() 方法,好处就是两个方法的调用不用在一个方法或者类中,好比在AOP(面向方面编程)中,能够在方法调用前的切入点执行begin()方法,而在方法调用后的切入点执行end()方法,这样依旧能够得到方法的执行耗时。
4.线程应用实例
4.1 等待超时模式
开发人员常常会遇到这样的方法调用经常使用场景:调用一个方法时等待一段时间(通常来讲是给定一个时间段),若是该方法可以在给定的时间段以内获得结果,那么将结果马上返回,反之,超时返回默认结果。
前章介绍了等待、通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式没法作到超时等待。而超时等待的加入,只须要对经典范式作出很是小的改动,改动内容以下:
假设超时时间段是T,那么能够推断出在当前时间now+T以后就会超时。
定义变量
1.等待持续时间 REMAINING=T
2.超时时间:FUTURE=now +T
这是仅须要wait(REMAINING)便可,在wait(REMAINING)返回以后将会执行:REMAINING = FUTURE -now,若是REMAINING小于等于0,表示已经超时,直接退出,不然将继续执行wait(REMAINING)。
上述描述等待超时模式的伪代码以下:
public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; //当超时大于0而且result 返回值不知足要求 while((result == null) && remaining > 0){ wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
能够看出,等待超时模式就是在等待/通知范式的基础上增长了超时控制。这使得该模式相比原有范式更具备灵活性,由于即便方法执行时间过长,也不会"永久阻塞调用者",而时会按照调用者的要求“按时”返回
4.2 一个简单的数据库链接池示例
咱们使用等待超时模式来构造一个简答的数据库链接池,在示例模拟从链接池中获取、使用和释放链接的过程,而客户端获取链接的过程被设定为超时等待的模式,也就是在1000毫秒内若是没法获取到可用的链接,将返回给客户端一个null。设定链接池的大小为10个,而后经过调节客户端的线程数来模拟没法获取链接大场景。
首先看一下链接池的定义。他经过构造函数初始化链接的最大上限,经过一个双向队列来维护链接,调用fetchConnection(long)方法来指定在多少毫秒内超时获取链接,当链接使用完成后,须要调用releaseConnection(Connection)方法将链接放回线程池。
public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<>(); public ConnectionPool(int initialSize) { if (initialSize > 0){ for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection){ if(connection != null){ synchronized (pool){ // 链接释放后须要进行通知,这样其余消费者可以感知到链接池中已经归还了一个链接 pool.addLast(connection); pool.notifyAll(); } } } public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ if(mills <= 0){ while (pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else { long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining >0){ pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection result = null; if(!pool.isEmpty()){ result = pool.removeFirst(); } return result; } } } }
因为java.sql.connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,咱们经过动态代理构造了一个Connection,该connection,该connection的代理实现仅仅实在commit()方法调用时休眠100毫秒
public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler{ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("commit")){ TimeUnit.SECONDS.sleep(100); } return null; } } public static final Connection createConnection(){ return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[]{Connection.class},new ConnectionHandler()); } }
下面经过一个示例来测试建议数据库链接池的工做状况,模拟客户端ConnextionRunner获取、使用、最后释放链接的过程,当它使用链接将会增长后去到链接的数量,反之,将会增长未获取到链接的数量
public class ConnectionPoolTest { static ConnectionPool connectionPool = new ConnectionPool(10); //保证全部connection 可以同时开始 static CountDownLatch start = new CountDownLatch(1); //main 线程将会等待全部connectionRunner 结束后才能继续执行 static CountDownLatch end; public static void main(String[] args) throws InterruptedException { int threadCount = 10; end = new CountDownLatch(threadCount); int count = 20; AtomicInteger got = new AtomicInteger(); AtomicInteger notGot = new AtomicInteger(); for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "connectionRunnerThread"); thread.start(); start.countDown(); end.await(); System.out.println("total invoke :"+(threadCount * count)); System.out.println("got connection:"+got); System.out.println("notgot connection :"+notGot); } } static class ConnectionRunner implements Runnable{ int count; AtomicInteger got; AtomicInteger notgot; public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notgot) { this.count = count; this.got = got; this.notgot = notgot; } @Override public void run() { try { start.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (count > 0){ //从线程池中获取链接,若是1000ms 内没法获取到,将会返回null //分别统计链接获取的数量got和或获取到的数量 notgot try { Connection connection = connectionPool.fetchConnection(100); if(connection != null){ try { connection.createStatement(); connection.commit(); }finally { connectionPool.releaseConnection(connection); got.incrementAndGet(); } }else { notgot.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); }finally { count --; } } end.countDown(); } } }
上述示例中使用了CountDownLatch来确保ConnectionRunnerThread可以同时开始执行,而且在所有结束以后,才使main线程从等待状态中返回。当前设定的场景是10个线程同时运行获取链接池(10个链接)中链接,经过调整线程数量来观察未获取到链接的状况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率。
从表中的数据统计能够看出,在资源必定的状况下(链接池中的10个链接),随着客户端线程的逐步增长,客户端出现超时没法获取链接的比率不断升高。虽然客户端线程在这种超时获取的模式下回出现链接没法获取的状况,可是他可以保证客户端线程不会一直关在链接获取的操做上,而是“按时”返回,并告知客户端链接后去出现问题,是系统的一种自我保护机制。数据库链接池的设计也能够复用到其余的资源获取的场景,针对昂贵的资源(好比数据库链接池)的获取都应该加以超时限制。
4.3 线程技术及其示例
对于一个服务端的程序,常常面对的是客户传入的短小(执行时间短、工做内容较为单一)任务,须要服务端快速处理并返回结果。若是服务端每次接受到一个任务,穿建一个线程。而后进行执行,这在原型阶段是一个不错的选择,可是面对成千上万的任务递交进服务器时,若是仍是采用一个任务一个线程的方式,那么将会建立数以万记得线程,这不是一个好的选择。由于这回使操做系统频繁的进行上下文切换,无端增长系统的负载,而线程的建立和消亡都是须要消费系统资源的,也无疑浪费了系统资源。
线程池技术可以很好的解决这个问题,他预先建立了若干数量的线程,而且不能由用户直接对线程的建立进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样作的好处是,一方面消除了频繁建立和消亡线程的系统资源的开销,另外一方面,面对过量任务的提交可以平缓的劣化。
下面先看一个简单的线程池接口定义
public interface ThreadPool<Job extends Runnable>{ //执行一个Job,这个须要实现Runner void execute(Job job); //关闭线程chi void shutdown(); //增长工做者线程 void addWorkers(int num); //减小工做者线程 void removeWorker(int num); //获得正在等待执行的任务数量 int getJobSize(); }
客户端能够经过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。除了execute(Job)方法之外,线程池接口提供了增大/减小工做者线程以及关闭线程池的方法。这里工做者线程表明者一个重复执行Job的线程,而每一个有客户段提交的Job都将进入一个工做队列中等待工做者线程的处理。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //线程池最大限制数 private static final int MAX_WORKER_NUMBERS = 10; //线程池默认的数量 private static final int DEFAULT_WORKER_NUMBERS = 5; //线程池最小的数量 private static final int MIN_WORKER_NUMBERS = 1; //这里是一个工做列表,将会向里面插入工做 private final LinkedList<Job> jobs = new LinkedList<>(); //工做者列表 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); //工做者线程的数量 private int workerNum = DEFAULT_WORKER_NUMBERS; //线程编号生成 private AtomicInteger threadNum = new AtomicInteger(); public DefaultThreadPool() { initializWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS :num; initializWokers(workerNum); } //初始化线程工做者 private void initializWokers(int num) { for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet()); thread.start(); } } @Override public void execute(Job job) { if(job != null){ //添加一个工做,而后进行通知 synchronized (jobs){ jobs.addLast(job); jobs.notify(); } } } @Override public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void addWorkers(int num) { synchronized (jobs){ //限制新增的worker 数量不能超过最大值 if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializWokers(num); this.workerNum += num; } } @Override public void removeWorker(int num) { synchronized (jobs){ if(num >= this.workerNum){ throw new IllegalArgumentException("beyond worknum"); } //按照给定数量中止worker int count = 0; while (count < num){ Worker worker = workers.get(count); if(workers.remove(worker)){ worker.shutdown(); count++; } } this.workerNum -= count; } } @Override public int getJobSize() { return jobs.size(); } //工做者,负责消费任务 class Worker implements Runnable{ //是否工做 private volatile boolean running = true; @Override public void run() { while (running){ Job job = null; synchronized (jobs){ while (jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部对workerThread的中断操做,返回 Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if(job != null){ try { job.run(); }catch (Exception e){ //忽略执行中的exception } } } } public void shutdown(){ running = false; } } }
从线程池的实现能够看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加job,而每一个工做者线程会不断从Jobs上取出一个Job进行执行,当jobs为空时,工做He线程进入等待状态。
添加一个Job后,对工做队列jobs调用了其notify() 方法,而不是notifyAll()方法,由于可以肯定有工做者线程被唤醒,这时使用notify()方法将会比notifyAll()方法得到更小的开销(避免将等待对列中的线程所有移动到阻塞对列).
能够看到,线程池的本质就是使用了一个线程安全的工做队列链接工做者线程和客户端线程,客户端线程将任务放入工做队列后便返回,而工做者线程则不段的从工做队列上取出工做并执行。当工做队列为空时,全部的工做者线程均等待在工做队列上,当有客户段提交了一个任务以后会通知任意一个工做者线程,随着大量的任务被提交,更多的工做者线程会被唤醒。
4.4 一个基于线程池技术的简单web服务器
目前的浏览器都支持多线程访问,好比说在请求一个HTML页面的时候,页面中包含的图片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图片彻底下载完成才能继续查看文字内容的尴尬状况
若是web服务器是单线程的,多线程的浏览器也没有用武之地,由于服务端仍是一个请求一个请求的顺序处理。所以,大部分web服务器都是支持并发访问的。经常使用的java Web服务器,如tomcat、jetty,在其处理请求的过程当中都使用到了线程池技术。
下面经过使用前一节中的线程池来构造一个简单的web服务器,这个web服务器用来处理HTTP请求,目前只能处理简单的文本和JPG图片内容。这个WEB服务器使用main线程不断地接受客户端Socket的链接,将链接以及请求提交给线程池处理,这样使得web服务器可以同时处理多个客户端请求,
public class SimpleHttpServer { //处理HttpRequest的线程池 static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1); //simpleHttpServer的根路径 static String basePath; static ServerSocket serverSocket; static int port = 8080; public static void setPort(int port){ if (port > 0){ SimpleHttpServer.port = port; } } public static void setBasePath(String basePath){ if(basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()){ SimpleHttpServer.basePath = basePath; } } //启动SimpleHttpserver public static void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept())!= null){ threadPool.execute(new HttpRequestHandler(socket)); } serverSocket.close(); } static class HttpRequestHandler implements Runnable{ private Socket socket; public HttpRequestHandler(Socket socket) { socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); //由相对路径计算出绝对路径 String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); //若是请求资源的后缀为jpg或者ico,则读取资源并输出 if(filePath.endsWith("jpg") || filePath.endsWith("ico")){ in = new FileInputStream(filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int i = 0; while ((i = in.read()) != -1){ baos.write(i); } byte[] bytes = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server : Molly"); out.println("content-Type: image/jpeg"); out.println("Content-Length: "+bytes.length); out.println(""); socket.getOutputStream().write(bytes,0,bytes.length); }else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200 OK"); out.println("Server: Molly"); out.println("Content-Type: text/html; charset=UTF-8"); out.println(""); while ((line = br.readLine()) != null){ out.println(line); } } out.flush(); }catch (Exception e){ out.println("HTTP/1.1 500"); out.println(""); out.flush(); }finally { close(br,in,reader,out,socket); } } private void close(Closeable... closeables) { if (closeables != null){ for (Closeable closeable : closeables) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
在图中,SimpleHttpeServer在创建了与客户端的链接以后,并不会处理客户端的请求,而是将其包装成HttpRequestHandler并交由线程池处理。在线程池中的Worker处理客户端请求的同时,SimpleHttpServer可以继续完成后续客户端链接的创建,不会阻塞后续客户端的请求。
5.本章小结
本章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多线程之间进行通讯的基本方式和等待/通知经典范式。在线程因果给你示例中,使用了等待超时、数据库链接池以及简单的线程池3个不一样案例。