netty提供了统一的API进行传输数据,这个相比于JDK的方式方便不少。好比下面是一个不用netty而使用原生的阻塞IO进行传输的例子。数据库
public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for(;;) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!rn".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch (IOException e) { e.printStackTrace(); } } }
代码很好理解,为每个新来的链接建立一个线程处理。这种方式有个比较大的问题是,客户端链接数受限于服务器所能承受的线程数。为了改进这个问题咱们可使用异步模式来重写这段代码,可是你会发现,几乎全部的代码都要重写。原生的OIO和NIO的API几乎彻底不能复用。不信你看看下面这段NIO的代码,安全
public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!rn".getBytes()); for (;;){ try { selector.select(); } catch (IOException ex) { ex.printStackTrace(); //handle exception break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println( "Accepted connection from " + client); } if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }
这个代码不作过多解释了,毕竟咱们的重点是netty不是JDK NIO。服务器
用netty实现一个OIO的程序是下面这样的姿式:多线程
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!rn", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ChannelInboundHandlerAdapter() { @Override public void channelActive( ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()) .addListener( ChannelFutureListener.CLOSE); } }); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
而后若是要改为异步非阻塞的模式,只须要把OioEventLoopGroup
改为NioEventLoopGroup
,把OioServerSocketChannel
改为NioServerSocketChannel
,简单到使人发指。异步
下面是Channel
的类关系图,socket
从这幅图看出ChannelConfig
和ChannelPipeline
都属于Channel
,在代码中体现为类的成员。ChannelPipeline
其实前面咱们也讲过了,它实现了责任链模式,把ChannelHandler
一个个串起来。经过后者咱们能够拥有包括但不限于以下的功能:ide
下面列举了一些Channle
自己提供的重要方法。工具
方法名oop
解释测试
eventLoop()
返回分配到channel上的eventloop
pipeline()
返回分配到channel上的channelpipeline
isActive()
返回到channel是否链接到一个远程服务
localAddress()
返回本地绑定的socketAddress
remoteAddress()
返回远程绑定的socketAddress
write()
写入数据到远程(客户端或者服务端),数据会通过channelpipeline
有些方法咱们已经在前面的示例中见过了。来看下write()
方法的使用示例:
Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); ChannelFuture cf = channel.writeAndFlush(buf); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { System.out.println("Write successful"); } else { System.err.println("Write error"); future.cause().printStackTrace(); } } });
简单解释下,
buf里是要写的数据,而后调用write方法写入数据,返回一个写入的future结果。前面已经说过这个future了,咱们给future添加一个监听器,以便写入成功后能够经过回调获得通知。
另外write这个方法也是线程安全的,下面是一个用多线程操做write方法的示例,
final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); Runnable writer = new Runnable() { @Override public void run() { channel.write(buf.duplicate()); } }; Executor executor = Executors.newCachedThreadPool(); // write in one thread executor.execute(writer); // write in another thread executor.execute(writer); //... }
netty保证write方法线程安全的原理,是将用户线程的操做封装成Task放入消息队列中,底层由同一个I/O线程负责执行,这样就实现了局部无锁化。
这部分要解释清楚须要深刻到源码底层,由于本篇系列是netty in action的笔记系列就很少说了。后面可能考虑写一个源码解析系列在深刻这一块。
这是netty最多见的使用场景。当channel状态变动时用户能够收到通知,有如下几个状态:
如上图所示,netty内部其实也是封装了JDK的NIO,使用selector来管理IO状态的变动。在前面的章节里咱们其实给过JDK NIO的代码示例,这里就不贴出来了。
netty NIO模型里有一个不得不说的特性叫zero-file-copy
,不少地方翻译成零拷贝。这种特性可让咱们直接在文件系统和网卡传输数据,避免了数据从内核空间到用户空间的拷贝。
OIO是在netty里是一种折中的存在,阻塞的方式尽管应用场景不多,可是不表明不存在。好比经过jdbc调用数据库,若是是异步的方案是不太合适的。
netty的OIO模型底层也是调用JDK,前面的笔记咱们也给过示例。这种模型就是用一个线程处理监听(accetp),而后为每一个成功的链接建立一个处理线程。这样作的目的是防止对于某个链接的处理阻塞影响其它链接,毕竟I/O操做是很容易引发阻塞的。
既然是阻塞的模型,netty的封装能作的工做也有限。netty只是给socket上加了SO_TIMEOUT
,这样若是一个操做在超时时间内没有完成,就会抛出SocketTimeoutException
,netty会捕获这个异常,而后继续后面的流程。而后就是下一个EventLoop执行,循环往复。这种处理方案弊端在于抛出异常的开销,由于异常会占用堆栈。
这个图就是对上面的归纳,分配一个线程给socket,socket链接服务器而后读数据,读数据可能阻塞也可能成功。若是是前者捕获异常后再次重试。
netty包含对本地传输的支持,这个传输实现使用相同的API用于虚拟机之间的通讯,传输是彻底异步的。
每一个Channel使用惟一的SocketAddress,客户端经过使用SocketAddress进行链接,在服务器会被注册为长期运行,一旦通道关闭,它会自动注销,客户端没法再使用它。
使用本地传输服务器的行为与其余的传输实现几乎是相同的,须要注意的一个重点是只能在本地的服务器和客户端上使用它们。
Embedded transport可让你更容易的在不一样的ChannelHandler之间的交互,更多的时候它像是一个工具类。通常用于测试的场景。它自带了一个具体的Channel实现,EmbeddedChannel
。好比下面是一个使用示例:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= frameLength) { ByteBuf buf = in.readBytes(frameLength); out.add(buf); } } }
@Test public void testFramesDecoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3)); Assert.assertTrue(channel.writeInbound(input.retain())); Assert.assertTrue(channel.finish()); ByteBuf read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); Assert.assertNull(channel.readInbound()); buf.release(); }
用到的几个方法解释下,
更多的使用细节能够去网上了解下。