Java基础知识笔记-14-并发html
读者可能已经很熟悉操做系统中的多任务(multitasking): 在同一刻运行多个程序的能力。 例如,在编辑或下载邮件的同时能够打印文件。今天,人们极可能有单台拥有多个CPU的计算机,可是,并发执行的进程数目并非由CPU数目制约的。操做系统将CPU的时间片分配给每个进程,给人并行处理的感受。java
多线程程序在较低的层次上扩展了多任务的概念:一个程序同时执行多个任务。一般,每个任务称为一个线程(thread), 它是线程控制的简称。能够同时运行一个以上线程的程序称为多线程程序(multithreaded)。程序员
那么,多进程与多线程有哪些区别呢? 本质的区别在于每一个进程拥有本身的一整套变量,而线程则共享数据。这听起来彷佛有些风险,的确也是这样,在本章稍后将能够看到这个问题。然而,共享变量使线程之间的通讯比进程之间的通讯更有效、更容易。此外,在有些操做系统中,与进程相比较,线程更“ 轻量级”,建立、撤销一个线程比启动新进程的开销要小得多。web
在实际应用中,多线程很是有用。例如,一个浏览器能够同时下载几幅图片。一个Web服务器须要同时处理几个并发的请求。图形用户界面(GUI)程序用一个独立的线程从宿主操做环境中收集用户界面的事件。本章将介绍如何为Java应用程序添加多线程能力。数据库
这里从察看一个没有使用多线程的程序开始。用户很难让它执行多个任务。在对其进行剖析以后,将展现让这个程序运行几个彼此独立的多个线程是很容易的。这个程序采用不断地移动位置的方式实现球跳动的动画效果,若是发现球碰到墙壁,将进行重绘。编程
当点击Start按钮时,程序将从屏幕的左上角弹出一个球,这个球便开始弹跳。Start按钮的处理程序将调用addBall方法。这个方法循环运行1000次move。每调用一次move, 球就会移动一点,当碰到墙壁时,球将调整方向,并从新绘制面板。数组
Ball ball = new Ball(); panel.add(ball); for (int i = 1 ;i <= STEPS;i++) { ball.move(panel.getBounds()); panel.paint(panel.getCraphics()); Thread.sleep(DELAY); }
调用Threadsleep不会建立一个新线程,sleep是Thread类的静态方法,用于暂停当前线程的活动。sleep方法能够抛出一个InterruptedException异常。稍后将讨论这个异常以及对它的处理。如今,只是在发生异常时简单地终止弹跳。若是运行这个程序,球就会自如地来回弹跳,可是,这个程序彻底控制了整个应用程序。若是你在球完成1000次弹跳以前已经感到厌倦了,并点击Close按钮会发现球仍然还在弹跳。在球本身结束弹跳以前没法与程序进行交互。浏览器
能够将移动球的代码放置在一个独立的线程中,运行这段代码能够提升弹跳球的响应能力。实际上,能够发起多个球,每一个球都在本身的线程中运行。另外,AWT的事件分派线程(event dispatch thread)将一直地并行运行,以处理用户界面的事件。因为每一个线程都有机会得以运行,因此在球弹跳期间,当用户点击Close按钮时,事件调度线程将有机会关注到这个事件,并处理“关闭”这一动做。缓存
这里用球弹跳代码做为示例,让你们对并发处理有一个视觉印象。一般,人们总会提防长时间的计算。这个计算极可能是某个大框架的一个组成部分,例如,GUI或web框架。不管什么时候框架调用自身的方法都会很快地返回一个异常。若是须要执行一个比较耗时的任务,应当并发地运行任务。安全
下面是在一个单独的线程中执行一个任务的简单过程:
public interface Runnable { void run(); }
因为Runnable是一个函数式接口,能够用lambda表达式创建一个实例:
Runnable r = () -> { taskcode };
Thread t = new Thread(r);
t.start()
;Runnable r = () -> { try { for (int i = 1 ; i <=: STEPS; i++) { ball.move(comp.getBounds()); comp.repaint(); Thread.sleep(DELAY); } } catch (InterruptedException e) { } }; Thread t = new Thread(r); t.start();
一样地,须要捕获sleep方法可能抛出的异常InterruptedException。下一节将讨论这个异常。在通常状况下,线程在中断时被终止。所以,当发生InterruptedException异常时,run方法将结束执行。
不管什么时候点击Start按钮,球会移入一个新线程。仅此而已!如今应该知道如何并行运行多个任务了。本章其他部分将阐述如何控制线程之间的交互。
Runnable对象仅仅做为Thread对象的target,Runable实现类里包含的run()方法仅做为线程执行体。而实际的线程对象仍然是Thread实例,只是该Thread线程负责执行其target的run()方法。
也能够经过构建一个Thread类的子类定义一个线程,以下所示
- 定义Thread类的子类 ,并重写该类的run()方法,该run()方法的方法体就体现了线程须要完成的任务。所以把run()方法称为线程执行体.。
- 建立Thread子类的实例,即建立了线程对象。
- 调用线程对象的start()方法来启动该线程。
class MyThread extends Thread { public void run() { taskcode } }而后,构造一个子类的对象,并调用start方法。不过,这种方法已再也不推荐。应该将要并行运行的任务与运行机制解耦合。若是有不少任务,要为每一个任务建立一个独立的线程所付出的代价太大了。可使用线程池来解决这个问题,有关内容请参看第14.9节。
警告:不要调用Thread类或Runnable对象的run方法。直接调用run方法,只会执行同一个线程中的任务,而不会启动新线程。应该调用
Thread.start
方法。这个方法将建立一个执行run方法的新线程。
当线程的run方法执行方法体中最后一条语句后,并经由执行return语句返冋时,或者出现了在方法中没有捕获的异常时,线程将终止。在Java的早期版本中,还有一个stop方法,其余线程能够调用它终止线程。可是,这个方法如今已经被弃用了。
没有能够强制线程终止的方法。然而,interrupt方法能够用来请求终止线程。
当对一个线程调用interrupt方法时,线程的中断状态将被置位。这是每个线程都具备的boolean标志。每一个线程都应该不时地检査这个标志,以判断线程是否被中断。
要想弄清中断状态是否被置位,首先调用静态的Thread.currentThread方法得到当前线程,而后调用islnterrupted方法:
while (!Thread.currentThread().islnterrupted() && more work todo) { domorework }
可是,若是线程被阻塞,就没法检测中断状态。这是产生InterruptedException异常的地方。当在一个被阻塞的线程(调用sleep或wait)上调用interrupt方法时,阻塞调用将会被InterruptedException异常中断。(存在不能被中断的阻塞I/O调用,应该考虑选择可中断的调用。有关细节请参看卷2的第1章和第3章。)
没有任何语言方面的需求要求一个被中断的线程应该终止。中断一个线程不过是引发它的注意。被中断的线程能够决定如何响应中断。某些线程是如此重要以致于应该处理完异常后,继续执行,而不理会中断。可是,更广泛的状况是,线程将简单地将中断做为一个终止的请求。这种线程的run方法具备以下形式:
Runnable r = () -> { try { while (!Thread.currentThread().islnterrupted0 && more work todo) { do morework } } catch(InterruptedException e) { // thread was interrupted during sleep or wait } finally { cleanup,if required } // exiting the run method terminates the thread };
若是在每次工做迭代以后都调用sleep方法(或者其余的可中断方法),islnterrupted检测既没有必要也没有用处。若是在中断状态被置位时调用sleep方法,它不会休眠。相反,它将清除这一状态(!)并拋出InterruptedException。所以,若是你的循环调用sleep,不会检测中断状态。相反,要以下所示捕获 InterruptedException异常:
Runnable r = () -> { try { while (!Thread.currentThread().isInterrupter() && more work todo) { do morework Tread.sleep(delay); } } catch(InterruptedException e) { // thread was interrupted during sleep } finally{ cleanup,if required } // exiting the run method terminates the thread };
注释:有两个很是相似的方法,interrupted和islnterrupted。Interrupted方法是一个静态方法,它检测当前的线程是否被中断。并且,调用interrupted方法会清除该线程的中断状态。另外一方面,islnterrupted方法是一个实例方法,可用来检验是否有线程被中断。调用这个方法不会改变中断状态。
在不少发布的代码中会发现InterruptedException异常被抑制在很低的层次上,像这样:
void mySubTask() { ... try { sleep(delay); } catch (InterruptedException e) {} // Don't ignore! ... }
不要这样作!若是不认为在catch子句中作这一处理有什么好处的话,仍然有两种合理的选择:
Thread.currentThread().interrupt()
来设置中断状态。因而,调用者能够对其进行检测。void mySubTask() { try { sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
void mySubTask() throws InterruptedException { ... sleep(delay); ... }
java.Iang.Thread1.0
void interrupts(); //向线程发送中断请求。线程的中断状态将被设置为true。若是目前该线程被一个sleep调用阻塞,那么,InterruptedException异常被抛出。 static boolean interrupted(); //测试当前线程(即正在执行这一命令的线程)是否被中断。注意,这是一个静态方法。这一调用会产生反作用---它将当前线程的中断状态重置为false。 boolean islnterrupted(); //测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。 static Thread currentThread(); //返回表明当前执行线程的Thread对象。
线程能够有以下6种状态:
下一节对每一种状态进行解释。要肯定一个线程的当前状态,可调用getState方法。
当用new操做符建立一个新线程时,如newThread(r),该线程尚未开始运行。这意味着它的状态是new。当一个线程处于新建立状态时,程序尚未开始运行线程中的代码。在线程运行以前还有一些基础工做要作。
一旦调用start方法,线程处于runnable状态。一个可运行的线桿可能正在运行也可能没有运行,这取决于操做系统给线程提供运行的时间。(Java的规范说明没有将它做为一个单独状态。一个正在运行中的线程仍然处于可运行状态。)
一旦一个线程开始运行,它没必要始终保持运行。事实上,运行中的线程被中断,目的是为了让其余线程得到运行机会。线程调度的细节依赖于操做系统提供的服务。抢占式调度系统给每个可运行线程一个时间片来执行任务。当时间片用完,操做系统剥夺该线程的运行权,并给另外一个线程运行机会(见图14-4 )。当选择下一个线程时,操做系统考虑线程的优先级---更多的内容见第4.1节。
如今全部的桌面以及服务器操做系统都使用抢占式调度。可是,像手机这样的小型设备可能使用协做式调度。在这样的设备中,一个线程只有在调用yield方法、或者被阻塞或等待时,线程才失去控制权。
在具备多个处理器的机器上,每个处理器运行一个线程,能够有多个线程并行运行。固然,若是线程的数目多于处理器的数目,调度器依然采用时间片机制。
记住,在任何给定时刻,二个可运行的线程可能正在运行也可能没有运行(这就是为何将这个状态称为可运行而不是运行)。
当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器从新激活它。细节取决于它是怎样达到非活动状态的。
当一个线程试图获取一个内部的对象锁(而不是javiutiUoncurrent库中的锁),而该锁被其余线程持有,则该线程进入阻塞状态(咱们在14.5.3节讨论java.util.concurrent锁,在14.5.5节讨论内部对象锁)。当全部其余线程释放该锁,而且线程调度器容许本线程持有它的时候,该线程将变成非阻塞状态。
当线程等待另外一个线程通知调度器一个条件时,它本身进入等待状态。咱们在第14.5.4节来讨论条件。在调用Object.wait方法或Thread.join方法,或者是等待java.util.concurrent库中的Lock或Condition时,就会出现这种状况。实际上,被阻塞状态与等待状态是有很大不一样的。
有几个方法有一个超时参数。调用它们致使线程进入计时等待(timed waiting) 状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时参数的方法有Thread.sleep和Object.wait、Thread.join、Lock.tryLock以及Condition.await的计时版。
图14-3展现了线程能够具备的状态以及从一个状态到另外一个状态可能的转换。当一个线程被阻塞或等待时(或终止时),另外一个线程被调度为运行状态。当一个线程被从新激活(例如,由于超时期满或成功地得到了一个锁),调度器检查它是否具备比当前运行线程更高的优先级。若是是这样,调度器从当前运行线程中挑选一个,剥夺其运行权,选择一个新的线程运行。
线程因以下两个缘由之一而被终止:
注意:当主线程结束时,其余线程不受任何影响,能够调用线程对象的isAlive()方法,当线程处于就绪,运行,阻塞三种状态时,该方法将返回true,当线程处于新建,死亡两种状态时,该方法将返回false
不要试图对一个已近死亡的线程调用start()方法使它从新启动。会抛出异常
在Java程序设计语言中,每个线程有一个优先级。默认状况下,一个线程继承它的父线程的优先级。能够用setPriority方法提升或下降任何一个线程的优先级。能够将优先级设 置为在MIN_PRIORITY(在Thread类中定义为1)与MAX_PRIORITY(定义为10)之间的任何值。NORM_PRIORITY被定义为5。
每当线程调度器有机会选择新线程时,它首先选择具备较高优先级的线程。可是,线程优先级是高度依赖于系统的。当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台的优先级上,优先级个数也许更多,也许更少。
例如,Windows有7个优先级别。一些Java优先级将映射到相同的操做系统优先级。在Oracle为Linux提供的Java虚拟机中,线程的优先级被忽略一全部线程具备相同的优先级。初级程序员经常过分使用线程优先级。为优先级而烦恼是事出有因的。不要将程序构建为功能的正确性依赖于优先级。
警告:若是确实要使用优先级,应该避免初学者常犯的一个错误。若是有几个高优先级的线程没有进入非活动状态,低优先级的线程可能永远也不能执行。每当调度器决定运行一个新线程时,首先会在具备高优先级的线程中进行选择,尽管这样会使低优先级的线程彻底饿死。
void setPriority(int newPriority) //设置线程的优先级。优先级必须在Thread.MIN_PRIORITY与Thread.MAX_PRIORITY之间。通常使用Thread.NORMJ»RIORITY优先级 static int MIN_PRIORITY //线程的最小优先级。最小优先级的值为1。 static int N0RM_PRI0RITY //线程的默认优先级。默认优先级为5 static int MAX_PRIORITY //线程的最高优先级。最高优先级的值为10 static void yield() //致使当前执行线程处于让步状态。若是有其余的可运行线程具备至少与此线程一样高 的优先级,那么这些线程接下来会被调度。注意,这是一个静态方法
有一种线程,他是在后台运行的,他的任务就是为其余的线程提供服务,这种线程被称为守护线程,好比JVM的垃圾回收线程。
能够经过调用
t.setDaemon(true);
将线程转换为守护线程(daemon thread)。这样一个线程没有什么神奇。守护线程的惟一用途是为其余线程提供服务。计时线程就是一个例子,它定时地发送“计时器嘀嗒”信号给其余 线程或清空过期的高速缓存项的线程。当只剩下守护线程时,虚拟机就退出了,因为若是只剩下守护线程,就不必继续运行程序了。
守护线程有时会被初学者错误地使用, 他们不打算考虑关机(shutdown) 动做。可是,这是很危险的。守护线程应该永远不去访问固有资源,如文件、数据库,由于它会在任什么时候候甚至在一个操做的中间发生中断。
线程的run方法不能抛出任何受查异常,可是,非受査异常会致使线程终止。在这种状况下,线程就死亡了。
可是,不须要任何catch子句来处理能够被传播的异常。相反,就在线程死亡以前,异常被传递到一个用于未捕获异常的处理器。
该处理器必须属于一个实现Thread.UncaughtExceptionHandler
接口的类。这个接口只有一个方法。
void uncaughtException(Thread t, Throwable e)
能够用setUncaughtExceptionHandler
方法为任何线程安装一个处理器。也能够用Thread类的静态方法setDefaultUncaughtExceptionHandler
为全部线程安装一个默认的处理器。替换处理器可使用日志API发送未捕获异常的报告到日志文件。
若是不安装默认的处理器,默认的处理器为空。可是,若是不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup对象。
注释:线程组是一个能够统一管理的线程集合。默认状况下,建立的全部线程属于相同的线程组,可是,也可能会创建其余的组。如今引入了更好的特性用于线程集合的操做,因此建议不要在本身的程序中使用线程组。
ThreadGroup类实现Thread.UncaughtExceptionHandler
接口。它的uncaughtException方法作以下操做:
Thread.getDefaultExceptionHandler
方法返回一个非空的处理器,则调用该处理器。4)不然,线程的名字以及Throwable的栈轨迹被输出到System.err
上。这是你在程序中确定看到过许屡次的栈轨迹。
若是须要让当前正在执行的线程暂停一段时间,进入阻塞状态,则能够经过调用Thread类的静态sleep()方法来实现。sleep()方法有两种重载方式。
static void sleep(long millis); //让当前正在执行的线程暂停millis毫秒,并进入阻塞状态 static void sleep(long millis,int nanos); ////让当前正在执行的线程暂停millis毫秒+nanos毫秒,并进入阻塞状态
当当前线程调用sleep()方法进入阻塞状态后,在睡眠时间段内,该线程不会得到执行的机会,即便系统中没有其余可执行的线程,处于sleep()中的线程也不会执行,所以sleep()方法经常使用来暂停程序的执行。
在大多数实际的多线程应用中,两个或两个以上的线程须要共享对同一数据的存取。若是两个线程存取相同的对象,而且每个线程都调用了一个修改该对象状态的方法,将会发生什么呢?能够想象,线程彼此踩了对方的脚。根据各线程访问数据的次序,可能会产生讹误的对象。这样一个状况一般称为竞争条件(race condition)。
为了不多线程引发的对共享数据的说误,必须学习如何同步存取。在本节中,你会看到若是没有使用同步会发生什么。在下一节中,将会看到如何同步数据存取。在下面的测试程序中,模拟一个有若干帐户的银行。随机地生成在这些帐户之间转移钱款的交易。每个帐户有一个线程。每一笔交易中,会从线程所服务的帐户中随机转移必定数目的钱款到另外一个随机帐户。模拟代码很是直观。咱们有具备transfer方法的Bank类。该方法从一个帐户转移必定数目的钱款到另外一个帐户(尚未考虑负的帐户余额)。以下是Bank类的transfer方法的代码。
public void transfer(int from, int to, double amount) // CAUTION: unsafe when called from multiple threads { System.out.print(Thread,currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf("Total Balance: %10.2fXn", getTotalBalance()); }
这里是Runnable类的代码。它的run方法不断地从一个固定的银行帐户取出钱款。在每一次迭代中,run方法随机选择一个目标帐户和一个随机帐户,调用bank对象的transfer方法,而后睡眠。
Runnable r = () -> { try { while (true) { int toAccount = (int) (bank.size() * Math.random()); double amount = MAX_AMOUNT * Math.random(); bank.transfer(fromAccount, toAccount, amount); Thread.sleep((int) (DELAY * Math.random())); } } catch (InterruptedExeeption e) { } };
当这个模拟程序运行时,不清楚在某一时刻某一银行帐户中有多少钱。可是,知道全部帐户的总金额应该保持不变,由于所作的一切不过是从一个帐户转移钱款到另外一个帐户。在每一次交易的结尾,transfer方法从新计算总值并打印出来。本程序永远不会结束。只能按CTRL+C来终止这个程序。
下面是典型的输出:
Thread[Thread-11,5,main] 588.48 from 11to 44 Total Balance: 100000.00 Thread[Thread-12,5,main] 976.11from 12 to 22 Total Balance: 100000.00 Thread[Thread-14,5,main] 521.51 from 14 to 22 Total Balance: 100000.00 Thread[Thread-13,5,main] 359.89 from 13 to 81Total Balance: 100000.00 ... Thread[Thread-36,5,main] 401.71from 36 to 73 Total Balance: 99291.06 Thread[Thread-35,5,main] 691.46 from 35 to 77 Total Balance: 99291.06 Thread[Thread-37,5,main] 78.64 from 37 to 3 Total Balance: 99291.06 Thread[Thread-34,5,main] 197.11from 34 to 69 Total Balance: 99291.06 Thread[Thread-36,5,main] 85.96 from 36 to 4 Total Balance: 99291.06 Thread[Thread-4,5,main]Thread[Thread-33,5,main] 7.31 from 31to 32 Total Balance: 99979.24 627.50 from 4 to 5 Total Balance: 99979.24
正如前面所示,出现了错误。在最初的交易中,银行的余额保持在$100000, 这是正确的,由于共100个帐户,每一个帐户$1000。可是,过一段时间,余额总量有轻微的变化。当运行这个程序的时候,会发现有时很快就出错了,有时很长的时间后余额发生混乱。这样的状态不会带来信任感,人们极可能不肯意将辛苦挣来的钱存到这个银行。程序清单14-5和程序清单14-6中的程序提供了完整的源代码。看看是否能够从代码中找出问题。下一节将解说其中奥秘。
上一节中运行了一个程序,其中有几个线程更新银行帐户余额。一段时间以后,错误不知不觉地出现了,总额要么增长,要么变少。当两个线程试图同时更新同一个帐户的时候,这个问题就出现了。假定两个线程同时执行指令accounts[to] += amount
; 问题在于这不是原子操做。该指令可能被处理以下:
如今,假定第1个线程执行步骤1和2, 而后,它被剥夺了运行权。假定第2个线程被唤醒并修改了accounts数组中的同一项。而后,第1个线程被唤醒并完成其第3步。这样,这一动做擦去了第二个线程所作的更新。因而,总金额再也不正确。咱们的测试程序检测到这一讹误。(固然,若是线程在运行这一测试时被中断,也有可能会出现失败警告!)
出现这一讹误的可能性有多大呢?这里经过将打印语句和更新余额的语句交织在一块儿执行,增长了发生这种状况的机会。
若是删除打印语句,讹误的风险会下降一点,由于每一个线程在再次睡眠以前所作的工做不多,调度器在计算过程当中剥夺线程的运行权可能性很小。可是,讹误的风险并无彻底消失。若是在负载很重的机器上运行许多线程,那么,即便删除了打印语句,程序依然会出错。这种错误可能会几分钟、几小时或几天出现一次。坦白地说,对程序员而言,不多有比无规律出现错误更糟的事情了。
真正的问题是transfer方法的执行过程当中可能会被中断。若是可以确保线程在失去控制以前方法运行完成,那么银行帐户对象的状态永远不会出现讹误。
有两种机制防止代码块受并发访问的干扰。Java语言提供一个synchronized关键字达 到这一目的,而且Java SE 5.0引入了ReentrantLock类。synchronized关键字自动提供一个锁以及相关的“条件”,对于大多数须要显式锁的状况,这是很便利的。可是,咱们相信在读者分別阅读了锁和条件的内容以后,理解 synchronized关键字是很轻松的事情。
java.util.concurrent
框架为这些基础机制提供独立的类,在此以及第14.5.4节加以解释这个内容。读者理解了这些构建块以后,将讨论第14.5.5节。
用ReentrantLock保护代码块的基本结构以下:
myLock.lock(); // a ReentrantLock object try { critical section } finally { myLock.unlock();// make sure the lock is unlocked even if an exception is thrown }
这一结构确保任什么时候刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其余任何线程都没法经过lock语句。当其余线程调用lock时,它们被阻塞,直到第一个线程释放锁对象。
警告:把解锁操做括在finally子句以内是相当重要的。若是在临界区的代码抛出异常,锁必须被释放。不然,其余线程将永远阻塞。
注释:若是使用锁,就不能使用带资源的try语句。首先,解锁方法名不是close。不过,即便将它重命名,带资源的try语句也没法正常工做。它的首部但愿声明一个新变量。可是若是使用一个锁,你可能想使用多个线程共享的那个变量(而不是新变量)。
让咱们使用一个锁来保护Bank类的transfer方法。
public class Bank { private Lock bankLock = new ReentrantLock();// ReentrantLock implements the Lock interface public void transfer(int from, int to, int amount) { bankLock.lock(); try { System.out.print(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %A to %d", amount, from, to); accounts[to] += amount; System.out.printf("Total Balance: %10.2f%n", getTotalBalance()); } finally { banklock.unlock(); } } }
假定一个线程调用transfer,在执行结束前被剥夺了运行权。假定第二个线程也调用transfer,因为第二个线程不能得到锁,将在调用lock方法时被阻塞。它必须等待第一个线程完成transfer方法的执行以后才能再度被激活。当第一个线程释放锁时,那么第二个线程才能开始运行(见图 14-5)。
尝试一下。添加加锁代码到transfer方法而且再次运行程序。你能够永远运行它,而银行的余额不会出现讹误。
注意每个Bank对象有本身的ReentrantLock对象。若是两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。可是,若是两个线程访问不一样的Bank 对象,每个线程获得不一样的锁对象,两个线程都不会发生阻塞。本该如此,由于线程在操纵不一样的Bank实例的时候,线程之间不会相互影响。
锁是可重入的,由于线程能够重复地得到已经持有的锁。锁保持一个持有计数(hold count)来跟踪对lock方法的嵌套调用。线程在每一次调用lock都要调用unlock来释放锁。因为这一特性,被一个锁保护的代码能够调用另外一个使用相同的锁的方法。
例如,transfer方法调用getTotalBalance方法,这也会封锁bankLock对象,此时bankLock对象的持有计数为2。当getTotalBalance方法退出的时候,持有计数变回1。当transfer方法退出的时候,持有计数变为0。线程释放锁。一般,可能想要保护需若干个操做来更新或检查共享对象的代码块。要确保这些操做完成后,另外一个线程才能使用相同对象。
警告:要留心临界区中的代码,不要由于异常的抛出而跳出临界区。若是在临界区代码 结束以前抛出了异常,finally子句将释放锁,但会使对象可能处于一种受损状态。
java.util.concurrent.locks.Lock 5.0
void lock(); //获取这个锁;若是锁同时被另外一个线程拥有则发生阻塞。 void unlock(); //释放这个锁。
java.util.concurrent.locks.ReentrantLock 5.0
ReentrantLock(); //构建一个能够被用来保护临界区的可重入锁。 ReentrantLock(boolean fair); //构建一个带有公平策略的锁。一个公平锁偏心等待时间最长的线程。可是,这一公平的保证将大大下降性能。因此,默认状况下,锁没有被强制为公平的。
警告: 听起来公平锁更合理一些,可是使用公平锁比使用常规锁要慢不少。 只有当你确实了解本身要作什么而且对于你要解决的问题有一个特定的理由必须使用公平锁的时候,才可使用公平锁。即便使用公平锁,也没法确保线程调度器是公平的。若是线程调度 器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了。
一般,线程进入临界区,却发如今某一条件知足以后它才能执行。要使用一个条件对象来管理那些已经得到了一个锁可是却不能作有用工做的线程。在这一节里,咱们介绍Java库中条件对象的实现。(因为历史的缘由, 条件对象常常被称为条件变量(conditional variable)。 )如今来细化银行的模拟程序。咱们避免选择没有足够资金的帐户做为转出帐户。注意不能使用下面这样的代码:
if (bank.getBalance(from) >= amount) bank.transfer(from, to, amount);
当前线程彻底有可能在成功地完成测试,且在调用transfer方法以前将被中断。
if (bank.getBalance(from) >= amount) // thread night be deactivated at this point bank.transfer(from, to, amount);
在线程再次运行前,帐户余额可能已经低于提款金额。必须确保没有其余线程在本检査余额 与转帐活动之间修改余额。经过使用锁来保护检査与转帐动做来作到这一点:
public void transfer(int from, int to,int amount) { bankLock.lock(); try { while (accounts[from] < amount) { // wait ... } // transfer funds ... } finally { bankLock.unlock(); } }
如今,当帐户中没有足够的余额时,应该作什么呢?等待直到另外一个线程向帐户中注入了资金。可是,这一线程刚刚得到了对bankLock的排它性访问,所以别的线程没有进行存款操做的机会。这就是为何咱们须要条件对象的缘由。
一个锁对象能够有一个或多个相关的条件对象。你能够用newCondition方法得到一个条件对象。习惯上给每个条件对象命名为能够反映它所表达的条件的名字。例如,在此设置一个条件对象来表达“余额充足”条件。
class Bank { private Condition sufficientFunds; ... public Bank() { ... sufficientFunds = bankLock.newCondition(); } }
若是transfer方法发现余额不足,它调用
sufficientFunds.await();
当前线程如今被阻塞了,并放弃了锁。咱们但愿这样可使得另外一个线程能够进行增长帐户余额的操做。
等待得到锁的线程和调用await方法的线程存在本质上的不一样。一旦一个线程调用await方法,它进入该条件的等待集。当锁可用时,该线程不能立刻解除阻塞。相反,它处于阻塞状态,直到另外一个线程调用同一条件上的signalAll方法时为止。
当另外一个线程转帐时,它应该调用
sufficientFunds.signalAll();
这一调用从新激活由于这一条件而等待的全部线程。当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。同时,它们将试图从新进入该对象。一旦锁成为可用的,它们中的某个将从await调用返回,得到该锁并从被阻塞的地方继续执行。此时,线程应该再次测试该条件。因为没法确保该条件被知足---signalAll方法仅仅是通知正在等待的线程:此时有可能已经知足条件,值得再次去检测该条件。
注释: 一般,对await的调用应该在以下形式的循环体中
while (!(ok to proceed)) condition.await();
相当重要的是最终须要某个其余线程调用signalAll方法。当一个线程调用await时,它没有办法从新激活自身。它寄但愿于其余线程。若是没有其余线程来从新激活等待的线程,它就永远再也不运行了。这将致使使人不快的死锁(deadlock) 现象。若是全部其余线程被阻塞,最后一个活动线程在解除其余线程的阻塞状态以前就调用await方法,那么它也被阻塞。没有任何线程能够解除其余线程的阻塞,那么该程序就挂起了。
应该什么时候调用signalAll呢?经验上讲,在对象的状态有利于等待线程的方向改变时调用signalAll。例如,当一个帐户余额发生改变时,等待的线程会应该有机会检查余额。在例子中,当完成了转帐时,调用signalAll方法。
public void transfer(int from, int to, int amount) { bankLock.lock(); try { while (accounts[from] < amount) sufficientFunds.await(); // transfer funds sufficientFunds.signalAll(); } finally { bankLock.unlock(); } }
注意调用signalAll不会当即激活一个等待线程。它仅仅解除等待线程的阻塞,以便这些线程能够在当前线程退出同步方法以后,经过竞争实现对对象的访问。
另外一个方法signal, 则是随机解除等待集中某个线程的阻塞状态。这比解除全部线程的 阻塞更加有效,但也存在危险。若是随机选择的线程发现本身仍然不能运行,那么它再次被阻塞。若是没有其余线程再次调用signal, 那么系统就死锁了。
警告:当一个线程拥有某个条件的锁时,它仅仅能够在该条件上调用await、signalAll或signal方法。
若是你运行程序清单14-7中的程序,会注意到没有出现任何错误。总余额永远是$100 000。
没有任何帐户曾出现负的余额(可是,你仍是须要按下CTRL+C键来终止程序)。你可能还注意到这个程序运行起来稍微有些慢---这是为同步机制中的簿记操做所付出的代价。
实际上,正确地使用条件是富有挑战性的。在开始实现本身的条件对象以前,应该考虑使用14.10节中描述的结构。
package synch; import java.util.*; import java.util.concurrent.locks.*; /** * A bank with a number of bank accounts that uses locks for serializing access. * ©version 1.30 2004-08-01 * ©author Cay Horstmann */ public class Bank { private final double[] accounts; private Lock bankLock; private Condition sufficientFunds; /** * Constructs the bank. * @param n the number of accounts * @param initialBalance the initial balance for each account */ public Bank(int n, double initialBalance) { accounts = new double[n]; Arrays.fill(accounts, initialBalance); bankLock = new ReentrantLock(); sufficientFunds = bankLock.newCondition(); /** * Transfers money from one account to another. * @param from the account to transfer from * @param to the account to transfer to * @paran amount the amount to transfer */ public void transfer(int from, int to, double amount) throws InterruptedException { bankLock.lock(); try { while (accounts[from] < amount) sufficientFunds.await(); System.out.print(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %6 to %d", amount, from, to); accounts[to] += amount; System.out.printf("Total Balance: %10.2f%n", getTotalBalance()); sufficientFunds.signalAll(); } finally { bankLock.unlock(); } } /** * Gets the sum of all account balances. * ©return the total balance */ public double getTotalBalance() { bankLock.lock(); try { double sum = 0; for (double a:accounts) sum += a; return sum; } finally { bankLock.unlock(); } } /** * Gets the number of accounts in the bank. * ©return the number of accounts */ public int size() { return accounts.length; } }
在前面一节中,介绍了如何使用Lock和Condition对象。在进一步深刻以前,总结一下有关锁和条件的关键之处:
Lock和Condition接口为程序设计人员提供了高度的锁定控制。然而,大多数状况下,并不须要那样的控制,而且可使用一种嵌入到Java语言内部的机制。从1.0版开始,Java中的每个对象都有一个内部锁。若是一个方法用synchronized关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法,线程必须得到内部的对象锁。
换句话说,
public synchronized void method() { method body }
等价于
public void method() { this.intrinsidock.lock(); try { method body } finally { this.intrinsicLock.unlock(); } }
例如,能够简单地声明Bank类的transfer方法为synchronized, 而不是使用一个显式的锁。内部对象锁只有一个相关条件。wait方法添加一个线程到等待集中,notifyAll/notify
方法解除等待线程的阻塞状态。换句话说,调用wait
或notityAll
等价于
intrinsicCondition.await(); intrinsicCondition.signalAll();
注释:
wait
、notifyAll
以及notify
方法是Object类的final方法。Condition方法必须被命名为await
、signalAll
和signal
以便它们不会与那些方法发生冲突。
例如,能够用Java实现Bank类以下:
class Bank { private double[] accounts; public synchronized void transfer(int from,int to, int amount) throws InterruptedException { while (accounts[from] < amount) wait(); // wait on intrinsic object lock's single condition accounts[from] -= amount; accounts[to] += amount; notifyAll();// notify all threads waiting on the condition } public synchronized double getTotalBalance() { ... } }
能够看到,使用synchronized关键字来编写代码要简洁得多。固然,要理解这一代码,你必须了解每个对象有一个内部锁, 而且该锁有一个内部条件。由锁来管理那些试图进入synchronized方法的线程,由条件来管理那些调用wait的线程。
提示:Synchronized方法是相对简单的。可是,初学者经常对条件感到困惑。在使用wait/notifyAll以前,应该考虑使用第14.10节描述的结构之一。
将静态方法声明为synchronized也是合法的。若是调用这种方法,该方法得到相关的类对象的内部锁。例如,若是Bank类有一个静态同步的方法,那么当该方法被调用时,Bankxlass对象的锁被锁住。所以,没有其余线程能够调用同一个类的这个或任何其余的同步静态方法。
内部锁和条件存在一些局限。包括:
在代码中应该使用哪种?Lock和Condition对象仍是同步方法?下面是一些建议:
程序清单14-8 synch2/Bank.java
package synch2; import java.util.*; /** * A bank with a number of bank accounts that uses synchronization primitives. * ©version 1.30 2004-08-01 s * ©author Cay Horstmann */ public class Bank { private final doublet[] accounts; /** * Constructs the bank. * @parain n the number of accounts * @param initialBalance the initial balance for each account */ public Bank(int n, double initialBalance) { accounts = new double[n]; Arrays.fill (accounts, initialBalance); } /** Transfers money from one account to another. * @param from the account to transfer from * @param to the account to transfer to * @param amount the amount to transfer */ public synchronized void transfer(int from, int to, double amount) throws InterruptedException { while (accounts[from] < amount) wait(); System.out.print(Thread.currentThread()); accounts[from] -= amount; System.out.printf(" %10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalanceO); notifyAll(); } /** * Gets the sum of all account balances. * return the total balance */ public synchronized double getTotalBalance() { double sum = 0; for (double a : accounts) sum += a; return sum; } /** * Gets the number of accounts in the bank. * ©return the number of accounts */ public int size() { return accounts.length; } }
java.lang.Object 1.0
void notifyAll(); //解除那些在该对象上调用wait方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。若是当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException异常。 void notify(); //随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。该方法只能在一个同步方法或同步块中调用。若是当前线程不是对象锁的持有者,该方法抛出一个IllegalMonitorStateException异常。 void wait(); //致使线程进入等待状态直到它被通知。该方法只能在一个同步方法中调用。若是当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException异常。 void wait(long millis); void wait(long millis, int nanos); //致使线程进入等待状态直到它被通知或者通过指定的时间。这些方法只能在一个同步方法中调用。若是当前线程不是对象锁的持有者该方法拋出一个IllegalMonitorStateException异常。 参数 millis 毫秒数 nanos 纳秒数,<1 000 000
正如刚刚讨论的,每个Java对象有一个锁。线程能够经过调用同步方法得到锁。还有另外一种机制能够得到锁,经过进入一个同步阻塞。当线程进入以下形式的阻塞:
synchronized (obj) // this is the syntax for a synchronized block { critical section }
因而它得到Obj的锁。有时会发现“特殊的”锁,例如:
public class Bank { private doublet[] accounts; private Object lock = new Object(); public void transfer(int from, int to, int amount) { synchronized (lock) // an ad-hoc lock { accounts[from] -= amount; accounts[to] += amount; } System.out.println(...) } }
在此,lock
对象被建立仅仅是用来使用每一个Java对象持有的锁。
有时程序员使用一个对象的锁来实现额外的原子操做,实际上称为客户端锁定(clientside locking) 例如,考虑Vector类,一个列表,它的方法是同步的。如今,假定在Vector<Double>
中存储银行余额。这里有一个transfer
方法的原始实现:
public void transfer(Vector<Double> accounts, int from, int to, int amount)// Error { accounts.set(from, accounts.get(from)-amount); accounts.set(to, accounts.get(to)+ amount); System.out.println(...); }
Vector类的get和set方法是同步的,可是,这对于咱们并无什么帮助。在第一次对get的调用已经完成以后,一个线程彻底可能在transfer方法中被剥夺运行权。因而,另外一个线程可能在相同的存储位置存入不一样的值。可是,咱们能够截获这个锁:
public void transfer(Vector<Double> accounts, int from, int to, int amount) { synchronized(accounts) { accounts.set(from, accounts.get(from)- amount); accounts.set(to, accounts.get(to)+ amount); } System.out.println(...); }
这个方法能够工做,可是它彻底依赖于这样一个事实,Vector类对本身的全部可修改方法都使用内部锁。然而,这是真的吗?Vector类的文档没有给出这样的承诺。不得不仔细研究源代码并但愿未来的版本能介绍非同步的可修改方法。如你所见,客户端锁定是很是脆弱的,一般不推荐使用。
锁和条件是线程同步的强大工具,可是,严格地讲,它们不是面向对象的。多年来,研究人员努力寻找一种方法,能够在不须要程序员考虑如何加锁的状况下,就能够保证多线程的安全性。最成功的解决方案之一是监视器(monitor), 这一律念最先是由PerBrinchHansen和TonyHoare在20世纪70年代提出的。用Java的术语来说,监视器具备以下特性:
obj.method()
, 那么obj对象的锁是在方法调用开始时自动得到,而且当方法返回时自动释放该锁。由于全部的域是私有的,这样的安排能够确保一个线程在对对象操做时, 没有其余线程能访问该域。监视器的早期版本只有单一的条件,使用一种很优雅的句法。能够简单地调用await accounts[from] >= balance
而不使用任何显式的条件变量。然而,研究代表盲目地从新测试条件是低效的。显式的条件变量解决了这一问题。每个条件变量管理一个独立的线程集。
Java设计者以不是很精确的方式采用了监视器概念,Java中的每个对象有一个内部的锁和内部的条件。若是一个方法用synchronized关键字声明,那么,它表现的就像是一个监视器方法。经过调用wait/notifyAll/notify
来访问条件变量。
然而,在下述的3个方面Java对象不一样于监视器,从而使得线程的安全性降低:
这种对安全性的轻视激怒了Per Brinch Hansen。他在一次对原始Java中的多线程的严厉评论中,写道:“这实在是令我震惊,在监视器和并发Pascal出现四分之一个世纪后,Java的这种不安全的并行机制被编程社区接受。这没有任何益处。” [Java’ s Insecure Parallelism, ACM SIGPLANNotices 34:38-45, April 1999.]
有时,仅仅为了读写一个或两个实例域就使用同步,显得开销过大了。毕竟,什么地方能出错呢?遗憾的是,使用现代的处理器与编译器,出错的可能性很大。
若是你使用锁来保护能够被多个线程访问的代码, 那么能够不考虑这种问题。编译 器被要求经过在必要的时候刷新本地缓存来保持锁的效应,而且不能不正当地从新排序 指令。详细的解释见JSR 133的Java内存模型和线程规范(参看http://www.jcp.org/en/jsr/detail?id=133) 该规范的大部分很复杂并且技术性强,可是文档中也包含了不少解释得很清晰的例子。在http://www-106.ibm.com/developerworks/java/library/j-jtp02244.html 有Brian Goetz写的一个更易懂的概要介绍。
注释:Brian Goetz给出了下述 “同步格言”:“若是向一个变量写入值,而这个变量接下来可能会被另外一个线程读取,或者,从一个变量读值,而这个变量多是以前被另外一个线程写入的,此时必须使用同步”。
volatile关键字为实例域的同步访问提供了一种免锁机制。若是声明一个域为volatile, 那么编译器和虚拟机就知道该域是可能被另外一个线程并发更新的。
例如,假定一个对象有一个布尔标记done, 它的值被一个线程设置却被另外一个线程査询,如同咱们讨论过的那样,你可使用锁:
private boolean done; public synchronized boolean isDone() { return done; } public synchronized void setDone() { done = true; }
或许使用内部锁不是个好主意。若是另外一个线程已经对该对象加锁,isDone和setDone方法可能阻塞。若是注意到这个方面,一个线程能够为这一变量使用独立的Lock。可是,这也会带来许多麻烦。
在这种状况下,将域声明为volatile是合理的:
private volatile boolean done; public boolean isDone() { return done; } public void setDone() { done = true; }
警告:Volatile变量不能提供原子性。例如,方法
public void flipDone() { done = !done; } // not atomic不能确保翻转域中的值。不能保证读取、翻转和写入不被中断。
上一节已经了解到,除非使用锁或volatile修饰符,不然没法从多个线程安全地读取一个域。
还有一种状况能够安全地访问一个共享域,即这个域声明为final时。考虑如下声明:
final Map<String, Double> accounts = new HashKap<>();
其余线程会在构造函数完成构造以后才看到这个accounts变量。
若是不使用 final,就不能保证其余线程看到的是accounts更新后的值,它们可能都只是看到null, 而不是新构造的 HashMap。
固然,对这个映射表的操做并非线程安全的。若是多个线程在读写这个映射表,仍然须要进行同步。
锁和条件不能解决多线程中的全部问题。考虑下面的状况:
帐户 1: $200 帐户 2: $300 线程 1: 从帐户 1 转移 $300 到帐户 2 线程 2: 从帐户 2 转移 $400 到帐户 1
如图14-6所示,线程1和线程2都被阻塞了。由于帐户1以及帐户2中的余额都不足以进行转帐,两个线程都没法执行下去。
有可能会由于每个线程要等待更多的钱款存入而致使全部线程都被阻塞。这样的状态称为死锁(deadlock)。
在这个程序里,死锁不会发生,缘由很简单。每一次转帐至多$1 000。由于有100个帐户,并且全部帐户的总金额是 $100 000, 在任意时刻,至少有一个帐户的余额髙于$1 000。从该帐户取钱的线程能够继续运行。
可是,若是修改run方法,把每次转帐至多$1 000的限制去掉,死锁很快就会发生。试试看。将NACCOUNTS
设为10。每次交易的金额上限设置为2*INITIAL_BALANCE
, 而后运行该程序。程序将运行一段时间后就会挂起。
致使死锁的另外一种途径是让第i个线程负责向第i个帐户存钱,而不是从第i个帐户取钱。 这样一来,有可能将全部的线程都集中到一个帐户上,每个线程都试图从这个帐户中取出大于该帐户余额的钱。试试看。在SynchBankTest程序中,转用TransferRunnable类的run方法。在调用transfer时,交换fromAccount和toAccount。运行该程序并查看它为何会当即死锁。
还有一种很容易致使死锁的状况: 在SynchBankTest程序中, 将signalAll方法转换为signal, 会发现该程序最终会挂起(将 NACCOUNTS设为10能够更快地看到结果)。signalAll通知全部等待增长资金的线程,与此不一样的是signal方法仅仅对一个线程解锁。若是该线程不能继续运行,全部的线程可能都被阻塞。考虑下面这个会发生死锁的例子。
帐户1 :$1990 全部其余帐户:每个 $990 线程 1: 从帐户 1 转移 $995 到帐户 2 全部其余线程: 从他们的帐户转移 $995 到另外一个帐户
显然,除了线程1, 全部的线程都被阻塞,由于他们的帐户中没有足够的余额。
线程1继续执行,运行后出现以下情况:
帐户 1: $995 帐户 2: $1985 全部其余帐户:每一个 $990
而后,线程1调用signal。signal方法随机选择一个线程为它解锁。假定它选择了线程3。该线程被唤醒,发如今它的帐户里没有足够的金额,它再次调用await。可是,线程1仍在运行,将随机地产生一个新的交易,例如,
线程1 :从帐户 1 转移 $997 到帐户 2
如今,线程1也调用await, 全部的线程都被阻塞。系统死锁。问题的原由在于调用signal。它仅仅为一个线程解锁,并且,它极可能选择一个不能继续运行的线程(在咱们的例子中,线程2必须把钱从帐户2中取出)遗憾的是,Java编程语言中没有任何东西能够避免或打破这种死锁现象。必须仔细设计程序,以确保不会出现死锁。
线程在调用lock方法来得到另外一个线程所持有的锁的时候,极可能发生阻塞。应该更加谨慎地申请锁。tryLock方法试图申请一个锁,在成功得到锁后返回true, 不然,当即返回 false, 并且线程能够当即离开去作其余事情。
if (myLock.tryLock()) { // now the thread owns the lock try { ... } finally { myLock.unlock(); } } else // do something else
能够调用tryLock时,使用超时参数,像这样:
if (myLock.tryLock(100, TineUnit.MILLISECONDS)) ...
TimeUnit是一 枚举类型,能够取的值包括SECONDS
、MILLISECONDS
,MICROSECONDS
和NANOSECONDS
。
lock方法不能被中断。若是一个线程在等待得到一个锁时被中断,中断线程在得到锁以前一直处于阻塞状态。若是出现死锁,那么,lock方法就没法终止。
然而,若是调用带有用超时参数的tryLock, 那么若是线程在等待期间被中断,将抛出InterruptedException异常。这是一个很是有用的特性,由于容许程序打破死锁。
也能够调用locklnterruptibly方法。它就至关于一个超时设为无限的tryLock方法。
在等待一个条件时,也能够提供一个超时:
myCondition.await(100, TineUniBILLISECONDS))
若是一个线程被另外一个线程经过调用signalAU或signal激活,或者超时时限已达到,或者线程被中断,那么await方法将返回。
若是等待的线程被中断,await方法将抛出一个InterruptedException异常。在你但愿出现这种状况时线程继续等待(可能不太合理),可使用awaitUninterruptibly方法代替 await。
java.util.concurrent.locks.Lock 5.0
boolean tryLock(); //尝试得到锁而没有发生阻塞;若是成功返回真。这个方法会抢夺可用的锁,即便该锁有公平加锁策略,即使其余线程已经等待好久也是如此。 boolean tryLock(long time, TimeUnit unit); //尝试得到锁,阻塞时间不会超过给定的值;若是成功返回 true。 void lockInterruptibly(); //得到锁,可是会不肯定地发生阻塞。若是线程被中断,抛出一个InterruptedException异常。
java.util.concurrent.locks.Condition 5.0
boolean await(long time, TimeUnit unit); //进入该条件的等待集,直到线程从等待集中移出或等待了指定的时间以后才解除阻塞。若是由于等待时间到了而返回就返回false, 不然返回true。 void awaitUninterruptibly(); //进入该条件的等待集,直到线程从等待集移出才解除阻塞。若是线程被中断,该方法 不会抛出InterruptedException异常。
如今,读者已经看到了造成Java并发程序设计基础的底层构建块。然而,对于实际编程来讲,应该尽量远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便得多、要安全得多。
对于许多线程问题,能够经过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素, 消费者线程则取出它们。使用队列,能够安全地从一个线程向另 一个线程传递数据。例如,考虑银行转帐程序,转帐线程将转帐指令对象插入一个队列中,而不是直接访问银行对象。另外一个线程从队列中取出指令执行转帐。只有该线程能够访问该银行对象的内部。所以不须要同步。(固然,线程安全的队列类的实现者不能不考虑锁和条件,可是, 那是他们的问题而不是你的问题。
当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)致使线程阻塞。在协调多个线程之间的合做时,阻塞队列是一个有用的工具。工做者线程能够周期性地将中间结果存储在阻塞队列中。其余的工做者线程移出中间结果并进一步加以修改。队列会自动地平衡负载。若是第一个线程集运行得比第二个慢, 第二个线程集在等待结果时会阻塞。若是第一个线程集运行得快,它将等待第二个队列集遇上来。表14-1给出了阻塞队列的方法
方法 | 正常动做 | t特殊状况下的动做 |
---|---|---|
add | 添加一个元素 | 若是队列满,则抛出IllegalStateException异常 |
element | 返回队列的头元素 | 若是队列空,抛出NoSuchElementException异常 |
offer | 添加一个元素并返回true | 若是队列满,返回false |
peek | 返回队列的头元素 | 若是队列空,则返回null |
poll | 移出并返回队列的头元素 | 若是队列空,则返回null |
put | 添加一个元素 | 若是队列满,则阻塞 |
remove | 移出并返回头元素 | 若是队列空。则抛出NoSuchElementException异常 |
take | 移出并返回头元素 | 若是队列空,则阻塞 |
阻塞队列方法分为如下3类,这取决于当队列满或空时它们的响应方式。若是将队列看成线程管理工具来使用,将要用到put和take方法。当试图向满的队列中添加或从空的队列 中移出元素时,add、remove和element操做抛出异常。固然,在一个多线程程序中,队列会在任什么时候候空或满,所以,必定要使用offer、poll和peek方法做为替代。这些方法若是不能完成任务,只是给出一个错误提示而不会抛出异常。
注释: poll和peek方法返回空来指示失败。所以,向这些队列中插入null值是非法的。
还有带有超时的offer方法和poll方法的变体。例如,下面的调用:
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
尝试在100毫秒的时间内在队列的尾部插入一个元素。若是成功返回true;不然,达到超时时,返回false。相似地,下面的调用:
Object head = q.poll(100, TimeUnit.MILLISECONDS);
尝试用100毫秒的时间移除队列的头元素;若是成功返回头元素,不然,达到在超时时,返回null。
若是队列满,则put方法阻塞;若是队列空,则take方法阻塞。在不带超时参数时,offer和poll方法等效。
java.util.concurrent包提供了阻塞队列的几个变种。默认状况下,LinkedBlockingQueue的容量是没有上边界的,可是,也能够选择指定最大容量。LinkedBlockingDeque是一个双端的版本。ArrayBlockingQueue在构造时须要指定容量,而且有一个可选的参数来指定是否须要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先获得处理。一般,公平性会下降性能,只有在确实很是须要时才使用它。
PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限,可是,若是队列是空的,取元素的操做会阻塞。(有关优先级队列的详细内容参看第9章。 )
最后,DelayQueue包含实现Delayed接口的对象:
interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
getDelay方法返回对象的残留延迟。负值表示延迟已经结束。元素只有在延迟用完的情 况下才能从DelayQueue移除。还必须实现compareTo方法。DelayQueue使用该方法对元素进行排序。
JavaSE 7增长了一个TranSferQueue接口,容许生产者线程等待,直到消费者准备就绪能够接收一个元素。若是生产者调用
q.transfer(item);
这个调用会阻塞,直到另外一个线程将元素(item)删除。LinkedTransferQueue类实现了这个接口。
程序清单14-9中的程序展现了如何使用阻塞队列来控制一组线程。程序在一个目录及它的全部子目录下搜索全部文件,打印出包含指定关键字的行。
程序清单 14-9 blockingQueue/BlockingQueueTest.java
package blockingQueue; import java.io.*; import java.util.*; import java.util.concurrent.*; /** * ©version 1.02 2015-06-21 * author Cay Horstmann */ public class BlockingQueueTest { private static final int FILE_QUEUE_SIZE = 10; private static final int SEARCH_THREADS = 100; private static final File DUMMY = new File(""); private static BlockingQueue<File> queue = new ArrayBlockingQueueo(FILE_QUEUE_SIZE); public static void main(String[] args) { try (Scanner in = new Scanner(System.in)) { System.out.print("Enter base directory (e.g. /opt/jdkl.8.0/src): "); String directory = in.nextline(); System.out.print("Enter keyword (e.g. volatile): "); String keyword = in.nextLine(); Runnable enumerator = () -> { try { enumerate(new File(directory)); queue.put(DUMMY); } catch (InterruptedException e) { } }; new Thread(enumerator).start(); for (int i = 1 ; i <= SEARCH.THREADS; i++) { Runnable searcher = () -> { try { boolean done = false; while (!done) { File file = queue.take(); if (file = DUMMY) { queue.put(file); done = true; } else search(file, keyword); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { } }; new Thread(searcher).start(); } } } /** * Recursively enumerates all files in a given directory and its subdirectories. * @paran directory the directory in which to start */ public static void enumerate(File directory) throws InterruptedException { File[] files = directory.listFiles(); for (File file : files) { if (file.isDirectory()) enumerate(file); else queue.put(file); } } /** * Searches a file for a given keyword and prints all matching lines. * @param file the file to search * @param keyword the keyword to search for */ public static void search(File file, String keyword) throws IOException { try (Scanner in = new Scanner(file, "UTF-8")) { int lineNuinber = 0; while (in.hasNextLine()) { lineNumber++; String line = in.nextLine(); if (line,contains(keyword)) System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber,line); } } } }
生产者线程枚举在全部子目录下的全部文件并把它们放到一个阻塞队列中。这个操做很快,若是没有上限的话,很快就包含了全部找到的文件。
咱们同时启动了大量搜索线程。每一个搜索线程从队列中取出一个文件,打开它,打印全部包含该关键字的行,而后取出下一个文件。咱们使用一个小技巧在工做结束后终止这个应用程序。为了发出完成信号,枚举线程放置一个虚拟对象到队列中(这就像在行李输送带上放一个写着“最后一个包”的虚拟包)。当搜索线程取到这个虚拟对象时,将其放回并终止。
注意,不须要显式的线程同步。在这个应用程序中,咱们使用队列数据结构做为一种同步机制。