读懂Netty服务端开发(附学习代码)

使用JDK NIO类库时开发NIO的异步服务端时,须要用到:多路复用器Selector,ServerSocketChannel,SocketChanel,ByteBuffer,SelectionKey等。若是用源生的JAVA NIO搭建服务端,无疑是十分复杂的,这不只拖慢了项目的进度,并且开发出来的项目可能还不稳定。那么Netty是如何下降其复杂度的呢?Netty开发简单异步服务端不超过9步,下面分步骤来说解用Netty来开发异步服务端:java

第一步:建立EventLoopGroupbootstrap

//第一步 建立EventLoopGroup 
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

讲解:EventLoopGroup 是EventLoop的数组,EventLoop的职责是处理全部注册到本线程多路复用器Selector上的Channel,Selector的轮询操做由EventLoop线程的run方法驱动。数组

第二步:建立ServerBootstrap安全

ServerBootstrap b = new ServerBootstrap();

讲解:ServerBootstrap是Netty服务端启动的辅助类,从代码能够看出,咱们在建立ServerBootstrap时,用的是一个无参构造器建立的,你也会惊讶的发现,ServerBootstrap只有一个无参构造器,由于这个辅助类须要的参数太多了,并且有些参数是用户能够选择性添加的,因此ServerBootstrap引入了Builder模式 让用户动态添加参数:服务器

ServerBootstrap b = new ServerBootstrap();
//动态添加参数,支持链式添加
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new HttpServerHandler());

第三步:设置并绑定服务端Channel(开始绑定ServerSocketChannel)网络

b.channel(NioServerSocketChannel.class)

讲解:这里只须要传入对应的.class类便可,ServerBootstrap会经过工厂类,利用反射帮咱们建立对应的Class对象。异步

第四步:建立并初始化处理网络时间的职责链 ChannelPipelinesocket

ch.pipeline().addLast(new LoggingHandler());
		//请求 http解码
		ch.pipeline().addLast("http-decoder",new HttpRequestDecoder());
		//将多个消息转换为单一的FullHttpRequest
		ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536));
		//应答http编码
		ch.pipeline().addLast("http-encoder",new HttpResponseEncoder());
		//链接异步发送请求,防止内存溢出
		ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
		ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());

讲解:ChannelPipeline并非NIO服务端必须的,它本质就是一个处理网络事件的职责链,负责管理和执行ChannelHandler,咱们能够在其中添加服务端接收请求的解码和发送请求的编码,以及粘包\拆包的处理等。ide

第五步:添加并设置ChannelHandleroop

ChannelHandler是Netty提供给咱们定制和扩展的关键接口,利用ChannelHandler咱们能够完成大多数功能的定制,例如消息编解码,心跳安全认证等,下面是我定义的Handler:

ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.handler(new LoggingHandler())
			
			//加入自定义的handler
			.childHandler(new HttpServerHandler());



//我定义的handler
public class HttpServerHandler extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline().addLast(new LoggingHandler());
		//请求 http解码
		ch.pipeline().addLast("http-decoder",new HttpRequestDecoder());
		//将多个消息转换为单一的FullHttpRequest
		ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536));
		//应答http编码
		ch.pipeline().addLast("http-encoder",new HttpResponseEncoder());
		//链接异步发送请求,防止内存溢出
		ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
		ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());
		
		
	}

//第二层对消息处理
public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{

	@Override
	public boolean acceptInboundMessage(Object msg) throws Exception {
		System.out.println(msg);
		return super.acceptInboundMessage(msg);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
		System.out.println(msg);
	}
	


}

第六步:绑定并启动监听端口:

ChannelFuture f = b.bind(8080).sync();
			System.out.println("Http服务器启动完成,监听端口为:"+port);
			f.channel().closeFuture().sync();

讲解:在监听端口以前,Netty会作一系列的初始化和检测工做,而后启动监听端口,同时,会将ServerSocketChannel注册到多路复用器Selector上,监听客户端的链接。

第七步:Selector轮询:

讲解:Selector的轮询是由Reactor线程NioEventLoop负责调度的,而后将准备就绪的Channel放到一个集合中;

第八步:Reactor线程NioEventLoop将准备就绪的Channel去执行咱们定义好的ChannelPipeline方法,并执行系统定义的ChannelHandler;

第九步:最终,执行咱们根据具体业务逻辑定义好的的ChannelHandler。

完整源码以下:

HttpServer:

package kaoqin.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class HttpServer {
	
	private void run(int port) throws InterruptedException{
		//第一步 建立EventLoopGroup 
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try{
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.handler(new LoggingHandler())
			
			//加入自定义的handler
			.childHandler(new HttpServerHandler());
			//.option(ChannelOption.AUTO_READ,true);
			ChannelFuture f = b.bind(port).sync();
			System.out.println("Http服务器启动完成,监听端口为:"+port);
			f.channel().closeFuture().sync();
		}finally{
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	public static void main(String[] args) throws InterruptedException {
		new HttpServer().run(9999);
	}
}

HttpServerHandler:

package kaoqin.server;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class HttpServerHandler extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline().addLast(new LoggingHandler());
		//请求 http解码
		ch.pipeline().addLast("http-decoder",new HttpRequestDecoder());
		//将多个消息转换为单一的FullHttpRequest
		ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536));
		//应答http编码
		ch.pipeline().addLast("http-encoder",new HttpResponseEncoder());
		//链接异步发送请求,防止内存溢出
		ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
		ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());
		
		
	}

}

HttpServerHandlerAdapter:

package kaoqin.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;

public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{

	@Override
	public boolean acceptInboundMessage(Object msg) throws Exception {
		System.out.println(msg);
		return super.acceptInboundMessage(msg);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
		System.out.println(msg);
	}
	


}
相关文章
相关标签/搜索