阻塞队列

       前面几篇文章看到了造成Java并发程序设计基础的底层构建块。然而,实际编程中,应该尽量远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便的多,并且也安全的多。 java

    对于多线程问题,能够经过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,能够安全地从一个线程向另外一个线程传递数据。例如,考虑银行转账程序,转帐线程将转帐指令对象插入一个队列中,而不是直接访问银行对象。另外一个线程从队列中取出指令执行转帐。只有该线程能够访问该银行对象的内部,所以不须要同步(锁和条件是线程安全的队列类的实现者须要考虑的)。 编程

        当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)致使线程阻塞。在协调多个线程之间的合做时,阻塞队列是一个有用的工具。工做者线程能够周期性地将中间结果存储在阻塞队列中。其它的工做者线程移出中间结果并进一步加以修改。队列会自动地平衡负载。若是第一个线程集运行的比第二个慢,第二个线程集在等待结果时会阻塞。若是第一个线程集运行的快,他将等待第二个队列集遇上来。 安全

    阻塞队列的方法: 多线程

 方法 并发

正常动做 ide

特殊状况下的动做 工具

add 性能

添加一个元素 this

若是队列满,则抛出IllegalStateException异常 spa

element

返回队列的头元素

若是队列空,抛出NoSuchElementException异常

offer

添加一个元素并返回true

若是队列满,返回false

peek

返回队列的头元素

若是队列空,则返回null

poll

移出并返回队列的头元素

若是队列空,则返回null

put

添加一个元素

若是队列满,则阻塞

remove

移出并返回头元素

若是队列空,则抛出NoSuchElementException异常

take

移出并返回头元素

若是队列空,则阻塞

        如上所述,阻塞队列方法分为3类,这取决于当队列满或空时它们的响应方式。若是将队列看成线程管理工具来使用,将要用到put和take方法。当试图向满的队列中添加或从空的队列中移出元素时,add、remove和element操做抛出异常。固然,在一个多线程程序中,队列会在任什么时候刻满或空,所以,必定要使用offer、poll和peek方法做为替代。这些方法若是不能完成任务,只是给出一个错误提示而不会抛出异常。须要注意的一点是,poll和peek方法返回null来指示失败,所以,向这些队列中插入null值是非法的。

        offer和poll方法还有带有超时的方法变体。例如,下面的调用:


    boolean success = q.offer(x,100,TimeUtil.MILLISECONDS);


尝试在100毫秒的时间内在队列的尾部插入一个元素。若是成功返回true,不然,达到超时时,返回false。相似地,下面的调用:


Object head = q.poll(100,TimeUnit.MILLISECONDS);


尝试用100毫秒的时间移出队列的头元素,若是成功返回头元素,不然,达到超时时,返回null。

        java.util.concurrent包提供了阻塞队列的几个变种。默认状况下,LinkedBlockingQueue的容量是没有上边界的,可是,也能够选择指定最大容量。LinkedBlockingDeque是一个双端的版本。ArrayBlockingQueue在构造时须要指定容量,而且有一个可选的参数来指定是否须要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先获得处理。一般,公平性会下降性能,只有在确实很是须要时才使用它。

        PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限的,可是,若是队列是空的,取元素的操做会阻塞。

        最后,DelayQueue包含实现Delayed接口的对象:


interface Delayed extends Comparable<Delayed>{
    long getDelay(TimeUnit unit);
}


        getDelay方法返回对象的残留延迟。负值表示延迟已经结束。元素只有在延迟用完的状况下才能从DelayQueue移除。还必须实现compareTo方法,DelayQueue使用该方法对元素进行排序。

    下面是一个使用阻塞队列来控制线程集的例子,在一个目录及它的全部子目录下搜索全部文件,打印出包含指定关键字的行。


public class BlockingQueueTest {
    /**
     * @param args
     */
    public static void main(String[] args) {
	// TODO Auto-generated method stub
	Scanner in = new Scanner(System.in);
	System.out.print("Enter base directory(e.g. /usr/local/jdk1.6.0/src):");
	String directory = in.nextLine();
	System.out.print("Enter keyword (e.g. volatile):");
	String keyword = in.nextLine();
		
	final int FILE_QUEUE_SIZE = 10;
	final int SEARCH_THREADS = 100;
	BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
		
	FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
	new Thread(enumerator).start();
	for (int i = 1; i <= SEARCH_THREADS; i++) {
            new Thread(new SearchTask(queue, keyword)).start();
	}
    }
}


public class FileEnumerationTask implements Runnable {
    private BlockingQueue<File> queue;
    private File startingDirectory;
    public static File DUMMY = new File("");
	
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory){
	this.queue = queue;
	this.startingDirectory = startingDirectory;
    }
	
    @Override
    public void run() {
        try {
	    enumerate(startingDirectory);
	    queue.put(DUMMY);
	} catch (InterruptedException e) {
	    e.printStackTrace();
	}
    }
	
    public void enumerate(File directory) throws InterruptedException{
        File[] files = directory.listFiles();
        for(File file : files){
            if (file.isDirectory()) {
                enumerate(file);
            }else {
                queue.put(file);
            }
        }
    }
}
public class SearchTask implements Runnable {
    private BlockingQueue<File> queue;
    private String keyword;
	
    public SearchTask(BlockingQueue<File> queue,String keyword){
	this.queue = queue;
	this.keyword = keyword;
    }

    @Override
    public void run() {
        try {
	    boolean done = false;
	    while (!done) {
	        File file = queue.take();
		if (file == FileEnumerationTask.DUMMY) {
		    queue.put(file);
		    done = true;
		}else {
		    search(file);
		}
            }
	} catch (InterruptedException e) {
	    // TODO Auto-generated catch block
	    e.printStackTrace();
	} catch (FileNotFoundException e) {
	    // TODO Auto-generated catch block
	    e.printStackTrace();
	}
    }

    private void search(File file) throws FileNotFoundException {
        // TODO Auto-generated method stub
	Scanner in = new Scanner(new FileInputStream(file));
	int lineNumber = 0;
	while (in.hasNextLine()) {
	    lineNumber++;
	    String line = in.nextLine();
	    if(line.contains(keyword))
	        System.out.printf("%s:%d:%s%n",file.getPath(),lineNumber,line);
        }
	in.close();
    }
}
相关文章
相关标签/搜索