CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。从内部实现机制来看,CompletionService类使用Executor对象来执行任务。这个行为的优点是能够共享CompletionService对象,并发送任务到执行器,而后其余的对象能够处理任务的结果。第二个方法有一个不足之处,他只能为已经执行结束的任务获取future对象,所以,这些Future对象只能被用来获取任务的结果。java
Code并发
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * This class simulates the generation of a report. Is a Callable * object that will be executed by the executor inside a * CompletionService * */ public class ReportGenerator implements Callable<String> { /** * The sender of the report */ private String sender; /** * The title of the report */ private String title; /** * Constructor of the class. Initializes the two attributes * @param sender The sender of the report * @param title The title of the report */ public ReportGenerator(String sender, String title){ this.sender=sender; this.title=title; } /** * Main method of the ReportGenerator. Waits a random period of time * and then generates the report as a String. */ @Override public String call() throws Exception { try { Long duration=(long)(Math.random()*10); System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } String ret=sender+": "+title; return ret; } }
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.CompletionService; /** * This class represents every actor that can request a report. For this example, * it simply create three ReportGenerator objects and execute them through a * CompletionService * */ public class ReportRequest implements Runnable { /** * Name of this ReportRequest */ private String name; /** * CompletionService used for the execution of the ReportGenerator tasks */ private CompletionService<String> service; /** * Constructor of the class. Initializes the parameters * @param name Name of the ReportRequest * @param service Service used for the execution of tasks */ public ReportRequest(String name, CompletionService<String> service){ this.name=name; this.service=service; } /** * Main method of the class. Create three ReportGenerator tasks and executes them * through a CompletionService */ @Override public void run() { ReportGenerator reportGenerator=new ReportGenerator(name, "Report"); service.submit(reportGenerator); } }
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * This class will take the results of the ReportGenerator tasks executed through * a CompletinoService * */ public class ReportProcessor implements Runnable { /** * CompletionService that executes the ReportGenerator tasks */ private CompletionService<String> service; /** * Variable to store the status of the Object. It will executes until the variable * takes the true value */ private boolean end; /** * Constructor of the class. It initializes the attributes of the class * @param service The CompletionService used to execute the ReportGenerator tasks */ public ReportProcessor (CompletionService<String> service){ this.service=service; end=false; } /** * Main method of the class. While the variable end is false, it * calls the poll method of the CompletionService and waits 20 seconds * for the end of a ReportGenerator task */ @Override public void run() { while (!end){ try { //调用CompletionService接口的poll()方法,来获取下一个已经完成的任务Future对象。 Future<String> result=service.poll(20, TimeUnit.SECONDS); if (result!=null) { String report=result.get(); System.out.printf("ReportReceiver: Report Recived: %s\n",report); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.printf("ReportSender: End\n"); } /** * Method that establish the value of the end attribute * @param end New value of the end attribute. */ public void setEnd(boolean end) { this.end = end; } }
package com.packtpub.java7.concurrency.chapter4.recipe11.core; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportProcessor; import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportRequest; /** * Main class of the example creates all the necessary objects and throws the tasks * */ public class Main { /** * @param args */ public static void main(String[] args) { // Create the executor and thee CompletionService using that executor ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool(); CompletionService<String> service=new ExecutorCompletionService<>(executor); // Crete two ReportRequest objects and two Threads to execute them ReportRequest faceRequest=new ReportRequest("Face", service); ReportRequest onlineRequest=new ReportRequest("Online", service); Thread faceThread=new Thread(faceRequest); Thread onlineThread=new Thread(onlineRequest); // Create a ReportSender object and a Thread to execute it ReportProcessor processor=new ReportProcessor(service); Thread senderThread=new Thread(processor); // Start the Threads System.out.printf("Main: Starting the Threads\n"); faceThread.start(); onlineThread.start(); senderThread.start(); // Wait for the end of the ReportGenerator tasks try { System.out.printf("Main: Waiting for the report generators.\n"); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } // Shutdown the executor System.out.printf("Main: Shuting down the executor.\n"); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } // End the execution of the ReportSender processor.setEnd(true); System.out.printf("Main: Ends\n"); } }
工做原理dom
在范例的主类中,咱们调用了Executors工厂类的newCachedThreadPool()方法建立了ThreadPoolExecutor执行器对象。而后,使用这个对象初始化了CompletionService对象,由于完成服务(Completion Service)使用执行器来执行任务。而后,调用ReportRequest类中的submit()方法,利用“完成服务”来执行任务。ide
当“完成服务”任务结束,这些任务中的一个任务就执行结束了,“完成服务”中存储着Future对象,用来控制它在队列中执行。this
调用poll()方法访问这个队列,查看是否有任务已经完成,若是有,则返回队列中的第一个元素(即一个执行完成后的future对象)。当poll()返回Future对象后,他将从队列中删除这个Future对象。在这个示例中,咱们调用poll()方法时传递了两个参数,表示当队列里完成任务结果为空时,想要等待任务执行结束的时间。code
一旦建立了CompletionService对象,还要建立两个ReportRequest对象,用来执行在CompletionService中的lianggeReportGenerator任务。ReportProcessor任务则将处理两个被发送到执行器里的ReportRequest任务所产生的结果。
对象