在执行器中分离任务的启动与结果的处理

CompletionService接口有一个方法用来把任务发送给执行器,还有一个方法用来为下一个已经执行结束的任务获取Future对象。从内部实现机制来看,CompletionService使用Executor对象来执行任务。这种做法的优点是可以共享CompletionService对象:一些对象把任务发送到执行器,然后其他的对象能够处理任务的结果。第二个方法有一个不足之处,它只能为已经执行结束的任务获取Future对象,所以,这些Future对象只能被用来获取任务的结果。

Code

package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Simulates the generation of a report. It is a {@link Callable} meant to be
 * executed by the executor that backs a CompletionService.
 */
public class ReportGenerator implements Callable<String> {

    /**
     * The sender of the report
     */
    private final String sender;
    /**
     * The title of the report
     */
    private final String title;
    
    /**
     * Constructor of the class. Initializes the two attributes
     * @param sender The sender of the report
     * @param title The title of the report
     */
    public ReportGenerator(String sender, String title){
        this.sender=sender;
        this.title=title;
    }

    /**
     * Main method of the ReportGenerator. Sleeps a random number of whole
     * seconds in the range 0..9 and then builds the report as a String.
     * If the sleep is interrupted, the stack trace is printed, the interrupt
     * status of the thread is restored and the report is still returned.
     *
     * @return the report text, formatted as {@code sender + ": " + title}
     */
    @Override
    public String call() throws Exception {
        // Primitive long: there is no reason to box the duration.
        long duration=(long)(Math.random()*10);
        System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // sleep() cleared the interrupt status when it threw; set it again
            // so the code running this task can still see the interruption.
            Thread.currentThread().interrupt();
        }
        return sender+": "+title;
    }
    
}
package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.CompletionService;

/**
 * This class represents every actor that can request a report. For this example,
 * it simply create three ReportGenerator objects and execute them through a 
 * CompletionService
 *
 */
public class ReportRequest implements Runnable {

    /**
     * Name of this ReportRequest
     */
    private String name;
    
    /**
     * CompletionService used for the execution of the ReportGenerator tasks
     */
    private CompletionService<String> service;
    
    /**
     * Constructor of the class. Initializes the parameters
     * @param name Name of the ReportRequest
     * @param service Service used for the execution of tasks
     */
    public ReportRequest(String name, CompletionService<String> service){
        this.name=name;
        this.service=service;
    }

    /**
     * Main method of the class. Create three ReportGenerator tasks and executes them
     * through a CompletionService
     */
    @Override
    public void run() {
            ReportGenerator reportGenerator=new ReportGenerator(name, "Report");
            service.submit(reportGenerator);
    }

}
package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Takes the results of the ReportGenerator tasks executed through a
 * CompletionService and prints them.
 */
public class ReportProcessor implements Runnable {

    /**
     * CompletionService that executes the ReportGenerator tasks
     */
    private final CompletionService<String> service;
    /**
     * Stop flag. The processing loop runs until it becomes true. It is
     * written by the thread that calls {@link #setEnd(boolean)} and read by
     * the thread running {@link #run()}, so it is volatile to guarantee the
     * update becomes visible to the loop.
     */
    private volatile boolean end;
    
    /**
     * Constructor of the class. It initializes the attributes of the class
     * @param service The CompletionService used to execute the ReportGenerator tasks
     */
    public ReportProcessor (CompletionService<String> service){
        this.service=service;
        end=false;
    }

    /**
     * Main method of the class. While the end flag is false, it polls the
     * CompletionService, waiting up to 20 seconds for a finished task, and
     * prints the result of every task it obtains. A task that failed has its
     * stack trace printed and processing continues. If the thread is
     * interrupted, the interrupt status is restored and the loop stops.
     */
    @Override
    public void run() {
        while (!end){
            try {
                // poll() returns the Future of the next completed task, or
                // null if none completes within the timeout.
                Future<String> result=service.poll(20, TimeUnit.SECONDS);
                if (result!=null) {
                    String report=result.get();
                    System.out.printf("ReportReceiver: Report Recived: %s\n",report);
                }            
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the status visible
                // to the caller and leave the loop instead of polling again.
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        System.out.printf("ReportSender: End\n");
    }

    /**
     * Method that establish the value of the end attribute
     * @param end New value of the end attribute.
     */
    public void setEnd(boolean end) {
        this.end = end;
    }
    
    
}
package com.packtpub.java7.concurrency.chapter4.recipe11.core;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportProcessor;
import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportRequest;

/**
 * Main class of the example creates all the necessary objects and throws the tasks
 *
 */
public class Main {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // Create the executor and thee CompletionService using that executor
        ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
        CompletionService<String> service=new ExecutorCompletionService<>(executor);

        // Crete two ReportRequest objects and two Threads to execute them
        ReportRequest faceRequest=new ReportRequest("Face", service);
        ReportRequest onlineRequest=new ReportRequest("Online", service);
        Thread faceThread=new Thread(faceRequest);
        Thread onlineThread=new Thread(onlineRequest);
        
        // Create a ReportSender object and a Thread to execute  it
        ReportProcessor processor=new ReportProcessor(service);
        Thread senderThread=new Thread(processor);
        
        // Start the Threads
        System.out.printf("Main: Starting the Threads\n");
        faceThread.start();
        onlineThread.start();
        senderThread.start();
        
        // Wait for the end of the ReportGenerator tasks
        try {
            System.out.printf("Main: Waiting for the report generators.\n");
            faceThread.join();
            onlineThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // Shutdown the executor
        System.out.printf("Main: Shuting down the executor.\n");
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // End the execution of the ReportSender
        processor.setEnd(true);
        System.out.printf("Main: Ends\n");

    }

}

工作原理

在范例的主类中,我们调用了Executors工厂类的newCachedThreadPool()方法创建了ThreadPoolExecutor执行器对象。然后,使用这个对象初始化了CompletionService对象,因为完成服务(Completion Service)使用执行器来执行任务。然后,在ReportRequest类中调用CompletionService的submit()方法,利用“完成服务”来执行任务。

  当其中一个任务执行结束时,“完成服务”会把用来控制该任务执行的Future对象存入一个队列中。

  调用poll()方法访问这个队列,查看是否有任务已经完成,如果有,则返回队列中的第一个元素(即一个执行完成后的Future对象)。当poll()返回Future对象后,它将从队列中删除这个Future对象。在这个示例中,我们调用poll()方法时传递了两个参数,表示当队列里没有已完成任务的结果时,想要等待任务执行结束的时间。

  一旦创建了CompletionService对象,还要创建两个ReportRequest对象,用来执行在CompletionService中的两个ReportGenerator任务。ReportProcessor任务则将处理两个被发送到执行器里的ReportRequest任务所产生的结果。
对象

相关文章
相关标签/搜索