前面几篇文章看到了造成Java并发程序设计基础的底层构建块。然而,实际编程中,应该尽量远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便的多,并且也安全的多。 java
对于多线程问题,能够经过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,能够安全地从一个线程向另外一个线程传递数据。例如,考虑银行转账程序,转帐线程将转帐指令对象插入一个队列中,而不是直接访问银行对象。另外一个线程从队列中取出指令执行转帐。只有该线程能够访问该银行对象的内部,所以不须要同步(锁和条件是线程安全的队列类的实现者须要考虑的)。 编程
当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)致使线程阻塞。在协调多个线程之间的合做时,阻塞队列是一个有用的工具。工做者线程能够周期性地将中间结果存储在阻塞队列中。其它的工做者线程移出中间结果并进一步加以修改。队列会自动地平衡负载。若是第一个线程集运行的比第二个慢,第二个线程集在等待结果时会阻塞。若是第一个线程集运行的快,他将等待第二个队列集遇上来。 安全
阻塞队列的方法: 多线程
方法 并发 |
正常动做 ide |
特殊状况下的动做 工具 |
add 性能 |
添加一个元素 this |
若是队列满,则抛出IllegalStateException异常 spa |
element |
返回队列的头元素 |
若是队列空,抛出NoSuchElementException异常 |
offer |
添加一个元素并返回true |
若是队列满,返回false |
peek |
返回队列的头元素 |
若是队列空,则返回null |
poll |
移出并返回队列的头元素 |
若是队列空,则返回null |
put |
添加一个元素 |
若是队列满,则阻塞 |
remove |
移出并返回头元素 |
若是队列空,则抛出NoSuchElementException异常 |
take |
移出并返回头元素 |
若是队列空,则阻塞 |
如上所述,阻塞队列方法分为3类,这取决于当队列满或空时它们的响应方式。若是将队列看成线程管理工具来使用,将要用到put和take方法。当试图向满的队列中添加或从空的队列中移出元素时,add、remove和element操做抛出异常。固然,在一个多线程程序中,队列会在任什么时候刻满或空,所以,必定要使用offer、poll和peek方法做为替代。这些方法若是不能完成任务,只是给出一个错误提示而不会抛出异常。须要注意的一点是,poll和peek方法返回null来指示失败,所以,向这些队列中插入null值是非法的。
offer和poll方法还有带有超时的方法变体。例如,下面的调用:
boolean success = q.offer(x,100,TimeUtil.MILLISECONDS);
尝试在100毫秒的时间内在队列的尾部插入一个元素。若是成功返回true,不然,达到超时时,返回false。相似地,下面的调用:
Object head = q.poll(100,TimeUnit.MILLISECONDS);
尝试用100毫秒的时间移出队列的头元素,若是成功返回头元素,不然,达到超时时,返回null。
java.util.concurrent包提供了阻塞队列的几个变种。默认状况下,LinkedBlockingQueue的容量是没有上边界的,可是,也能够选择指定最大容量。LinkedBlockingDeque是一个双端的版本。ArrayBlockingQueue在构造时须要指定容量,而且有一个可选的参数来指定是否须要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先获得处理。一般,公平性会下降性能,只有在确实很是须要时才使用它。
PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限的,可是,若是队列是空的,取元素的操做会阻塞。
最后,DelayQueue包含实现Delayed接口的对象:
interface Delayed extends Comparable<Delayed>{ long getDelay(TimeUnit unit); }
getDelay方法返回对象的残留延迟。负值表示延迟已经结束。元素只有在延迟用完的状况下才能从DelayQueue移除。还必须实现compareTo方法,DelayQueue使用该方法对元素进行排序。
下面是一个使用阻塞队列来控制线程集的例子,在一个目录及它的全部子目录下搜索全部文件,打印出包含指定关键字的行。
public class BlockingQueueTest { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Scanner in = new Scanner(System.in); System.out.print("Enter base directory(e.g. /usr/local/jdk1.6.0/src):"); String directory = in.nextLine(); System.out.print("Enter keyword (e.g. volatile):"); String keyword = in.nextLine(); final int FILE_QUEUE_SIZE = 10; final int SEARCH_THREADS = 100; BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE); FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory)); new Thread(enumerator).start(); for (int i = 1; i <= SEARCH_THREADS; i++) { new Thread(new SearchTask(queue, keyword)).start(); } } }
public class FileEnumerationTask implements Runnable { private BlockingQueue<File> queue; private File startingDirectory; public static File DUMMY = new File(""); public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory){ this.queue = queue; this.startingDirectory = startingDirectory; } @Override public void run() { try { enumerate(startingDirectory); queue.put(DUMMY); } catch (InterruptedException e) { e.printStackTrace(); } } public void enumerate(File directory) throws InterruptedException{ File[] files = directory.listFiles(); for(File file : files){ if (file.isDirectory()) { enumerate(file); }else { queue.put(file); } } } }
public class SearchTask implements Runnable { private BlockingQueue<File> queue; private String keyword; public SearchTask(BlockingQueue<File> queue,String keyword){ this.queue = queue; this.keyword = keyword; } @Override public void run() { try { boolean done = false; while (!done) { File file = queue.take(); if (file == FileEnumerationTask.DUMMY) { queue.put(file); done = true; }else { search(file); } } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void search(File file) throws FileNotFoundException { // TODO Auto-generated method stub Scanner in = new Scanner(new FileInputStream(file)); int lineNumber = 0; while (in.hasNextLine()) { lineNumber++; String line = in.nextLine(); if(line.contains(keyword)) System.out.printf("%s:%d:%s%n",file.getPath(),lineNumber,line); } in.close(); } }