把书读薄(Netty in Action第二章) + Netty启动源码java
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[]args)throws Exception{ new EchoServer(8888).start(); } public void start() throws Exception{ final EchoServerHandler handler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(handler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } }
1: 初始化EventLoopGroupsegmentfault
所谓的EventLoopGroup,组(group)的概念表如今它自身维护了一个数组children,默认维护逻辑处理核数2倍的NioEventLoop线程,并经过chooser来方便的获取下一个要执行的线程。实际处理的是NioEventLoop,它的部分类结构以下:数组
实质上的线程执行,就是启动一个java Thread,而后从taskQuene中获取要执行的任务,在run方法中执行。ide
2:配置引导类ServerBootstrap做为工具来引导channel创建工具
Hello word版代码中用的是同一个NioEventLoop,实际中通常各自分配
3 :建立并初始化channel oop
在管道的最后添加ChannelInitializer的方式则会在管道注册完成以后,往管道中 添加一个ServerBootstrapAcceptor(它是InboundHandler),它持有对childGroup(client)和childHandler的引用,而ChannelInitializer这个InboundHandler在完成它的使命以后,就会从管道中被移除, 至此完成channel的初始化。ui
ServerBootstrapAcceptor 最开始在客户端创建链接的时候执行调用(后续读消息调用),入口是 doReadMessages,读到消息以后,从Head沿着InBoundHandler到ServerBootstrapAcceptor,触发读事件,此时执行注册childGroup到这个channel,也就是每次都用childGroup来处理读到的消息
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { //管道注册完成以后触发 ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { initChannel((C) ctx.channel()); //执行注册过程当中的方法,在这里就是往管道中添加ServerBootstrapAcceptor pipeline.remove(this); //删除ChannelInitializer自己 ctx.fireChannelRegistered(); //继续沿着管道传递channel注册完成事件 success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
新建的NioServerSocketChannel的部分类结构以下:this
对于Netty来说channel有"两个"spa
凡是经过 channel()方法获取的则是Netty自身的channel
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; //Netty自身的channel tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
凡是经过javachannel()调用的获取到的值便是jdk的channel
,而unsafe自己真正意义上执行的register、bind、connect、write、read操做均经过ServerSocketChannel实现4: 执行channel的注册线程
能够看到注册过程当中实际的注册操做经理了从channel->unsafe->ch的一个过程,实际的注册操做就是使用jdk完成的。
5:执行channel的绑定
可以看到的是,绑定操做也是经过jdk来实现绑定的。另外同步阻塞住server,使之不关闭,实际上也就是只要CloseFuture不完成,那么server主线程永远阻塞住,由刚开始分配的NioEventLoop一直在运行各自的task
netty nio底层的注册channel、绑定监听端口都是经过jdk自身的nio完成的。java nio中的select和channel是怎么使用的?
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.printf("Server get:"+in.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //将目前暂存于ChannelOutboundBuffer中的消息在下一次flush或者writeAndFlush的时候冲刷到远程并关闭这个channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
public class EchoClient { private final String host; private final int port; public EchoClient(String host,int port){ this.host=host; this.port=port; } public void start() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class)//指定NIO的传输方式 .remoteAddress(new InetSocketAddress(host,port))//指定远程地址 .handler(new ChannelInitializer<SocketChannel>() {//向channel的pipeline添加handler @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler());//channelHander交给pipeline } }); ChannelFuture f = b.connect().sync();//链接到远程节点,阻塞直到链接完成 System.out.println("wait"); f.channel().closeFuture().sync();//阻塞直到链接关闭 System.out.println("over"); }finally { System.out.println("shutdown"); group.shutdownGracefully().sync();//关闭线程池而且释放资源 } } public static void main(String[]args) throws Exception{ new EchoClient("localhost",8888).start(); } }
从代码自己能够看到与 server的差别化在于如下两个部分:
注意这里的其实是没有指定本地的地址的
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world",CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Client get:"+msg.toString(CharsetUtil.UTF_8)); } }