JAVA线程12 - 新特性:有返回值的线程

1、概述

在Java5以前,线程是没有返回值的,要实现子线程完成任务后返回值给主线程须要借助第三方转存。

在JAVA5开始,有返回值的任务能够利用Callable接口来实现。
 
执行Callable任务后,能够获取一个Future的对象,在该对象上调用get就能够获取到Callable任务返回的Object了。

2、示例

import java.util.concurrent.*; 

public class CallableDemo { 
    public static void main(String[] args) throws ExecutionException, InterruptedException { 
        //建立一个线程池 
        ExecutorService pool = Executors.newSingleThreadExecutor(); 
        //建立有返回值的任务 
        Callable callable = new MyCallable("zhangsan"); 
        //执行任务并获取Future对象 
        Future future = pool.submit(callable); 
        //从Future对象上获取任务的返回值,并输出到控制台 
        System.out.println("等待结果...");
        System.out.println("获得结果:"+future.get().toString()); 
        //关闭线程池 
        pool.shutdown(); 
    } 
} 

class MyCallable implements Callable{ 
    private String name; 

    MyCallable(String name) { 
        this.name = name; 
    } 

    @Override 
    public Object call() throws Exception {
        Thread.sleep(2000);
        return name + "返回的内容"; 
    } 
}

3、CompletionService

1. 简介

CompletionService用于提交一组Callable任务,其take()方法返回已完成的一个Callable任务对应的Feture对象。 java

CompletionService采起的是BlockingQueue<Future<V>>无界队列来管理Future。则有一个线程执行完毕把返回结果放到BlockingQueue<Future<V>>里面。就能够经过completionServcie.take().get()取出结果,同时还有一个poll()方法,这两个方法的区别以下:
take 获取并移除表示下一个已完成任务的 Future,若是目前不存在这样的任务,则等待。(若是须要用到返回值建议用take)
poll 获取并移除表示下一个已完成任务的 Future,若是不存在这样的任务,则返回null。

2. CompletionService解决的问题

当向Executor提交批处理任务时,而且但愿在它们完成后得到结果,若是用FutureTask,你能够循环获取task,并用future.get()去获取结果,可是若是这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在不少场合,其实你拿第一个任务结果时,此时结果并无生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种状况用future task不合适的,效率也不高。

3. 本身维护list<Future>和CompletionService的区别

从list中遍历的每一个Future对象并不必定处于完成状态,这时调用get()方法就会被阻塞住,若是系统是设计成每一个线程完成后就能根据其结果继续作后面的事,这样对于处于list后面的可是先完成的线程就会增长了额外的等待时间。
而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,若是Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

4. 示例

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionServcie = new ExecutorCompletionService<Integer>(pool);
        try {
            for (int i = 0; i < 10; i++) {
                completionServcie.submit(new MyCallable(i));
            }
            for (int i = 0; i < 10; i++) {
                // take 方法等待下一个结果并返回 Future 对象。
                // poll 不等待,有结果就返回一个 Future 对象,不然返回 null。
                System.out.println(completionServcie.take().get());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

class MyCallable implements Callable<Integer>{ 
    private int i;

    Task(int i) {
        this.i = i;
    }

    @Override 
    public Object call() throws Exception {
        Thread.sleep(new Random().nextInt(5000));
        System.out.println(Thread.currentThread().getName() + "   " + i);
        return i;
    } 
}


4、参考资料

http://my.oschina.net/jielucky/blog/158839
相关文章
相关标签/搜索