Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包。这个包包含有一系列可以让 Java 的并发编程变得更加简单轻松的类。在这个包被添加之前,你须要本身去动手实现本身的相关工具类。
本文我将带你一一认识 java.util.concurrent 包里的这些类,而后你能够尝试着如何在项目中使用它们。
我不会去解释关于 Java 并发的核心问题 - 其背后的原理,也就是说,若是你对那些东西感兴趣,参考《Java 并发指南》。html
java.util.concurrent 包里的 BlockingQueue 接口表示一个线程放入和提取实例的队列。本小节我将给你演示如何使用这个 BlockingQueue。
本节不会讨论如何在 Java 中实现一个你本身的 BlockingQueue。若是你对那个感兴趣,参考《Java 并发指南》java
BlockingQueue 一般用于一个线程生产对象,而另一个线程消费这些对象的场景。下图是对这个原理的阐述:编程
一个线程往里边放,另一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。若是该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。若是消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。数组
BlockingQueue 具备 4 组不一样的方法用于插入、移除以及对队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下:数据结构
四组不一样的行为方式解释:并发
没法向一个 BlockingQueue 中插入 null。若是你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
能够访问到 BlockingQueue 中的全部元素,而不只仅是开始和结束的元素。好比说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你能够调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。可是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其余对象的效率不会过高),所以你尽可能不要用这一类的方法,除非你确实不得不那么作。工具
BlockingQueue 是个接口,你须要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具备如下 BlockingQueue 接口的实现(Java 6):性能
这里是一个 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 实现。
首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。this
public class BlockingQueueExample { public static void main(String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue(1024); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); Thread.sleep(4000); } }
如下是 Producer 类。注意它在每次 put() 调用时是如何休眠一秒钟的。这将致使 Consumer 在等待队列中对象的时候发生阻塞。spa
public class Producer implements Runnable{ protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { queue.put("1"); Thread.sleep(1000); queue.put("2"); Thread.sleep(1000); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } }
如下是 Consumer 类。它只是把对象从队列中抽取出来,而后将它们打印到 System.out。
public class Consumer implements Runnable{ protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
ArrayBlockingQueue 类实现了 BlockingQueue 接口。
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不可以存储无限多数量的元素。它有一个同一时间可以存储元素数量的上限。你能够在对其初始化的时候设定这个上限,但以后就没法对这个上限进行修改了(译者注:由于它是基于数组实现的,也就具备数组的特性:一旦初始化,大小就没法修改)。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在全部元素之中是放入时间最久的那个,而尾元素则是最短的那个。
如下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024); queue.put("1"); String string = queue.take();
如下是使用了 Java 泛型的一个 BlockingQueue 示例。注意其中是如何对 String 元素放入和提取的:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024); queue.put("1"); String string = queue.take();
DelayQueue 实现了 BlockingQueue 接口。
DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:
public interface Delayed extends Comparable<Delayed< { public long getDelay(TimeUnit timeUnit); }
DelayQueue 将会在每一个元素的 getDelay() 方法返回的值的时间段以后才释放掉该元素。若是返回的是 0 或者负值,延迟将被认为过时,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。
传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它代表了将要延迟的时间段。TimeUnit 枚举将会取如下值:
DAYS HOURS MINUTES SECONDS MILLISECONDS MICROSECONDS NANOSECONDS
正如你所看到的,Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间能够进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,所以它们能够根据过时时间进行有序释放。如下是使用 DelayQueue 的例子:
public class DelayQueueExample { public static void main(String[] args) { DelayQueue queue = new DelayQueue(); Delayed element1 = new DelayedElement(); queue.put(element1); Delayed element2 = queue.take(); } }
DelayedElement 是我所建立的一个 DelayedElement 接口的实现类,它不在 java.util.concurrent 包里。你须要自行建立你本身的 Delayed 接口的实现以使用 DelayQueue 类。
LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(连接节点)对其元素进行存储。若是须要的话,这一链式结构能够选择一个上限。若是没有定义上限,将使用 Integer.MAX_VALUE 做为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在全部元素之中是放入时间最久的那个,而尾元素则是最短的那个。
如下是 LinkedBlockingQueue 的初始化和使用示例代码:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>(); BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024); bounded.put("Value"); String value = bounded.take();
PriorityBlockingQueue 类实现了 BlockingQueue 接口。
PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 同样的排序规则。你没法向这个队列中插入 null 值。
全部插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。所以该队列中元素的排序就取决于你本身的 Comparable 实现。
注意 PriorityBlockingQueue 对于具备相等优先级(compare() == 0)的元素并不强制任何特定行为。
同时注意,若是你从一个 PriorityBlockingQueue 得到一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。
如下是使用 PriorityBlockingQueue 的示例:
BlockingQueue queue = new PriorityBlockingQueue(); //String implements java.lang.Comparable queue.put("Value"); String value = queue.take();
SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只可以容纳单个元素。若是该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另外一个线程将该元素从队列中抽走。一样,若是该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另外一个线程向队列中插入了一条新的元素。据此,把这个类称做一个队列显然是夸大其词了。它更多像是一个汇合点。
java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。
BlockingDeque 类是一个双端队列,在不可以插入元素时,它将阻塞住试图插入元素的线程;在不可以抽取元素时,它将阻塞住试图抽取的线程。
deque(双端队列) 是 "Double Ended Queue" 的缩写。所以,双端队列是一个你能够从任意一端插入或者抽取元素的队列。
在线程既是一个队列的生产者又是这个队列的消费者的时候可使用到 BlockingDeque。若是生产者线程须要在队列的两端均可以插入数据,消费者线程须要在队列的两端均可以移除数据,这个时候也可使用 BlockingDeque。BlockingDeque 图解:
一个 BlockingDeque - 线程在双端队列的两端均可以插入和提取元素。
一个线程生产元素,并把它们插入到队列的任意一端。若是双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。若是双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。
BlockingDeque 具备 4 组不一样的方法用于插入、移除以及对双端队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下
四组不一样的行为方式解释:
既然 BlockingDeque 是一个接口,那么你想要使用它的话就得使用它的众多的实现类的其中一个。java.util.concurrent 包提供了如下 BlockingDeque 接口的实现类:
如下是如何使用 BlockingDeque 方法的一个简短代码示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>(); deque.addFirst("1"); deque.addLast("2"); String two = deque.takeLast(); String one = deque.takeFirst();
LinkedBlockingDeque 类实现了 BlockingDeque 接口。
deque(双端队列) 是 "Double Ended Queue" 的缩写。所以,双端队列是一个你能够从任意一端插入或者抽取元素的队列。(译者注:唐僧啊,受不了。)
LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,不管该线程是试图从哪一端抽取数据。
java.util.concurrent.ConcurrentMap 接口表示了一个可以对别人的访问(插入和提取)进行并发处理的 java.util.Map。
ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法以外还有一些额外的原子性方法。
既然 ConcurrentMap 是个接口,你想要使用它的话就得使用它的实现类之一。java.util.concurrent 包具有 ConcurrentMap 接口的如下实现类:
ConcurrentHashMap 和 java.util.HashTable 类很类似,但 ConcurrentHashMap 可以提供比 HashTable 更好的并发性能。在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其中写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map。它的内部只是把 Map 中正在被写入的部分进行锁定。
另一个不一样点是,在被遍历的时候,即便是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。尽管 Iterator 的设计不是为多个线程的同时使用。
更多关于 ConcurrentMap 和 ConcurrentHashMap 的细节请参考官方文档。
java.util.concurrent.ConcurrentNavigableMap 是一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具有并发访问的能力。所谓的 "子 map" 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。
NavigableMap 中的方法再也不赘述,本小节咱们来看一下 ConcurrentNavigableMap 添加的方法。
headMap(T toKey) 方法返回一个包含了小于给定 toKey 的 key 的子 map。
若是你对原始 map 里的元素作了改动,这些改动将影响到子 map 中的元素(译者注:map 集合持有的其实只是对象的引用)。
如下示例演示了对 headMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap headMap = map.headMap("2");
headMap 将指向一个只含有键 "1" 的 ConcurrentNavigableMap,由于只有这一个键小于 "2"。关于这个方法及其重载版本具体是怎么工做的细节请参考 Java 文档。
tailMap(T fromKey) 方法返回一个包含了不小于给定 fromKey 的 key 的子 map。
若是你对原始 map 里的元素作了改动,这些改动将影响到子 map 中的元素(译者注:map 集合持有的其实只是对象的引用)。
如下示例演示了对 tailMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap tailMap = map.tailMap("2");
subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。示例以下:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap subMap = map.subMap("2", "3");
返回的 submap 只包含键 "2",由于只有它知足不小于 "2",比 "3" 小。
java.util.concurrent.CountDownLatch 是一个并发构造,它容许一个或多个线程等待一系列指定操做的完成。
CountDownLatch 以一个给定的数量初始化。countDown() 每被调用一次,这一数量就减一。经过调用 await() 方法之一,线程能够阻塞等待这一数量到达零。
如下是一个简单示例。Decrementer 三次调用 countDown() 以后,等待中的 Waiter 才会从 await() 调用中释放出来。
CountDownLatch latch = new CountDownLatch(3); Waiter waiter = new Waiter(latch); Decrementer decrementer = new Decrementer(latch); new Thread(waiter) .start(); new Thread(decrementer).start(); Thread.sleep(4000); public class Waiter implements Runnable{ CountDownLatch latch = null; public Waiter(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } } public class Decrementer implements Runnable { CountDownLatch latch = null; public Decrementer(CountDownLatch latch) { this.latch = latch; } public void run() { try { Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }