分析netty从源码开始java
准备工做:git
1.下载源代码:https://github.com/netty/netty.gitgithub
我下载的版本为4.1
netty提供了一个netty-example工程,数据库
分类以下:bootstrap
Fundamentalapi
Echo ‐ the very basic client and server
Discard ‐ see how to send an infinite data stream asynchronously without flooding the write buffer
Uptime ‐ implement automatic reconnection mechanism
Text protocolspromise
Telnet ‐ a classic line-based network application
Quote of the Moment ‐ broadcast a UDP/IP packet
SecureChat ‐ an TLS-based chat server, derived from the Telnet example
Binary protocols缓存
ObjectEcho ‐ exchange serializable Java objects
Factorial ‐ write a stateful client and server with a custom binary protocol
WorldClock ‐ rapid protocol protyping with Google Protocol Buffers integration
HTTP安全
Snoop ‐ build your own extremely light-weight HTTP client and server
File server ‐ asynchronous large file streaming in HTTP
Web Sockets (Client & Server) ‐ add a two-way full-duplex communication channel to HTTP using Web Sockets
SPDY (Client & Server) ‐ implement SPDY protocol
CORS demo ‐ implement cross-origin resource sharing
Advanced服务器
Proxy server ‐ write a highly efficient tunneling proxy server
Port unification ‐ run services with different protocols on a single TCP/IP port
UDT
Byte streams ‐ use UDT in TCP-like byte streaming mode
Message flow ‐ use UDT in UDP-like message delivery mode
Byte streams in symmetric peer-to-peer rendezvous connect mode
Message flow in symmetric peer-to-peer rendezvous connect mode
咱们的分析从这里开始,netty是client-server形式的,咱们以最简单的discard示例开始,其服务器端代码以下:
/** * Discards any incoming data. */ public final class DiscardServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8009")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new DiscardServerHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
上面的代码中使用了下面几个类:
实现类为NioEventLoopGroup,其层次结构为:
EventExecutorGroup为全部类的父接口,它经过next()方法来提供EventExecutor供使用。除此之外,它还负责处理它们的生命周期,容许以优雅的方式关闭。
EventExecutor是一种特殊的EventExcutorGroup,它提供了一些便利的方法来查看一个线程是否在一个事件循环中执行过,除此之外,它也扩展了EventExecutorGroup,从而提供了一个通用的获取方法的方式。
EventLoopGroup是一种特殊的EventExcutorGroup,它提供了channel的注册功能,channel在事件循环中将被后面的selection来获取到。
NioEventLoopGroup继承自MultithreadEventLoopGroup,基于channel的NIO selector会使用该类。
group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法设置父EventLoopGroup和子EventLoopGroup。这些EventLoopGroup用来处理全部的事件和ServerChannel和Channel的IO。
channel(Class<? extends C> channelClass)方法用来建立一个Channel实例。建立Channel实例要不使用此方法,若是channel实现是无参构造要么可使用channelFactory来建立。
handler(ChannelHandler handler)方法,channelHandler用来处理请求的。
childHandler(ChannelHandler childHandler)方法,设置用来处理请求的channelHandler。
当Channel注册到它的eventLoop中时,ChannelInitializer提供了一个方便初始化channel的方法。该类的实现一般用来设置ChannelPipeline的channel,一般使用在Bootstrap#handler(ChannelHandler),ServerBootstrap#handler(ChannelHandler)和ServerBootstrap#childHandler(ChannelHandler)三个场景中。示例:
public class MyChannelInitializer extends ChannelInitializer{ public void initChannel({@link Channel} channel) { channel.pipeline().addLast("myHandler", new MyHandler()); } }
而后:
ServerBootstrap bootstrap = ...; ... bootstrap.childHandler(new MyChannelInitializer());
注意:这个类标示为可共享的,所以实现类重用时须要时安全的。
理解ChannelPipeline须要先理解ChannelHandler,
4.1 ChannelHandler
处理一个IO事件或者翻译一个IO操做,而且传递给ChannelPineline的下一个handler。
由于这个接口有太多接口须要实现,所以你只有实现ChannelHandlerAdapter就能够代替实现这个接口。
ChannelHandlerContext封装了ChannelHandler。ChannelHandler应该经过context对象与它所属的ChannelPipeLine进行交互。经过使用context对象,ChannelHandler能够传递上行或者下行事件,或者动态修改pipeline,或者存储特定handler的信息(使用AttributeKey)。
一个channelHandler一般须要存储一些状态信息。最简单最值得推荐的方法是使用member变量:
public interface Message { // your methods here } public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private boolean loggedIn; {@code @Override} protected void messageReceived( ChannelHandlerContext ctx, Message message) { Channel ch = e.getChannel(); if (message instanceof LoginMessage) { authenticate((LoginMessage) message); loggedIn = true; } else (message instanceof GetDataMessage) { if (loggedIn) { ch.write(fetchSecret((GetDataMessage) message)); } else { fail(); } } } ... }
注意:handler的状态附在ChannePipelineContext上,所以能够增长相同的handler实例到不一样的pipeline上:
public class DataServerInitializer extends ChannelInitializer<Channel> { private static final DataServerHandler SHARED = new DataServerHandler(); @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", SHARED); } }
在上面的示例中,使用了一个AttributeKey,你可能注意到了@Sharable注解。
若是一个ChannelHandler使用@sharable进行注解,那就意味着你仅仅建立了一个handler一次,能够添加到一个或者多个ChannelPipeline屡次而不会产生竞争。
若是没有指定该注解,你必须每次都建立一个新的handler实例,而且增长到一个ChannelPipeline,由于它没有像member变量同样,它有一个非共享的状态。
4.2 ChannelPipeline
ChanelPipeline是一组ChanelHandler的集合,它处理或者解析Channel的Inbound事件和OutBound操做。ChannelPipeline的实现是Intercepting Filter的一种高级形式,这样用户能够控制事件如何处理,一个pipeline内部ChannelHandler如何交互。
pipeline事件流程
上图描述了IO事件如何被一个ChannelPipeline的ChannelHandler处理的。一个IO事件被一个ChannelInBoundHandler处理或者ChannelOutboundHandler,而后经过调用在ChannelHandlerContext中定义的事件传播方法传递给最近的handler,传播方法有ChannelHandlerContext#filreChannelRead(Object)和ChannelHandlerContext#write(Object)。
一个Inbound事件一般由Inbound handler来处理,如上如左上。一个Inbound handler一般处理在上图底部IO线程产生的Inbound数据。Inbound数据经过真实的输入操做如SocketChannel#read(ByteBuffer)来获取。若是一个inbound事件越过了最上面的inbound handler,该事件将会被抛弃到而不会通知你或者若是你须要关注,打印出日志。
一个outbound事件由上图的右下的outbound handler来处理。一个outbound handler一般由outbound流量如写请求产生或者转变的。若是一个outbound事件越过了底部的outbound handler,它将由channel关联的IO线程处理。IO线程一般运行的是真实的输出操做如SocketChannel#write(byteBuffer).
示例,假设咱们建立下面这样一个pipeline:
ChannelPipeline} p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
在上例中,inbound开头的handler意味着它是一个inbound handler。outbound开头的handler意味着它是一个outbound handler。上例的配置中当一个事件进入inbound时handler的顺序是1,2,3,4,5;当一个事件进入outbound时,handler的顺序是5,4,3,2,1.在这个最高准则下,ChannelPipeline跳过特定handler的处理来缩短stack的深度:
3,4没有实现ChannelInboundHandler,于是一个inbound事件的处理顺序是1,2,5.
1,2没有实现ChannelOutBoundhandler,于是一个outbound事件的处理顺序是5,4,3
若5同时实现了ChannelInboundHandler和channelOutBoundHandler,一个inbound和一个outbound事件的执行顺序分别是125和543.
如上图所示,一个handler触发ChannelHandlerContext中的事件传播方法,而后传递到下一个handler。这些方法有:
inbound 事件传播方法:
ChannelHandlerContext#fireChannelRegistered() ChannelHandlerContext#fireChannelActive() ChannelHandlerContext#fireChannelRead(Object) ChannelHandlerContext#fireChannelReadComplete() ChannelHandlerContext#fireExceptionCaught(Throwable) ChannelHandlerContext#fireUserEventTriggered(Object) ChannelHandlerContext#fireChannelWritabilityChanged() ChannelHandlerContext#fireChannelInactive() ChannelHandlerContext#fireChannelUnregistered()
outbound事件传播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise) ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext#write(Object, ChannelPromise) ChannelHandlerContext#flush() ChannelHandlerContext#read() ChannelHandlerContext#disconnect(ChannelPromise) ChannelHandlerContext#close(ChannelPromise) ChannelHandlerContext#deregister(ChannelPromise)
下面的示例展现了事件是如何传播的:
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext} ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } } public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext} ctx, ChannelPromise} promise) { System.out.println("Closing .."); ctx.close(promise); } }
在pipeline中,一个用户通常由一个或者多个ChannelHandler来接收IO事件(例如读)和IO操做请求(如写或者close)。例如,一个典型的服务器pipeline一般具备如下几个handler,但最多有多少handler取决于协议和业务逻辑的复杂度:
Protocol Decoder--将二进制数据(如ByteBuffer)转换成一个java对象
Protocol Encoder--将一个java对象转换成二进制数据。
Business Logic Handler--处理真实的业务逻辑(如数据库访问)。
让咱们用下面的示例展现:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); ... ChannelPipeline} pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
由于ChannelPipeline是线程安全的,一个channelhandler能够在任意时间内增长或者删除。例如,当有敏感信息交换时,你能够插入一个加密handler,而后当信息交换结束后删除该handler。
4.3 Channel
Channel是网络socket的一个纽带或者一个处理IO操做如读、写、链接、绑定的组件。一个Channel提供以下信息:
当前channel的状态,如它是否开启?是否链接?
Channel的ChannelConfig的配置参数,如接受缓存大小;
channel支持的IO操做,如读、写、链接、绑定;
channel支持的ChannelPipeline,它处理全部的IO事件和channel关联的请求。
在Netty中全部的IO操做都是异步的。这意味着全部的IO调用将当即返回,但不保证在调用结束时请求的IO操做都已经执行完毕。而是在请求操做处理完成、失败或者取消时返回一个ChannelFuture来通知。
Channel是继承性的。
一个Channel能够它如何建立的来获取它的父Channel(#parent()方法)。例如:一个由ServerSocketChannel接受的SocketChannel调用parent()方法时返回ServerSocketChannel。
继承的结构依赖于Channel的所属transport实现。例如,你能够新写一个Channel实现,它建立了一个共享同一个socket链接的子channel,如BEEP和SSH
一些transport会暴露一些该transport特定的操做。Channel向下转换到子类型能够触发这些操做。例如:老的IO datargram transport,DatagramChannel提供了多播的join和leave操做。
当Channel处理完后,必定记得调用close()或者close(ChannelPromise)来释放资源。
channelFuture是异步IO操做的返回值。
在Netty中全部的IO操做都是异步的。这意味着全部的IO调用将当即返回,但不保证在调用结束时请求的IO操做都已经执行完毕。而是在请求操做处理完成、失败或者取消时返回一个ChannelFuture来通知。
当一个IO操做开始时,建立一个新的future。ChannelFuture要么是uncompleted,要么是completed。新的future开始时是uncompleted---既不是成功、失败,也不是取消,由于IO操做尚未开始呢。若IO操做结束时future要么成功,要么失败或者取消,标记为completed的future有更多特殊的意义,例如失败的缘由。请注意:即便失败和取消也属于completed状态。
有不少方法能够查询IO操做是否完成:等待完成,检索IO操做的结果。一样也容许你增长ChannelFutureListenner,这样你能够在IO操做完成后得到通知。
在尽量的状况下,推荐addListenner()方法而不是await()方法,当IO操做完成后去完成接下来的其它任务时去获取通知。
6.ChannelHandlerContext
对ChannelHandler相关信息的包装。
netty处理请求的总流程是通过ChannelPipeline中的多个ChannelHandler后,返回结果ChannelFuture。以下图所示:
具体I/O操做调用的流程,
应用->Channel的I/O操做->调用Pipeline相应的I/O操做->调用ChannelHandlerContext的相应I/O操做->调用ChannelHandler的相应操做->Channel.UnSafe中相关的I/O操做。
应用为何不直接调用Channel.UnSafe接口中的I/O操做呢,而要绕一个大圈呢?由于它是框架,要支持扩展。
执行者完成操做后,是如何通知命令者的呢?通常流程是这样的:
Channel.UnSafe中执行相关的I/O操做,根据操做结果->调用ChannelPipeline中相应发fireXXXX()接口->调用ChannelHandlerContext中相应的fireXXXX()接口->调用ChannelHandler中相应方法->调用应用中的相关逻辑。
参考文献: