Java并发教程-7高级并发对象

目前为止,该教程重点讲述了最初做为Java平台一部分的低级别API。这些API对于很是基本的任务来讲已经足够,可是对于更高级的任务就须要更高级的API。特别是针对充分利用了当今多处理器和多核系统的大规模并发应用程序。 本节,咱们将着眼于Java 5.0新增的一些高级并发特征。大多数特征已经在新的java.util.concurrent包中实现。Java集合框架中也定义了新的并发数据结构。 

  • 锁对象提供了能够简化许多并发应用的锁的惯用法。
  • Executors为加载和管理线程定义了高级API。Executors的实现由java.util.concurrent包提供,提供了适合大规模应用的线程池管理。
  • 并发集合简化了大型数据集合管理,且极大的减小了同步的需求。
  • 原子变量有减少同步粒度和避免内存一致性错误的特征。
  • 并发随机数(JDK7)提供了高效的多线程生成伪随机数的方法。
1.  锁对象  

同步代码依赖于一种简单的可重入锁。这种锁使用简单,但也有诸多限制。 

java.util.concurrent.locks 包提供了更复杂的锁。咱们不会详细考察这个包,但会重点关注其最基本的接口,锁。  锁对象做用很是相似同步代码使用的隐式锁。如同隐式锁,每次只有一个线程能够得到锁对象。经过关联 Condition 对象,锁对象也支持wait/notify机制。 锁对象之于隐式锁最大的优点在于,它们有能力收回得到锁的尝试。若是当前锁对象不可用,或者锁请求超时(若是超时时间已指定),tryLock方法会收回获取锁的请求。若是在锁获取前,另外一个线程发送了一个中断,lockInterruptibly方法也会收回获取锁的请求。 让咱们使用锁对象来解决咱们在 活跃度 中见到的死锁问题。Alphonse和Gaston已经把本身训练成能注意到朋友什么时候要鞠躬。咱们经过要求Friend对象在双方鞠躬前必须先得到锁来模拟此次改善。下面是改善后模型的源代码,Safelock。为了展现其用途普遍,咱们假设Alphonse和Gaston对于他们新发现的稳定鞠躬的能力是如此入迷,以致于他们没法不相互鞠躬。 

Java代码 
  1. import java.util.concurrent.locks.Lock;  
  2. import java.util.concurrent.locks.ReentrantLock;  
  3. import java.util.Random;  
  4.   
  5. public class Safelock {  
  6.     static class Friend {  
  7.         private final String name;  
  8.         private final Lock lock = new ReentrantLock();  
  9.   
  10.         public Friend(String name) {  
  11.             this.name = name;  
  12.         }  
  13.   
  14.         public String getName() {  
  15.             return this.name;  
  16.         }  
  17.   
  18.         public boolean impendingBow(Friend bower) {  
  19.             Boolean myLock = false;  
  20.             Boolean yourLock = false;  
  21.             try {  
  22.                 myLock = lock.tryLock();  
  23.                 yourLock = bower.lock.tryLock();  
  24.             } finally {  
  25.                 if (! (myLock && yourLock)) {  
  26.                     if (myLock) {  
  27.                         lock.unlock();  
  28.                     }  
  29.                     if (yourLock) {  
  30.                         bower.lock.unlock();  
  31.                     }  
  32.                 }  
  33.             }  
  34.             return myLock && yourLock;  
  35.         }  
  36.   
  37.         public void bow(Friend bower) {  
  38.             if (impendingBow(bower)) {  
  39.                 try {  
  40.                     System.out.format("%s: %s has"  
  41.                         + " bowed to me!%n",  
  42.                         this.name, bower.getName());  
  43.                     bower.bowBack(this);  
  44.                 } finally {  
  45.                     lock.unlock();  
  46.                     bower.lock.unlock();  
  47.                 }  
  48.             } else {  
  49.                 System.out.format("%s: %s started"  
  50.                     + " to bow to me, but saw that"  
  51.                     + " I was already bowing to"  
  52.                     + " him.%n",  
  53.                     this.name, bower.getName());  
  54.             }  
  55.         }  
  56.   
  57.         public void bowBack(Friend bower) {  
  58.             System.out.format("%s: %s has" +  
  59.                 " bowed back to me!%n",  
  60.                 this.name, bower.getName());  
  61.         }  
  62. }  
  63.   
  64.     static class BowLoop implements Runnable {  
  65.         private Friend bower;  
  66.         private Friend bowee;  
  67.   
  68.         public BowLoop(Friend bower, Friend bowee) {  
  69.             this.bower = bower;  
  70.             this.bowee = bowee;  
  71.         }  
  72.   
  73.         public void run() {  
  74.             Random random = new Random();  
  75.             for (;;) {  
  76.                 try {  
  77.                     Thread.sleep(random.nextInt(10));  
  78.                 } catch (InterruptedException e) {}  
  79.                 bowee.bow(bower);  
  80.             }  
  81.         }  
  82.     }  
  83.   
  84.     public static void main(String[] args) {  
  85.         final Friend alphonse =  
  86.             new Friend("Alphonse");  
  87.         final Friend gaston =  
  88.             new Friend("Gaston");  
  89.         new Thread(new BowLoop(alphonse, gaston)).start();  
  90.         new Thread(new BowLoop(gaston, alphonse)).start();  
  91.     }  
  92. }  


