整个Netty的API都是异步的。java
下面代码是一个简单的回调:异步
package netty.in.action; public class Worker { public void doWork() { Fetcher fetcher = new MyFetcher(new Data(1, 0)); fetcher.fetchData(new FetcherCallback() { @Override public void onError(Throwable cause) { System.out.println("An error accour: " + cause.getMessage()); } @Override public void onData(Data data) { System.out.println("Data received: " + data); } }); } public static void main(String[] args) { Worker w = new Worker(); w.doWork(); } }
package netty.in.action; public interface Fetcher { void fetchData(FetcherCallback callback); }
package netty.in.action; public class MyFetcher implements Fetcher { final Data data; public MyFetcher(Data data){ this.data = data; } @Override public void fetchData(FetcherCallback callback) { try { callback.onData(data); } catch (Exception e) { callback.onError(e); } } }
package netty.in.action; public interface FetcherCallback { void onData(Data data) throws Exception; void onError(Throwable cause); }
package netty.in.action; public class Data { private int n; private int m; public Data(int n,int m){ this.n = n; this.m = m; } @Override public String toString() { int r = n/m; return n + "/" + m +" = " + r; } }
Fetcher.fetchData()方法需传递一个FetcherCallback类型的参数,当得到数据或发生错误时被回调。ide
FetcherCallback.onData(),将接收数据时被调用函数
FetcherCallback.onError(),发生错误时被调用性能
回调过程有个问题就是当你使用链式调用不少不一样的方法会致使线性代码;fetch
Futuresthis
Futures是一个抽象的概念,它表示一个值,该值可能在某一点变得可用。一个Future要么得到计算完的结果,要么得到计算失败后的异常。编码
Java在java.util.concurrent包中附带了Future接口,它使用Executor异步执行。spa
例以下面的代码,每传递一个Runnable对象到ExecutorService.submit()方法就会获得一个回调的Future,你能使用它检测是否执行完成。netty
package netty.in.action; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureExample { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Runnable task1 = new Runnable(){ @Override public void run() { //do something System.out.println("i am task1....."); } }; Callable<Integer> task2 = new Callable<Integer>() { @Override public Integer call() throws Exception { //do something return new Integer(100); } }; Future<?> f1 = executor.submit(task1); Future<Integer> f2 = executor.submit(task2); System.out.println("task1 is completed? " + f1.isDone()); System.out.println("task2 is completed? " + f2.isDone()); //waiting task1 completed while(f1.isDone()){ System.out.println("task1 completed."); break; } //waiting task2 completed while(f2.isDone()){ System.out.println("return value by task2: " + f2.get()); break; } } }
有时候使用Future感受很丑陋,由于你须要间隔检查Future是否已完成,而使用回调会直接收到返回通知。
Netty使用以上两种异步处理方式,提供一箭双鵰的方案。
JAVA中:NIO2看起来很理想,可是NIO2只支持Jdk1.7+,若你的程序在Java1.6上运行,则没法使用NIO2。
另外,Java7的NIO2中没有提供DatagramSocket的支持,因此NIO2只支持TCP程序,不支持UDP程序。
JAVA中:ByteBuffer是一个数据容器,可是惋惜的是JDK没有开发ByteBuffer实现的源码;ByteBuffer容许包装一个byte[]来得到一个实例,若是你但愿尽可能减小内存拷贝,那么这种方式是很是有用的。若果你想将ByteBuffer从新实现,那么不要浪费你的时间了,ByteBuffer的构造函数是私有的,因此它不能被扩展。
Netty提供了本身的ByteBuffer实现,Netty经过一些简单的APIs对ByteBuffer进行构造、使用和操做,以此来解决NIO中的一些限制。
JAVA中: NIO对缓冲区的聚合和分散操做可能会操做内存泄露。不少Channel的实现支持Gather和Scatter。这个功能容许从多个ByteBuffer中读入或写入到多个ByteBuffer,这样作能够提升性能。若是要分割的数据在多个不一样的ByteBuffer中,使用Gather/Scatter是比较好的方式。
例如,你可能但愿header在一个ByteBuffer中,而body在另外的ByteBuffer中:
下图显示的是Scatter(分散),将ScatteringByteBuffer中的数据分散读取到多个ByteBuffer中:
下图显示的是Gather(聚合),将多个ByteBuffer的数据写入到GatheringByteChannel:
惋惜Gather/Scatter功能会致使内存泄露,直到Java7才解决内存泄露问题。使用这个功能必须当心编码和Java版本。