java异步编程

        不少时候咱们都但愿可以最大的利用资源,好比在进行IO操做的时候尽量的避免同步阻塞的等待,由于这会浪费CPU的资源。若是在有可读的数据的时候可以通知程序执行读操做甚至由操做系统内核帮助咱们完成数据的拷贝,这再好不过了。从NIO到CompletableFuture、LambdaFork/Join,java一直在努力让程序尽量变的异步甚至拥有更高的并行度,这一点一些函数式语言作的比较好,所以java也或多或少的借鉴了某些特性。下面介绍一种很是经常使用的实现异步操做的方式。java

考虑有一个耗时的操做,操做完后会返回一个结果(不论是正常结果仍是异常),程序若是想拥有比较好的性能不可能由线程去等待操做的完成,而是应该采用listener模式。jdk并发包里的Future表明了将来的某个结果,当咱们向线程池中提交任务的时候会返回该对象。代码例子:编程

/**
 * jdk1.8以前的Future
 * 
 * @author Administrator
 *
 */
public class JavaFuture {
	public static void main(String[] args) throws Throwable, ExecutionException {
		ExecutorService executor = Executors.newFixedThreadPool(1);
		// Future表明了线程执行完之后的结果,能够经过future得到执行的结果
		// 可是jdk1.8以前的Future有点鸡肋,并不能实现真正的异步,须要阻塞的获取结果,或者不断的轮询
		// 一般咱们但愿当线程执行完一些耗时的任务后,可以自动的通知咱们结果,很遗憾这在原生jdk1.8以前
		// 是不支持的,可是咱们能够经过第三方的库实现真正的异步回调
		Future<String> f = executor.submit(new Callable<String>() {

			@Override
			public String call() throws Exception {
				System.out.println("task started!");
				Thread.sleep(3000);
				System.out.println("task finished!");
				return "hello";
			}
		});

		//此处阻塞main线程
		System.out.println(f.get());
		System.out.println("main thread is blocked");
	}
}
若是想得到耗时操做的结果,能够经过get方法获取,可是该方法会阻塞当前线程,咱们能够在作完剩下的某些工做的时候调用get方法试图去获取结果,也能够调用非阻塞的方法isDone来肯定操做是否完成,这种方式有点儿相似下面的过程:


这种方式对流程的控制很混乱,可是在jdk1.8以前只提供了这种笨拙的实现方式,以致于不少高性能的框架都实现了本身的一套异步框架,好比Netty和Guava,下面分别介绍下这三种异步的实现方式(包括jdk1.8)。首先是Guava中的实现方式:promise

package guava;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * Guava中的Future
 * 
 * @author Administrator
 *
 */
public class GuavaFuture {
	public static void main(String[] args) {
		ExecutorService executor = Executors.newFixedThreadPool(1);

		// 使用guava提供的MoreExecutors工具类包装原始的线程池
		ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
		//向线程池中提交一个任务后,将会返回一个可监听的Future,该Future由Guava框架提供
		ListenableFuture<String> lf = listeningExecutor.submit(new Callable<String>() {

			@Override
			public String call() throws Exception {
				System.out.println("task started!");
				//模拟耗时操做
				Thread.sleep(3000);
				System.out.println("task finished!");
				return "hello";
			}
		});
		//添加回调,回调由executor中的线程触发,但也能够指定一个新的线程
		Futures.addCallback(lf, new FutureCallback<String>() {

			//耗时任务执行失败后回调该方法
			@Override
			public void onFailure(Throwable t) {
				System.out.println("failure");
			}
			
			//耗时任务执行成功后回调该方法
			@Override
			public void onSuccess(String s) {
				System.out.println("success " + s);
			}
		});
		
		//主线程能够继续作其余的工做
		System.out.println("main thread is running");
	}
}
Guava提供了一套完整的异步框架,核心是可监听的Future,经过注册监听器或者回调方法实现及时获取操做结果的能力。须要提一点的是,假设添加监听的时候耗时操做已经执行完了,此时回调方法会被当即执行并不会丢失。想探究其实现方式的话能够跟一下源码,底层的原理并不难。

谈到异步编程就不得不提一下Promise,不少函数式语言好比js原生支持Promise,可是在java界也有一些promise框架,其中就有大名鼎鼎的Netty。从Future、Callback到Promise甚至线程池,Netty实现了一套完整的异步框架,而且netty代码中也大量使用了Promise,下面是Netty中的例子:并发

package netty_promise;

import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
 * netty中的promise
 * 
 * @author Administrator
 *
 */
public class PromiseTest {
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public static void main(String[] args) throws Throwable {
		//线程池
		EventExecutorGroup group = new DefaultEventExecutorGroup(1);
		//向线程池中提交任务,并返回Future,该Future是netty本身实现的future
		//位于io.netty.util.concurrent包下,此处运行时的类型为PromiseTask
		Future<?> f = group.submit(new Runnable() {
			
			@Override
			public void run() {
				System.out.println("任务正在执行");
				//模拟耗时操做,好比IO操做
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("任务执行完毕");
			}
		});
		//增长监听
		f.addListener( new FutureListener() {
			@Override
			public void operationComplete(Future arg0) throws Exception {
				System.out.println("ok!!!");
			}
		});
		System.out.println("main thread is running.");
	}
}
直到jdk1.8才算真正支持了异步操做,其中借鉴了某些框架的实现思想,但又有新的功能,同时在jdk1.8中提供了lambda表达式,使得java向函数式语言又靠近了一步。借助jdk原生的CompletableFuture能够实现异步的操做,同时结合lambada表达式大大简化了代码量。代码例子以下:

package netty_promise;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/**
 * 基于jdk1.8实现任务异步处理
 * 
 * @author Administrator
 *
 */
public class JavaPromise {
	public static void main(String[] args) throws Throwable, ExecutionException {
		// 两个线程的线程池
		ExecutorService executor = Executors.newFixedThreadPool(2);
		//jdk1.8以前的实现方式
		CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
			@Override
			public String get() {
				System.out.println("task started!");
				try {
					//模拟耗时操做
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return "task finished!";
			}
		}, executor);

		//采用lambada的实现方式
		future.thenAccept(e -> System.out.println(e + " ok"));
		
		System.out.println("main thread is running");
	}
}
以上的三种实现方式相似下面的过程:


上面的图只是简单的表示了一下异步的实现流程,实际的调用中看似顺序的步骤会发生线程的切换。框架