2.  执行器(Executors)  

在以前全部的例子中,Thread对象表示的线程和Runnable对象表示的线程所执行的任务之间是紧耦合的。这对于小型应用程序来讲没问题,但对于大规模并发应用来讲,合理的作法是将线程的建立与管理和程序的其余部分分离开。封装这些功能的对象就是执行器,接下来的部分将讲详细描述执行器。  

3.  Executor接口  

java.util.concurrent中包括三个Executor接口: 

  • Executor,一个运行新任务的简单接口。
  • ExecutorService,扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法。
  • ScheduledExecutorService,扩展了ExecutorService。支持Future和按期执行任务。
一般来讲,指向Executor对象的变量应被声明为以上三种接口之一,而不是具体的实现类。 

Executor接口  

Executor 接口只有一个execute方法,用来替代一般建立(启动)线程的方法。例如:r是一个Runnable对象,e是一个Executor对象。可使用 
Java代码 
  1. e.execute(r);  

来代替 
Java代码 
  1. (new Thread(r)).start();  

但execute方法没有定义具体的实现方式。对于不一样的Executor实现,execute方法多是建立一个新线程并当即启动,但更有多是使用已有的工做线程运行r,或者将r放入到队列中等待可用的工做线程。(咱们将在线程池一节中描述工做线程。) 

ExecutorService接口  

ExecutorService 接口在提供了execute方法的同时,新加了更加通用的submit方法。submit方法除了和execute方法同样能够接受Runnable对象做为参数,还能够接受Callable对象做为参数。使用Callable对象能够能使任务返还执行的结果。经过submit方法返回的Future对象能够读取Callable任务的执行结果,或是管理Callable任务和Runnable任务的状态。 ExecutorService也提供了批量运行Callable任务的方法。最后,ExecutorService还提供了一些关闭执行器的方法。若是须要支持即时关闭,执行器所执行的任务须要正确处理中断。 

ScheduledExecutorService接口  

ScheduledExecutorService 扩展ExecutorService接口并添加了schedule方法。调用schedule方法能够在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔按期执行任务的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。 

4.  线程池  

在java.util.concurrent包中多数的执行器实现都使用了由工做线程组成的线程池,工做线程独立于所它所执行的Runnable任务和Callable任务,而且经常使用来执行多个任务。 使用工做线程可使建立线程的开销最小化。 

在大规模并发应用中,建立大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。一种最多见的线程池是固定大小的线程池。这种线程池始终有必定数量的线程在运行,若是一个线程因为某种缘由终止运行了,线程池会自动建立一个新的线程来代替它。须要执行的任务经过一个内部队列提交给线程,当没有更多的工做线程能够用来执行任务时,队列保存额外的任务。 使用固定大小的线程池一个很重要的好处是能够实现优雅退化。例如一个Web服务器,每个HTTP请求都是由一个单独的线程来处理的,若是为每个HTTP都建立一个新线程,那么当系统的开销超出其能力时,会忽然地对全部请求都中止响应。若是限制Web服务器能够建立的线程数量,那么它就没必要当即处理全部收到的请求,而是在有能力处理请求时才处理。 建立一个使用线程池的执行器最简单的方法是调用 java.util.concurrent.Executors newFixedThreadPool 方法。Executors类还提供了下列一下方法: 

  • newCachedThreadPool方法建立了一个可扩展的线程池。适合用来启动不少短任务的应用程序。
  • newSingleThreadExecutor方法建立了每次执行一个任务的执行器。
  • 还有一些建立ScheduledExecutorService执行器的方法。
若是上面的方法都不知足须要,能够尝试 java.util.concurrent.ThreadPoolExecutor 或者 java.util.concurrent.ScheduledThreadPoolExecutor 。 

5.  Fork/Joint  

fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些可以被递归地拆解成子任务的工做类型量身设计的。其目的在于可以使用全部可用的运算能力来提高你的应用的性能。   相似于ExecutorService接口的其余实现,fork/join框架会将任务分发给线程池中的工做线程。fork/join框架的独特之处在与它使用工做窃取(work-stealing)算法。完成本身的工做而处于空闲的工做线程可以从其余仍然处于忙碌(busy)状态的工做线程处窃取等待执行的任务。 fork/join框架的核心是 ForkJoinPool 类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工做偷取算法,并能够执行 ForkJoinTask 任务。 

