什么是netty?先看下百度百科的解释:java
为何好多大公司都在使用netty框架?主要是基于netty框架的如下几个特色决定的:编程
1)健壮性,2)功能齐全,3)可定制,4)扩展性bootstrap
传统的RPC性能差,主要是因为客户端和远程调用采用了同步阻塞IO线程,当客户端的并发压力增大后,同步阻塞会因为频繁的等待致使I/O线程堵塞,线程没法高效的工做,IO处理能力天然会下降。影响性能的三个因素:第一,IO模型,IO模型在必定程度上决定了框架的性能。第2、协议,如:HTTP、TCP/IP等,协议选择的不一样,性能模型也不一样,一般状况下,内部私有协议的性能比较优,这是因为内部设计决定的。第3、线程,数据报文的接收、读取、编码、解码等,线程模型的不一样,性能也不一样。相比于传统的RPC框架,netty的优势主要体如今如下几个方面:服务器
Netty是一个基于三层网络架构模型的框架,三层网络架构分析包括调度层、链条传递层以及业务逻辑层。网络
NIO线程池组件{多线程
监听网络读写链接架构
业务调度处理并发
NIO,AIO,配合NIO通道NioSocketChannel组件框架
}异步
Netty经过内部select巡查机制,可以实现IO多路复用,经过把多个IO阻塞复用到同一个select的阻塞上,从而可以使系统即便在单线程的状况下,也可以同时处理多个请求。这样就使得netty实现了IO多路复用的优点,与传统多线程相比,大大减小了系统的开销,由于系统没必要建立新的线程和销毁线程了,减小了系统的维护难度,节省了资源。
ByteBuffer池化支持,不用手动切换标志位,实现零拷贝。传统的Socket读写,基本是使用堆内存进行,即jvm事先会把堆内存拷贝到内存中,而后再写入Socket,而netty采用的是DIRECT BUFFERS,不须要通过jvm内存拷贝,在堆外内存直接进行Socket读写,这样就少了一次缓冲区的内存拷贝,从而实现零拷贝。
2.Pipleline职责链条传递
拦截处理向前向后事件,外部传入的消息包对象,有POJO信息抽象,上层也只须要处理逻辑,相似SpringIOC处理BeanDefince。不一样的Handler节点的功能也不一样,一般状况下须要编码解码等,它能够完成外部协议到内部POJO对象的转化,这样上层只须要关注业务逻辑,不须要知道底层的协议和线程模型,从而实现解耦。
3.构建逻辑业务处理单元
底层的协议隔离,上层处理逻辑框架并不须要关心底层协议是什么。Netty框架的分层设计使得开发人员不须要关注协议框架的实现,只须要关注服务层的业务逻辑开发便可,实现了简单化。
以前有个项目是基于传统Socket和线程池的技术实现的,可是在高并发的时候发现并发能力不足,压测的时候发现TPS达不到理想值,因此通过考虑,决定使用netty框架来解决此问题。一样,netty框架也分为客户端和服务端,通过整理,先写一个demo初探netty框架,下面是代码的实现过程。
首先是服务端,服务端包含两个方面,第1、服务端Server的主要做用就是经过辅助引导程序,设置NIO的链接方式处理客户端请求,经过绑定特定端口、设定解码方式以及监听来实现整个线程的处理请求;第2、服务端Handler须要继承ChannelInboundHandlerAdapter类,handler类的主要做用是读取客户端数据,处理业务,抛出异常,响应客户端请求。代码以下:
服务端Server:
public class Server { private static Log logger = LogFactory.getLog(Server.class); private int port; public Server(int port) { super(); this.port = port; } public void start(){ ServerBootstrap b = new ServerBootstrap();//引导辅助程序 EventLoopGroup group = new NioEventLoopGroup();//经过nio方式来接收链接和处理请求 try { b.group(group); b.channel(NioServerSocketChannel.class);//设置nio类型的channnel b.localAddress(new InetSocketAddress(port));//设置监听端口 //b.option(ChannelOption.SO_BACKLOG, 2048); b.childHandler(new ChannelInitializer<SocketChannel>() {//有链接到达时会建立一个channel @Override protected void initChannel(SocketChannel ch) throws Exception { //注册handler ch.pipeline().addLast(new ByteArrayDecoder()); ch.pipeline().addLast(new ByteArrayEncoder()); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ServerHandler()); } });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind().sync();//配置完成,开始绑定server,经过调用sync同步方法阻塞直到绑定成功 logger.info(Server.class.getName()+"开始监听:"+f.channel().localAddress()); f.channel().closeFuture().sync();//应用程序会一直等待直到channel关闭 } catch (Exception e) { e.printStackTrace(); } finally { try { //关闭EventLoopGroup,释放掉全部资源包括建立的线程 group.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
服务端Handler
public class ServerHandler extends ChannelInboundHandlerAdapter { private static Log logger=LogFactory.getLog(ServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx){ logger.info(ctx.channel().localAddress().toString()+"通道活跃...."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.error(ctx.channel().localAddress().toString()+"通道不活跃...."); } /** * * 读取客户端传过来的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //业务处理类 logger.info("开始业务处理...."); new SocketController(ctx,msg).run(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出现异常,关闭连 logger.error("服务端出现异常:"+cause.getMessage(),cause); ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("服务端完成请求!"); ctx.flush(); } }
客户端代码
客户端主要是用来向服务端发送数据,一样包含两个方面,第1、Client主要经过设定端口和IP和服务器创建链接,进行数据包的编码;第2、ClientHandler 须要继承 SimpleChannelInboundHandler<ByteBuf>类,针对不一样的传输方式,继承不一样的类,handler类一样处理业务请求,响应服务端的请求。代码以下:
客户端Client:
public class Client { private static Log logger=LogFactory.getLog(Client.class); private String host; private int port; public Client(String host, int port) { super(); this.host = host; this.port = port; } public void connect(){ EventLoopGroup workGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { logger.info("客户端触发链接......"); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ClientHandler()); } }); //客户端开始链接 try { logger.info("链接到服务器......"); ChannelFuture future=bootstrap.connect(host,port).sync(); //等待链接关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workGroup.shutdownGracefully(); } } }
客户端Handler:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static Log logger=LogFactory.getLog(ClientHandler.class); /** * 向服务端发送消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().localAddress().toString()+"客户点活跃..."); //向服务端写字符串 logger.info("客户端链接服务端,开始发送数据....."); String string ="hello server!"; System.out.println("发送数据为:"+string); ByteBuf buf=ctx.alloc().buffer(4*string.length()); buf.writeBytes(string.getBytes()); ctx.writeAndFlush(buf); logger.info("发送完毕..."); } /** * 读取服务端返回来的消息 */ @Override protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception { logger.info("开始接受服务端数据"); byte[] b=new byte[in.readableBytes()]; in.readBytes(b); String string=new String(b); logger.info("服务端发送的数据为:"+string); in.release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("客户端异常:"+cause.getMessage(),cause); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("客户端完成请求...."); ctx.flush(); } }
服务端启动:
public class ServerMain { private static Log logger=LogFactory.getLog(ServerMain.class); private static Server server =new Server(55550); public static void main(String[] args) { logger.info("服务端启动......."); server.start(); } }
客户端启动类:
public class Test { private static Client client = new Client("127.0.0.1", 55550); public static void main(String[] args) throws UnknownHostException, IOException { client.connect(); } }
测试结果:
服务端:
客户端:
以上只是一个netty框架初探的小Demo,学习使用netty框架的开始,这里面涉及到了不少的技术以及很是多的组件,好比:Channels、Callbacks、Futures、Events和handlers等等,须要进一步的学习,另外,消息的编码解码、粘包、拆包的方式方法、消息格式的转换以及报文格式大小限制都须要进一步的研究学习。