编者注:Netty是Java领域有名的开源网络库,特色是高性能和高扩展性,所以不少流行的框架都是基于它来构建的,好比咱们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,通常都是基于Netty来构建,好比soft-bolt。总之一句话,Java小伙伴们须要且有必要学会使用Netty并理解其实现原理。
关于Netty的入门讲解可参考:Netty 入门,这一篇文章就够了java
Netty的链接处理就是IO事件的处理,IO事件包括读事件、ACCEPT事件、写事件和OP_CONNECT事件。程序员
IO事件的处理是结合ChanelPipeline来作的,一个IO事件到来,首先进行数据的读写操做,而后交给ChannelPipeline进行后续处理,ChannelPipeline中包含了channelHandler链(head + 自定义channelHandler + tail)。
使用channelPipeline和channelHandler机制,起到了解耦和可扩展的做用。一个IO事件的处理,包含了多个处理流程,这些处理流程正好对应channelPipeline中的channelHandler。若是对数据处理有新的需求,那么就新增channelHandler添加到channelPipeline中,这样实现很6,之后本身写代码能够参考。面试
说到这里,通常为了知足扩展性要求,经常使用2种模式:网络
netty的channelHandler
的channelPipeline
能够理解成就是责任链模式,经过动态增长channelHandler可达到复用和高扩展性目的。架构
了解netty链接处理机制以前须要了解下NioEventLoop模型,其中处理链接事件的架构图以下:框架
对应的处理逻辑源码为:socket
// 处理各类IO事件 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // OP_CONNECT事件,client链接上客户端时触发的事件 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 注意,这里读事件和ACCEPT事件对应的unsafe实例是不同的 // 读事件 -> NioByteUnsafe, ACCEPT事件 -> NioMessageUnsafe unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
从上面代码来看,事件主要分为3种,分别是OP_CONNECT事件、写事件和读事件(也包括ACCEPT事件)。下面分为3部分展开:ide
// NioMessageUnsafe public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { do { // 调用java socket的accept方法,接收请求 int localRead = doReadMessages(readBuf); // 增长统计计数 allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } // readBuf中存的是NioChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 触发fireChannelRead pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); }
链接创建好以后就该链接的channel注册到workGroup中某个NIOEventLoop的selector中,注册操做是在fireChannelRead中完成的,这一块逻辑就在ServerBootstrapAcceptor.channelRead中。oop
// ServerBootstrapAcceptor public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // 设置channel的pipeline handler,及channel属性 child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // 将channel注册到childGroup中的Selector上 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
// NioByteUnsafe public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); // 从channel中读取数据,存放到byteBuf中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.incMessagesRead(1); readPending = false; // 触发fireChannelRead pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 触发fireChannelReadComplete,若是在fireChannelReadComplete中执行了ChannelHandlerContext.flush,则响应结果返回给客户端 allocHandle.readComplete(); // 触发fireChannelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
正常状况下通常是不会注册写事件的,若是Socket发送缓冲区中没有空闲内存时,再写入会致使阻塞,此时能够注册写事件,当有空闲内存(或者可用字节数大于等于其低水位标记)时,再响应写事件,并触发对应回调。性能
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 写事件,从flush操做来看,虽然以前没有向socket缓冲区写数据,可是已经写入到 // 了chnanel的outboundBuffer中,flush操做是将数据从outboundBuffer写入到 // socket缓冲区 ch.unsafe().forceFlush(); }
该事件是client触发的,由主动创建链接这一侧触发的。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // OP_CONNECT事件,client链接上客户端时触发的事件 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 触发finishConnect事件,其中就包括fireChannelActive事件,若是有自定义的handler有channelActive方法,则会触发 unsafe.finishConnect(); }
推荐阅读
欢迎小伙伴关注【TopCoder】阅读更多精彩好文。