基本使用方法  

使用fork/join框架的第一步是编写执行一部分工做的代码。你的代码结构看起来应该与下面所示的伪代码相似: 

Java代码 
  1. if (当前这个任务工做量足够小)  
  2.     直接完成这个任务  
  3. else  
  4.     将这个任务或这部分工做分解成两个部分  
  5.     分别触发(invoke)这两个子任务的执行,并等待结果  


你须要将这段代码包裹在一个ForkJoinTask的子类中。不过,一般状况下会使用一种更为具体的的类型,或者是 RecursiveTask (会返回一个结果),或者是 RecursiveAction 。 当你的ForkJoinTask子类准备好了,建立一个表明全部须要完成工做的对象,而后将其做为参数传递给一个ForkJoinPool实例的invoke()方法便可。 

要清晰,先模糊  

想要了解fork/join框架的基本工做原理,接下来的这个例子会有所帮助。假设你想要模糊一张图片。原始的source图片由一个整数的数组表示,每一个整数表示一个像素点的颜色数值。与source图片相同,模糊以后的destination图片也由一个整数数组表示。 对图片的模糊操做是经过对source数组中的每个像素点进行处理完成的。处理的过程是这样的:将每一个像素点的色值取出,与周围像素的色值(红、黄、蓝三个组成部分)放在一块儿取平均值,获得的结果被放入destination数组。由于一张图片会由一个很大的数组来表示,这个流程会花费一段较长的时间。若是使用fork/join框架来实现这个模糊算法,你就可以借助多处理器系统的并行处理能力。下面是上述算法结合fork/join框架的一种简单实现: 

