简介:java
Netty是一款高效的基于reactor线程模型、异步非阻塞事件驱动的网络编程框架,主要用于网络编程,适用于中间件开发组建、传统应用通信开发。react
Reactor线程模型:线程模型线程模型中有两种角色,一个是主处理线程(组),一个是工做线程(组),在netty中主处理线程和工做线程都由线程组来维护,主处理线程主要用于处理网络事件中的Accept事件,工做线程主要用来处理主线程下发下来的网络连接处理处理任务,如对channel中数据的读写等。编程
使用Netty开发网络应用程序的优点:bootstrap
(1)采用java NIO原生开发网络应用,成本相对较大,须要熟练掌握NIO编程中各组件的正确使用(如:多路复用器、channel、缓存机制等)、架构设计上线程模型的选择及实现、编解码技术、TCP半包处理等等。api
(2)Netty已解决或规避原生java NIO的各类问题或bug,比较醒目的就是多路复用的空轮询。缓存
(3)Netty趋于成熟,已成为不少大型底层中间件网络通信组件(如:Hadoop的RPC框架avro)。网络
(4)易用性,netty已尽量地对外屏蔽了底层网络实现的具体细节,已相对友好的方式对外提供API(具体为ServerBootstrap、Bootstrap、自实现编解码抽象类、自定义Handler等)。架构
开发案例:框架
Client发出查询时间的指令,Server端接收指令并简单判断后,将当前时间写入channel。异步
maven pom.xml中引入:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.0.Final</version> </dependency>
辅助类:
package com.best.diamond.netty.time; /** * Created by hengluwen on 17/10/1. */ public class NettyConstants { public static final String BAD_ORDER = "BAD ORDER"; public static final String QUERY_TIME_ORDER = "QUERY TIME ORDER"; }
Netty Server端:
package com.best.diamond.netty.time; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.util.CharsetUtil; /** * Created by hengluwen on 17/10/1. */ public class TimeServer { private static final StringDecoder DECODER = new StringDecoder(CharsetUtil.UTF_8); public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) //服务端的ServerBootstrap中增长了一个方法childHandler,它的目的是添加handler(包括用户自定义的handler),用来监听已经链接的客户端的Channel的动做和状态。 .childHandler(new ChildChannelHandler()); //绑定端口,同步等待成功 ChannelFuture channelFuture = b.bind(port).sync(); //等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //如下两行代码为了解决半包读问题 pipeline.addLast("framer", new LineBasedFrameDecoder(1024)); pipeline.addLast("decoder", DECODER); pipeline.addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { new TimeServer().bind(8080); } }
服务端代码中首先建立了两个NioEventLoopGroup实例,NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组,建立两个的缘由是:一个用于服务端接收客户端的连接,一个用于进行SocketChannel的读写。设置建立Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类,而后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024(该参数表示未完成TCP三次握手的连接数与完成TCP三次握手的链接数的总和,这个参数会限制client的总连接数),ServerBootstrap是netty服务端启动辅助类,目的为下降服务端开发的复杂度。绑定I/O事件的处理类ChailChannelHandler,它的做用相似于Reactor模式中的Handler类,主要用于处理网络I/O事件。
package com.best.diamond.netty.time; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.Date; /** * Created by hengluwen on 17/10/1. */ public class TimeServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String request) throws Exception { System.out.println("The time server receive order : " + request); String currentTime = NettyConstants.QUERY_TIME_ORDER.equalsIgnoreCase(request) ? new Date().toString() : NettyConstants.BAD_ORDER; sendMessage(ctx, currentTime); } private void sendMessage(ChannelHandlerContext ctx, String data) { ByteBuf byteBuf = Unpooled.copiedBuffer(data.getBytes()); ctx.writeAndFlush(byteBuf); } }
当接收到client发送的数据时,因为在启动时已经加入了String类型的消息解码器(StringDecoder,此消息解码器由Netty提供,不须要自定义),所以用户自定义的Handler中接收到的request的数据类型为String,没必要再次进行手工解码。
Netty Client端:
package com.best.diamond.netty.time; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; /** * Created by hengluwen on 17/10/1. */ public class TimeClient { public void connect(String host, int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioServerSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeClientHandler()); } }); ChannelFuture channelFuture = b.connect(new InetSocketAddress(host, port)).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { try { new TimeClient().connect("127.0.0.1", 8080); } catch (Exception e) { e.printStackTrace(); } } }
client端的启动代码与server相似,因为client更多的是要处理网络事件的I/O,且正常状况下client只会处理一个或两个channel(channel失效重连时),所以只须要一个线程组便可。
package com.best.diamond.netty.time; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * Created by hengluwen on 17/10/1. */ public class TimeClientHandler extends SimpleChannelInboundHandler<String> { private final ByteBuf firstMessage; public TimeClientHandler() { firstMessage = Unpooled.buffer().writeBytes(NettyConstants.QUERY_TIME_ORDER.getBytes()); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { System.out.println("error" + future.cause()); } } }); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String request) throws Exception { System.out.println("Now is : " + request); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
channel激活时,发出指定,等待服务端的相应。
总结:
(1)netty是一个高效的异步非阻塞的网络通信框架,开发简单。
(2)netty易用性强,提供了大量的编码器/解码器,已经处理TCP半包问题的辅助类。
(3)netty的api设计的强大而友好。