跟着 The Java Tutorials 把并发的一些基础过了一遍,发现仍然仍是有不少不清楚的地方,主要是由于日常没有机会实际应用吧,理论知识要有,实践也很重要,哪怕是写些小 demo 也能够的。html
虽然主要是跟着 tutorials 的 concurrency 章节整理的,但这并非官方文档的一个翻译哈,看到一个地方有前置技能不足的时候,就会穿插着一些对 API 的学习之类的。java
还有就是。。segmentfault 对 markdown 的解析和我用的编辑器好像不太一致,引用的地方有点乱,实在懒得改了。。要是看着不舒服,能够看这个 并发基础(一)git
Java 提供了两种基础的同步语法:1. synchronized 方法 2. synchronized 语句块github
同步是在内置锁(intrinsic lock)或者叫作监视锁(monitor lock)的基础上创建的(API 中一般将其称为 monitor),这个 monitor 在同步的两个方面发挥做用: 1. 强制性的单独访问对象 2. 创建 happens-before 关系编程
每一个对象都有本身的内置锁,若是有线程想要访问这个对象,须要先获取这个对象的内置锁,那么此时其余线程是没法获取这个锁的,也就是没法访问这个对象,直至先前的线程释放锁。segmentfault
synchronized 就是 Java 对内置锁的支持。api
当线程调用 synchronized 方法时,就会自动获取该方法对象的 synchronized 锁,返回时才会释放,即便是由没被 catch 的异常返回的,也会释放锁。数组
那么,对于 static synchronized method 呢?这个让人有些迷惑,由于 static method 是和类关联的,而不是对象。缓存
Intrinsic Locks and Synchronization (The Java™ Tutorials > Essential Classes > Concurrency)安全
In this case, the thread acquires the intrinsic lock for the Class object associated with the class. Thus access to class's static fields is controlled by a lock that's distinct from the lock for any instance of the class.
根据文档的描述,这种状况下,对类的静态域的访问和对类的普通实例的访问所获取的锁是不一样的。
其实就是类锁和对象锁的区别,类锁也就是 static synchronized method 和 synchronized(xx.class){} 所使用的锁,对象锁就是 non-static synchronized method 和 synchronized(xxx){} 所使用的锁。
由于这两个锁是不一样的,因此若是对同一个类 A,线程 1 想获取类 A 对象实例的锁,和线程 2 想获取类 A 的类锁,这两个线程是不存在竞争关系的。
下面看看 synchronized method 的具体做用:
做用:
这个 warning 值得注意
Synchronized Methods (The Java™ Tutorials > Essential Classes > Concurrency)
Warning: When constructing an object that will be shared between threads, be very careful that a reference to the object does not "leak" prematurely. For example, suppose you want to maintain aList
calledinstances
containing every instance of class. You might be tempted to add the following line to your constructor:
<div class="codeblock"><pre>
instances.add(this);
</pre></div>But then other threads can use
instances
to access the object before construction of the object is complete.
这是在多线程中很容易忽略的一个问题,就是在尚未构建对象完成时,其余线程就已经访问了这个对象。
这部分以及活跃性问题在以前的博客里有详细写了 线程安全性
原子性的动做是一次完成的
Atomic Access (The Java™ Tutorials > Essential Classes > Concurrency)
<ul><li>Reads and writes are atomic for reference variables and for most primitive variables (all types exceptlong
anddouble
).</li><li>Reads and writes are atomic for all variables declaredvolatile
(includinglong
anddouble
variables).</li></ul>
翻译过来就是:
这段我看了以后有点迷,由于以前的概念中就是 volatile 只能保证被修饰变量的可见性而不能保证原子性,为什么文档中说 "Reads and writes are atomic for all variables declared volatile"?还有就是,对引用变量的读写是原子操做?
想了想,我以为他想表达的意思是这样的:
首先,文档中说的读写,确定指的是单独的读、单独的写,好比 int a = 1; 这句确定是一个原子操做,就是向 a 写入值 1,这很容易看出来。
可若是是 b = a; 呢?这样就很容易让人感到迷惑了,猛然一看,这就是对引用变量的写入,可是,它应该是分为几步操做的,1. 读取 a 2. 写入 b,若是说单独对 a 的读取和单独对 b 的写入,这些都是原子操做,文档中说的是没有错的,但是其实是对于 a 这个变量引用,它是随时有可能被更新的,也就是说 b = a; 可能被分解为 1. 读取 a 2. 写入 b 3. 写入 a,这个时候 b = a; 这个语句就会带来错误。
由于这种状况实际上是很容易遇到的,因此给个人印象就是 b = a; 这种对引用变量的读写一般并非原子操做,因此看到文档中这段话感到很迷惑,文档的描述是正确的,只不过他所界定的原子操做和读写的概念,指的是一种最窄的概念,而且,对于 b = a; 这种对引用变量的赋值,之因此出现问题,其实是由于对变量 a 可见性没有保证,这就不能让原子操做背这个锅了。
文档中后面也说到了,虽然原子操做是完整的操做,使用它们能够不用担忧线程干扰,可是有时仍然须要对原子操做进行同步,由于即便是原子操做也避免不了可能出现的内存一致性错误。
Atomic actions cannot be interleaved, so they can be used without fear of thread interference. However, this does not eliminate all need to synchronize atomic actions, because memory consistency errors are still possible.
因此说,使用简单的原子操做访问会比用 synchronized 访问变量会更有效,可是这就须要咱们考虑到内存一致性的问题,所以须要使用哪一种方式访问变量,就要看应用的规模和复杂程度了。
多线程之间确定少不了协同工做,最多见的方式就是使用 Guarded block:
public void guardedJoy() { // Simple loop guard. Wastes // processor time. Don't do this! while(!joy) {} System.out.println("Joy has been achieved!"); }
这就是一个 Guarded block,它会不断地检查 joy 这个值,而 joy 是由另外一个线程所设置的。
不过,像上面这样的循环等待对资源也太浪费了,能够采用另外一种方式:
public synchronized void guardedJoy() { // This guard only loops once for each special event, which may not // be the event we're waiting for. while(!joy) { try { wait(); } catch (InterruptedException e) {} } System.out.println("Joy and efficiency have been achieved!"); }
这里使用到了 Object.wait()
Object (Java Platform SE 8 )
Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).
它的做用就是让调用该方法的线程进入 WAITING 状态,直到有其余线程调用该对象的 notify() or notifyAll() 方法
The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.
调用 wait() 以前须要先获取这个对象的锁,一旦调用以后就会释放对象的锁,而后等待,直到其余线程调用 notify() or notifyAll(),但也不是能让等待的线程立马从 wait() 返回,而是要等到调用 notify() or notifyAll() 的线程释放锁以后,等待线程才有机会从 wait() 返回,这个机会就是:当前线程能够从新获取锁的持有权限,而后才能继续执行。
仍是回到 guardedJoy 那个例子,这个版本的 guardedJoy 变成了 synchronized 的了,为何呢?刚刚介绍 wait() 方法时说到了,一个线程想要调用一个对象的 wait() 方法首先要获取到这个对象的锁,而获取锁的最简单的方式就是在 synchronized 方法里调用 wait() 啦
那么,当 Thread1 调用 wait() 时,就会释放锁而后挂起。当其余线程 Thread2 请求获取这个锁并成功后调用 notifyAll() 通知等待的全部线程,在 Thread2 释放这个锁后,等待的线程 Thread1 再次申请就能够从新得到这个锁,以后就能够从 wait() 方法返回继续执行了。
public synchronized notifyJoy() { joy = true; notifyAll(); }
关于 notify(),这个方法只会唤醒一个线程,而且不容许指定唤醒哪一个线程,这是可能会发生死锁的。以生产者消费者问题为例,假设分别有 2 个consumer 和 producer,缓冲区大小为 1,有可能你唤醒的那个线程可能刚好是相同角色的线程,也就是说如今多是一个 consumer 唤醒了另外一个 consumer,原本 consumer 想要唤醒的是 producer,缓存区仍然为空的,可是 producer 却还在 wait,由于错过了被 consumer 唤醒的机会,从而就会产生死锁。
那么何时可使用 notify() 呢?文档中是这样给的建议:
Guarded Blocks (The Java™ Tutorials >Essential Classes > Concurrency)
Note: There is a second notification method, notify, which wakes up a single thread. Because notify doesn't allow you to specify the thread that is woken up, it is useful only in massively parallel applications — that is, programs with a large number of threads, all doing similar chores. In such an application, you don't care which thread gets woken up.
因此 notify() 通常只在大规模并发应用(即系统有大量类似任务的线程)中使用。由于对于大规模并发应用,咱们其实并不关心哪个线程被唤醒。
看一下官网给的生产者消费者的示例程序:
数据经过 Drop 对象共享信息:
public class Drop { // Message sent from producer // to consumer. private String message; // true 表示为空,须要等待生产数据 // false 表示能够取数据了 private boolean empty = true; public synchronized String take() { // Wait until message is // available. while (empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = true; // Notify producer that // status has changed. notifyAll(); return message; } public synchronized void put(String message) { // Wait until message has // been retrieved. while (!empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = false; // Store message. this.message = message; // Notify consumer that status // has changed. notifyAll(); } }
生产者:
import java.util.Random; public class Producer implements Runnable { private Drop drop; public Producer(Drop drop) { this.drop = drop; } public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; Random random = new Random(); for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); try { // 随机的暂停一段时间,接近实际中状况 Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) {} } drop.put("DONE"); } }
消费者:
import java.util.Random; public class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { Random random = new Random(); for (String message = drop.take(); ! message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) {} } } }
主线程
public class ProducerConsumerExample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
在并发编程中,一种被广泛承认的原则就是:尽量的使用不可变对象来建立简单、可靠的代码。
由于 immutable objects 建立后不能被修改,因此不会出现因为线程干扰产生的错误 or 内存一致性错误。
但有时候会有这种担心,就是使用 immutable objects 每次建立新对象的开销会不会太大,若是不使用 immutable objects,就能够避免建立过多新对象,只须要 update 就能够。
而文档中说到,这种建立对象的开销经常被过度高估,由于使用不可变对象所带来的一些效率提高能够抵消这种开销。
e.g. 使用不可变对象下降了垃圾回收所产生的额外开销,同时也能够减小一些为了维护在并发中的 mutable objects 的代码开销。
看一下举得这个例子:
public class SynchronizedRGB { // Values must be between 0 and 255. private int red; private int green; private int blue; private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public SynchronizedRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public void set(int red, int green, int blue, String name) { check(red, green, blue); synchronized (this) { this.red = red; this.green = green; this.blue = blue; this.name = name; } } public synchronized int getRGB() { return ((red << 16) | (green << 8) | blue); } public synchronized String getName() { return name; } public synchronized void invert() { red = 255 - red; green = 255 - green; blue = 255 - blue; name = "Inverse of " + name; } }
假如如今 线程1 在执行下列代码,已经执行到 Statement 1,而后 线程2 刚好也在执行,可是 线程2 恰恰是执行到 Statement 1 和 2 之间,此时 线程1 若是再继续执行 Statement 2,就会出现 getRGB() 和 getName() 结果不匹配的状况,这就是在并发中很容易出现的问题。
SynchronizedRGB color = new SynchronizedRGB(0, 0, 0, "Pitch Black"); ... int myColorInt = color.getRGB(); //Statement 1 String myColorName = color.getName(); //Statement 2
此时加上了 synchronized,将 Statement 1 和 2 绑定到了一块儿,这样并发就不会出问题了。
synchronized (color) { int myColorInt = color.getRGB(); String myColorName = color.getName(); }
会出现上面这种不匹配的状况,是由于 color 是一个 mutable object,若是它变成 immutable object,就不会出现这种问题了。
对于如何定义 immutable objects,文档给出了一个 strategy
A Strategy for Defining Immutable Objects (The Java™ Tutorials >Essential Classes > Concurrency)
<ol><li>Don't provide "setter" methods methods that modify fields or objects referred to by fields.</li><li>Make all fieldsfinal
andprivate
.</li><li>Don't allow subclasses to override methods. The simplest way to do this is to declare the class asfinal
. A more sophisticated approach is to make the constructorprivate
and construct instances in factory methods.</li><li>If the instance fields include references to mutable objects, don't allow those objects to be changed:<ul><li>Don't provide methods that modify the mutable objects.</li><li>Don't share references to the mutable objects. Never store references to external, mutable objects passed to the constructor; if necessary, create copies, and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods.</li></ul></li></ol>
最后一点,关于对 mutable objects 的引用:若是一个对外部可变对象的引用须要被传到构造函数中,必定不要保存这个引用,若是必需要保存,就作一个该对象的 copy,而后保存这个 copy。相似的,若是是引用内部的可变对象,必要时也要建立内部可变对象的 copy,以免在方法中返回原对象引用。
这一点是很容易被忽略的,在 Core Java 中也提到了相似的状况,做者的建议也是作 copy,不要直接引用原对象,由于很难知道这个对象引用在其余地方是否是会被改变。
根据上面的 strategy,从新定义 RGB:
final public class ImmutableRGB { // Values must be between 0 and 255. final private int red; final private int green; final private int blue; final private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public ImmutableRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public int getRGB() { return ((red << 16) | (green << 8) | blue); } public String getName() { return name; } public ImmutableRGB invert() { return new ImmutableRGB(255 - red, 255 - green, 255 - blue, "Inverse of " + name); } }
在 liveness 那一节中有举了一个 Alphonse 和 Gaston 鞠躬产生死锁的例子,这里产生死锁的缘由在于可能 线程1 进入 bow,线程2 进入 bowBack,当 线程1 进入 bow 里的 bowBack 时,线程2 刚好也正在进入 bowBack,两者都在等彼此退出,可是却又永远不会退出,由于彼此在循环等待,会一直阻塞在这里,从而产生死锁(循环等待、不可剥夺、独自占有、保持请求)。
public class Deadlock { static class Friend { private final String name; public Friend(String name) { this.name = name; } public String getName() { return this.name; } public synchronized void bow(Friend bower) { System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } public synchronized void bowBack(Friend bower) { System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName()); } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new Runnable() { public void run() { alphonse.bow(gaston); } }).start(); new Thread(new Runnable() { public void run() { gaston.bow(alphonse); } }).start(); } }
在这一节做者用 lock objects 来解决这个问题
先简单了解一下 Lock 这个接口:
Lock Objects (The Java™ Tutorials >Essential Classes > Concurrency)
Lock objects work very much like the implicit locks used by synchronized code. As with implicit locks, only one thread can own a Lock object at a time. Lock objects also support a wait/notify mechanism, through their associated Condition objects.
Lock 很像同步代码使用的内置锁,在同一时刻只有一个线程能够得到 Lock 对象。经过关联 Condition 对象,Lock 对象也支持 wait/notify 机制。
关于 Condition:
Condition (Java Platform SE 8 )
Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
在 Lock 取代了 synchronized 方法和语句的地方,Condition 相应地取代了 Object 监视器方法(wait, notify and notifyAll)的使用。
看一下 tryLock() 这个方法:
Lock (Java Platform SE 8 )
<!-- --><ul class="blockList"><li class="blockList"><pre>boolean tryLock()</pre><div class="block">Acquires the lock only if it is free at the time of invocation.<p>Acquires the lock if it is available and returns immediately
with the valuetrue
.
If the lock is not available then this method will return
immediately with the valuefalse
.<p>A typical usage idiom for this method would be:<pre>
Lock lock = ...;
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}</pre>
This usage ensures that the lock is unlocked if it was acquired, and
doesn't try to unlock if the lock was not acquired.</div>
<dl><dt><span class="returnLabel">Returns:</span></dt><dd>true
if the lock was acquired andfalse
otherwise</dd></dl></li></ul>
它会尝试获取锁,若是成功,马上返回 true,若是失败也会马上返回 false,API 文档中举得那个例子就是一个典型的用法,这种使用 tryLock() 的方式能够确保在获取锁后必定会释放锁,由于 Lock 和 synchronized 有一个不同的地方在于 synchronized 会自动释放,而 Lock 不会,因此必定要保证手动释放 Lock 锁,而且这种方式也能够保证若是没获取锁的状况不会 unlock()。
下面看怎么用 Lock 解决鞠躬的死锁问题:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.Random; public class Safelock { // 让 Friend 做为一个静态内部类,由于不须要引用外部变量 static class Friend { private final String name; // 这里使用的是可重入锁 private final Lock lock = new ReentrantLock(); public Friend(String name) { this.name = name; } public String getName() { return this.name; } // 即将到来的鞠躬 public boolean impendingBow(Friend bower) { Boolean myLock = false; Boolean yourLock = false; try { myLock = lock.tryLock(); yourLock = bower.lock.tryLock(); } finally { // 若是都获取到了锁 or 都没获取到,就不会释放锁 // 若是只有一个获取到了,须要释放 if (! (myLock && yourLock)) { if (myLock) { lock.unlock(); } if (yourLock) { bower.lock.unlock(); } } } // 只有两个锁都获取到时才会返回 true return myLock && yourLock; } public void bow(Friend bower) { // 若是都获取到了锁 if (impendingBow(bower)) { try { System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } finally { lock.unlock(); bower.lock.unlock(); } } else { // 若是只获取到了一个锁,说明其余线程抢占了另外一个锁 // 在这个情景中就是 Alphonse 鞠躬以前发如今 Gaston 正要鞠躬 // 所以本身就不鞠躬了,从而避免了死锁 System.out.format("%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n", this.name, bower.getName()); } } public void bowBack(Friend bower) { System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName()); } } // 一样地,也是将 BowLoop 做为静态内部类 static class BowLoop implements Runnable { private Friend bower; private Friend bowee; public BowLoop(Friend bower, Friend bowee) { this.bower = bower; this.bowee = bowee; } public void run() { Random random = new Random(); for (;;) { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) {} bowee.bow(bower); } } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new BowLoop(alphonse, gaston)).start(); new Thread(new BowLoop(gaston, alphonse)).start(); } }
文档教程只是一些比较简单的介绍,先过一遍了。
concurrent 包中主要有三个 Executor 接口
运行新任务。
只有一个 execute() 方法,用来建立线程。可是这个方法没有定义具体的实现方式,因此对于不一样的 Executor 的实现,有不一样的建立方式。
Executor (Java Platform SE 8 )
<!-- --><ul class="blockListLast"><li class="blockList"><h4>execute</h4><pre>void execute( Runnable command)</pre><div class="block">Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling
thread, at the discretion of theExecutor
implementation.</div><dl><dt><span class="paramLabel">Parameters:</span></dt><dd>command
- the runnable task</dd><dt><span class="throwsLabel">Throws:</span></dt><dd>RejectedExecutionException
- if this task cannot be
accepted for execution</dd><dd>NullPointerException
- if command is null</dd></dl></li></ul></li></ul></li></ul></div></div>
execute() 接受一个 Runnable 对象。
ExecutorService 除了提供 execute() 方法还提供了 submit() 方法,这个方法不只能够接受 Runnable 对象还能够接受 Callable 对象。Callable 对象可使任务返还执行的结果
Callable (Java Platform SE 8 )
<!-- --><ul class="blockListLast"><li class="blockList"><h4>call</h4><pre>
V call() throws
Exception</pre><div class="block">Computes a result, or throws an exception if unable to do so.</div><dl><dt><span class="returnLabel">Returns:</span></dt><dd>computed result</dd><dt><span class="throwsLabel">Throws:</span></dt><dd>
Exception
- if unable to compute a result</dd></dl></li></ul></li></ul></li></ul></div></div>
call() 方法会返回计算的结果。
经过 submit() 方法返回的 Future 对象能够读取 Callable 任务的执行结果,或是管理 Callable 任务和 Runnable 任务的状态。
关于 Future 这个接口,它是表示异步计算的结果,提供的方法是用来 1. 检查计算是否完成 2. 是否等待计算完成 3. 是否查找计算的结果
简单使用的例子:
interface ArchiveSearcher { String search (String target); } class App { ExecutorService executor = ... ArchiveSearcher searcher = ... void showSearch(final String target) throws InterruptedException { Future<String> future = executor.submit(new Callable<String>() { public String call() { return searcher.search(target); } }); displayOtherThings(); // do other things while searching try { // 计算完成时才能用 get() 获取结果,若是没有计算完成会阻塞着 displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
submit 里的 Callable 对象也能够替换成以下代码:
FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call() { return searcher.search(target); }}); executor.execute(future);
由于 FutureTask 是实现了 Runnable 接口的 Future 的实现,因此能够由 Executor 来执行。
public class FutureTask<V> extends Object implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
ExecutorService 也提供了批量运行 Callable 任务的方法。最后,ExecutorService 还提供了一些关闭执行器的方法。若是须要支持即时关闭,执行器所执行的任务须要正确处理中断。
扩展了 ExecutorService 接口,添加了 schedule 方法。
public interface ScheduledExecutorService extends ExecutorService
经过 schedule 方法可让命令在给定延迟的时间以后执行或者按期执行。
这里只是对线程池的一个简单的介绍。
大多数 concurrent 包里的 executor 的实现都使用了线程池(由 worker 线程组成),worker 线程独立于它所执行的 Runnable 任务和 Callable 任务,而且常常用来执行多个任务。
使用 worker 线程可使建立线程的开销最小化。在大规模并发应用中,建立大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。
一种最多见的线程池是 fixed thread pool(固定大小的线程池)。这种线程池始终有必定数量的线程在运行,若是一个线程因为某种缘由终止运行了,线程池会自动建立一个新的线程来代替它。须要执行的任务经过一个内部队列提交给线程,当没有更多的工做线程能够用来执行任务时,队列保存额外的任务。 使用 fixed thread pool 的一个很重要的好处是能够 degrade gracefully。
好比一个 Web 服务器,每个 HTTP 请求都是由一个单独的线程来处理,不可能为每个 HTTP 请求都建立一个新线程,这样的话当系统的开销超出其能力时,会忽然地对全部请求都中止响应。若是限制 Web 服务器能够建立的线程数量,那么它就没必要当即处理全部收到的请求,而是在有能力处理请求时才处理。
建立一个使用 fixed thread pool 的 executor 的最简单的方法是调用 java.util.concurrent.Executors 的 newFixedThreadPool 工厂方法。
Executors 还提供了下面的工厂方法:
还有其余的,如 ThreadPoolExecutor or ScheduledThreadPoolExecutor
Basic use:
if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results
思想有点像分而治之的感受,主要是能够充分利用多处理器系统的并行处理能力。
文档中举了一个图片模糊处理的例子:
假设你想要模糊一张图片。原始的 source 图片由一个整数的数组表示,每一个整数表示一个像素点的颜色数值。与 source 图片相同,模糊以后的 destination 图片也由一个整数数组表示。 对图片的模糊操做是经过对 source 数组中的每个像素点进行处理完成的。
处理的过程:将每一个像素点的色值取出,与周围像素的色值(红、黄、蓝)放在一块儿取平均值,获得的结果被放入 destination 数组。
由于一张图片会由一个很大的数组来表示,因此处理图片过程可能会很耗时,可是若是使用 fork/join 框架来完成,就能够充分利用多处理器系统的并行处理能力,加快处理速度。
public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // Processing window size; should be odd. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; } } ...
如今实现抽象方法 compute(),在处理时,能够直接计算,也能够将其分开计算,取决于一个阈值,这个阈值能够简单地用数组的长度来表明。
protected static int sThreshold = 100000; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); }
由于 fork/join 的核心就是 ForkJoinPool 类,ForkJoinPool 能够执行 ForkJoinTask 任务,而 RecursiveAction 继承了 ForkJoinTask。因此上面这个方法若是是在 RecursiveAction 类中,就能够在 ForkJoinPool 中设置任务并令其执行。
// source image pixels are in src // destination image pixels are in dst ForkBlur fb = new ForkBlur(src, 0, src.length, dst); // Create the ForkJoinPool that will run the task. ForkJoinPool pool = new ForkJoinPool(); // Run the task. pool.invoke(fb);