Java代码 
  1. public class ForkBlur extends RecursiveAction {  
  2. private int[] mSource;  
  3. private int mStart;  
  4. private int mLength;  
  5. private int[] mDestination;  
  6.   
  7. // Processing window size; should be odd.  
  8. private int mBlurWidth = 15;  
  9.   
  10. public ForkBlur(int[] src, int start, int length, int[] dst) {  
  11.     mSource = src;  
  12.     mStart = start;  
  13.     mLength = length;  
  14.     mDestination = dst;  
  15. }  
  16.   
  17. protected void computeDirectly() {  
  18.     int sidePixels = (mBlurWidth - 1) / 2;  
  19.     for (int index = mStart; index < mStart + mLength; index++) {  
  20.         // Calculate average.  
  21.         float rt = 0, gt = 0, bt = 0;  
  22.         for (int mi = -sidePixels; mi <= sidePixels; mi++) {  
  23.             int mindex = Math.min(Math.max(mi + index, 0),  
  24.                                 mSource.length - 1);  
  25.             int pixel = mSource[mindex];  
  26.             rt += (float)((pixel & 0x00ff0000) >> 16)  
  27.                   / mBlurWidth;  
  28.             gt += (float)((pixel & 0x0000ff00) >>  8)  
  29.                   / mBlurWidth;  
  30.             bt += (float)((pixel & 0x000000ff) >>  0)  
  31.                   / mBlurWidth;  
  32.         }  
  33.   
  34.         // Reassemble destination pixel.  
  35.         int dpixel = (0xff000000     ) |  
  36.                (((int)rt) << 16) |  
  37.                (((int)gt) <<  8) |  
  38.                (((int)bt) <<  0);  
  39.         mDestination[index] = dpixel;  
  40.     }  
  41. }  


接下来你须要实现父类中的compute()方法,它会直接执行模糊处理,或者将当前的工做拆分红两个更小的任务。数组的长度能够做为一个简单的阀值来判断任务是应该直接完成仍是应该被拆分。 

Java代码 
  1. protected static int sThreshold = 100000;  
  2.   
  3. protected void compute() {  
  4.     if (mLength < sThreshold) {  
  5.         computeDirectly();  
  6.         return;  
  7.     }  
  8.   
  9.     int split = mLength / 2;  
  10.   
  11.     invokeAll(new ForkBlur(mSource, mStart, split, mDestination),  
  12.               new ForkBlur(mSource, mStart + split, mLength - split,  
  13.                            mDestination));  
  14. }  


若是前面这个方法是在一个RecursiveAction的子类中,那么设置任务在ForkJoinPool中执行就再直观不过了。一般会包含如下一些步骤: 

(1) 建立一个表示全部须要完成工做的任务。 

Java代码 
  1. // source image pixels are in src  
  2. // destination image pixels are in dst  
  3. ForkBlur fb = new ForkBlur(src, 0, src.length, dst);  


(2) 建立将要用来执行任务的ForkJoinPool。 

Java代码 
  1. ForkJoinPool pool = new ForkJoinPool();  


(3) 执行任务。 

Java代码 
  1. pool.invoke(fb);  


想要浏览完成的源代码,请查看 ForkBlur ,其中还包含一些建立destination图片文件的额外代码。 

标准实现  

除了可以使用fork/join框架来实现可以在多处理系统中被并行执行的定制化算法(如前文中的ForkBlur.java例子),在Java SE中一些比较经常使用的功能点也已经使用fork/join框架来实现了。在Java SE 8中,java.util.Arrays类的一系列parallelSort()方法就使用了fork/join来实现。这些方法与sort()系列方法很相似,可是经过使用fork/join框架,借助了并发来完成相关工做。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些方法到底是如何运用fork/join框架并不在本教程的讨论范围内。想要了解更多的信息,请参见Java API文档。 其余采用了fork/join框架的方法还包括java.util.streams包中的一些方法,此包是做为Java SE 8发行版中 Project Lambda 的一部分。想要了解更多信息,请参见 Lambda Expressions 一节。 

6.  并发集合  

java.util.concurrent包囊括了Java集合框架的一些附加类。它们也最容易按照集合类所提供的接口来进行分类: 

  • BlockingQueue定义了一个先进先出的数据结构,当你尝试往满队列中添加元素,或者从空队列中获取元素时,将会阻塞或者超时。
  • ConcurrentMapjava.util.Map的子接口,定义了一些有用的原子操做。移除或者替换键值对的操做只有当key存在时才能进行,而新增操做只有当key不存在时。使这些操做原子化,能够避免同步。ConcurrentMap的标准实现是ConcurrentHashMap,它是HashMap的并发模式。
  • ConcurrentNavigableMap是ConcurrentMap的子接口,支持近似匹配。ConcurrentNavigableMap的标准实现是ConcurrentSkipListMap,它是TreeMap的并发模式。
  • 全部这些集合,经过 在集合里新增对象和访问或移除对象的操做之间,定义一个happens-before的关系,来帮助程序员避免内存一致性错误

7.  原子变量  

java.util.concurrent.atomic 包定义了对单一变量进行原子操做的类。全部的类都提供了get和set方法,可使用它们像读写volatile变量同样读写原子类。就是说,同一变量上的一个set操做对于任意后续的get操做存在happens-before关系。原子的compareAndSet方法也有内存一致性特色,就像应用到整型原子变量中的简单原子算法。   为了看看这个包如何使用,让咱们返回到最初用于演示线程干扰的 Counter 类: 

Java代码 
  1. class Counter {  
  2.     private int c = 0;  
  3.     public void increment() {  
  4.         c++;  
  5.     }  
  6.   
  7.     public void decrement() {  
  8.         c--;  
  9.     }  
  10.   
  11.     public int value() {  
  12.         return c;  
  13.     }  
  14. }  


使用同步是一种使Counter类变得线程安全的方法,如 SynchronizedCounter : 

Java代码 
  1. class SynchronizedCounter {  
  2. private int c = 0;  
  3. public synchronized void increment() {  
  4. c++;  
  5. }  
  6. public synchronized void decrement() {  
  7. c--;  
  8. }  
  9. public synchronized int value() {  
  10. return c;  
  11. }  
  12. }  


对于这个简单的类,同步是一种可接受的解决方案。可是对于更复杂的类,咱们可能想要避免没必要要同步所带来的活跃度影响。将int替换为AtomicInteger容许咱们在不进行同步的状况下阻止线程干扰,如 AtomicCounter : 

Java代码 
  1. import java.util.concurrent.atomic.AtomicInteger;  
  2. class AtomicCounter {  
  3. private AtomicInteger c = new AtomicInteger(0);  
  4. public void increment() {  
  5. c.incrementAndGet();  
  6. }  
  7.   
  8. public void decrement() {  
  9. c.decrementAndGet();  
  10. }  
  11.   
  12. public int value() {  
  13. return c.get();  
  14. }  


8.  并发随机数  

在JDK7中,java.util.concurrent包含了一个至关便利的类,ThreadLocalRandom,当应用程序指望在多个线程或ForkJoinTasks中使用随机数时。 

对于并发访问,使用TheadLocalRandom代替Math.random()能够减小竞争,从而得到更好的性能。 

你只需调用ThreadLocalRandom.current(), 而后调用它的其中一个方法去获取一个随机数便可。下面是一个例子: 

Java代码 
  1. int r = ThreadLocalRandom.current().nextInt(4,77);  
相关文章
相关标签/搜索