Netty是一个高性能 事件驱动的异步的非堵塞的IO(NIO)框架,用于创建TCP等底层的链接,基于Netty能够创建高性能的Http服务器。支持HTTP、 WebSocket 、Protobuf、 Binary TCP |和UDP,Netty已经被不少高性能项目做为其Socket底层基础,如HornetQ Infinispan Vert.x
Play Framework Finangle和 Cassandra。其竞争对手是:Apache MINA和 Grizzly。css
传统堵塞的IO读取以下:html
InputStream is = new FileInputStream("input.bin");
int byte = is.read(); // 当前线程等待结果到达直至错误
while (true) {
selector.select(); // 从多个通道请求事件
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectorKey key = (SelectionKey) it.next();
handleKey(key);
it.remove();
}
}
传统硬件的堵塞以下,从内存中读取数据,而后写到磁盘,而CPU一直等到磁盘写完成,磁盘的写操做是慢的,这段时间CPU被堵塞不能发挥效率。安全
使用非堵塞的DMA以下图:CPU只是发出写操做这样的指令,作一些初始化工做,DMA具体执行,从内存中读取数据,而后写到磁盘,当完成写后发出一个中断事件给CPU。这段时间CPU是空闲的,能够作别的事情。这个原理称为Zero.copy零拷贝。服务器
Netty底层基于上述Java NIO的零拷贝原理实现:框架
Netty的使用代码以下:异步
Channel channel = ...
ChannelFuture cf = channel.write(data);
cf.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(!future.isSuccess() {
future.cause().printStacktrace();
...
}
...
}
});
...
cf.sync();
经过引入观察者监听,当有数据时,将自动激活监听者中的代码运行。socket
咱们使用Netty创建一个服务器代码:ide
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这段代码调用:在9999端口启动oop
new EchoServer(9999).run();
咱们须要完成的代码是EchoServerHandler:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
一个Netty服务器的原理以下:性能
图中每次请求的读取是经过UpStream来实现,而后激活咱们的服务逻辑如EchoServerHandler,而服务器向外写数据,也就是响应是经过DownStream实现的。每一个通道Channel包含一对UpStream和DownStream,以及咱们的handlers(EchoServerHandler),以下图,这些都是经过channel pipeline封装起来的,数据流在管道里流动,每一个Socket对应一个ChannelPipeline。
CHANNELPIPELINE是关键,它相似Unix的管道,有如下做用:
前面咱们演示了服务器端代码,下面是客户端代码:
public class EchoClient {
private final String host;
private final int port;
private final int firstMessageSize;
public EchoClient(String host, int port, int firstMessageSize) {
this.host = host;
this.port = port;
this.firstMessageSize = firstMessageSize;
}
public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
客户端的应用逻辑EchoClientHandler:
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
System.out.print("active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
System.out.print("read");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
System.out.print("readok");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}