最近从头了解了一下java的异步编程,经过一些例子去展示不一样java版本下怎么去实现任务的并行。 java
(一)Java 1.0的经过Thread 类 和 Runnable接口算法
Thread类实现了Runnable接口。编程
通常用法:框架
1)建立一个类SendToCUPD,实现Runnable接口,重写run()方法去实现任务异步
2)建立一个类SendToCUPD的实例, 做为参数去新建一个Thread并调用Thread.start 方法。 而后系统去新建一个线程去执行这个任务ide
public class JThread { public static void main(String[] args) { // TODO Auto-generated method stub System.out.println("get request from TC thread id="+Thread.currentThread().getId()); Runnable createRM = new CreateRichMedia("Thread RM"); new Thread(createRM).start(); Runnable tCUPD=new SendToCUPD("Thread CUPD"); new Thread(tCUPD).start(); System.out.println("End main method"); } } class SendToCUPD implements Runnable{ public String TName; public SendToCUPD(String TName) { // TODO Auto-generated constructor stub this.TName=TName; } @Override public void run() { System.out.println("b4 send to cupd"); try { System.out.println(" sending to cupd, thread id="+Thread.currentThread().getId()); Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end send to cupd"); } } class CreateRichMedia implements Runnable{ public String TName; public CreateRichMedia(String TName) { // TODO Auto-generated constructor stub this.TName=TName; } @Override public void run() { System.out.println("start response Rich media"); try { System.out.println("creating response Rich media, thread id="+Thread.currentThread().getId()); Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("End response Rich media"); } }
(二)Java5, Callable, Future, FutureTask异步编程
经过Runable和Thread, 没法获取子线程的运行结果。Future接口能够理解成一个任务, Future.get()方法能够获取任务的运行结果函数
例子步骤:this
1)建立一个RMTask和CUPDTask, 实现Callable接口。 重写call()方法去实现任务
spa
2)经过Executors类去建立一个线程池
3)经过Executors.submit()去建立子线程去执行任务。
public class J5Thread { public static void main(String[] args) { // ES System.out.println("Get request from TC, thread id="+Thread.currentThread().getId()); ExecutorService ES=Executors.newCachedThreadPool(); //task Callable<Integer> rmTask= new RMTask(); Callable<Integer> cupdTask = new CUPDTask(); //ES submit task, //param is callable //return result Future<Integer> rmResult=ES.submit(rmTask); Future<Integer> cupdResult= ES.submit(cupdTask); ES.shutdown(); System.out.println("ES shutdown, thread id="+Thread.currentThread().getId()); //ES shutdown try { Integer result = rmResult.get(); System.out.println("process rmResult, thread id="+Thread.currentThread().getId()); System.out.println("process rmResult="+result); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //define task class RMTask implements Callable<Integer>{ @Override public Integer call() throws Exception { // TODO Auto-generated method stub System.out.println("creating response Rich media, thread id="+Thread.currentThread().getId()); Thread.sleep(3000); System.out.println("end response Rich media, thread id="+Thread.currentThread().getId()); return new Integer(3); } } class CUPDTask implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("creating CUPD msg, thread id="+Thread.currentThread().getId()); Thread.sleep(8000); System.out.println("end response Rich media, thread id="+Thread.currentThread().getId()); return new Integer(10); }
(三)Java7, fork/join
Java 7在java.util.concurrent包下加入了支持fork/join框架的RecursiveTask,forkjoinpool
例子,去计算从start到end的连续数字的和:
1)执行任务的类要继承RecursiveTask类
2)定义一个compute方法,将要算的数分红两组,再分别调用compute方法去算。 运用了递归算法
3)定义一个ForkJoinPool去执行任务, submit方法返还一个Future类, 经过future.get()去获取任务结果
public class J7ForkJoin extends RecursiveTask<Integer>{ private static final int THRESHOLD=5; private int beginning; private int ending; public J7ForkJoin(int beginning, int ending) { super(); this.beginning = beginning; this.ending = ending; } //override compute @Override protected Integer compute() { // TODO Auto-generated method stub //check if still need fork int sum=0; boolean canCompute = (ending-beginning)<=THRESHOLD; if(canCompute){ //no need System.out.println("no need fork, ThreadID="+Thread.currentThread().getId()); System.out.println("no need fork, beginning="+this.beginning); System.out.println("no need fork, ending="+this.ending); for(int i=beginning; i<=ending;++i){ sum+=i; } return sum; }else{ //need int interim=(this.ending+this.beginning)/2; //5+1 /2 =3 1-3 //6+1 /2 =3 1-3 System.out.println("need fork, ThreadID="+Thread.currentThread().getId()); System.out.println("need fork, interim="+interim); J7ForkJoin leftTask=new J7ForkJoin(this.beginning , interim); J7ForkJoin rightTask=new J7ForkJoin(interim+1, this.ending); //.fork leftTask.fork(); rightTask.fork(); //get reuslt int leftResult=leftTask.join(); int rightResult=rightTask.join(); sum =leftResult+rightResult; return sum; } } public static void main(String[] args) { // TODO Auto-generated method stub ForkJoinPool FJPool=new ForkJoinPool(); J7ForkJoin task=new J7ForkJoin(1, 20); Future<Integer> result=FJPool.submit(task); try { System.out.println("main result="+result.get()); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
能够定义回调函数, 子线程执行完后,会触发回调函数
步骤:
1)CompletableFuture.supplyAsync(),定义要执行的异步任务
2)cupdResult.thenAccept(new Consumer<String>() , 重写accept()方法去定义回调函数
public class J8ComFuture3 { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); // get msg from tc System.out.println("Got reqeust from TC"); // prepare RM System.out.println("prepare RM msg"); // trigger cupd CompletableFuture<String> cupdResult = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { // TODO Auto-generated method stub try { System.out.println("sleep b4"); TimeUnit.SECONDS.sleep(1); System.out.println("sleep after"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "msg from CUPD"; } }, executor); // return cupd cupdResult.thenAccept(new Consumer<String>() { // callback method public void accept(String arg0) { System.out.println("return msg to TC=" + arg0); } }); // return RM System.out.println("return RM msg to customer"); } }