netty学习之一 netty入门

在不少开源项目中都有netty的影子,好比阿里系的rocketmq和dubbo,大数据领elasticsearch,hbase,hadoop都是使用了netty做为底层传输的。到底netty是个啥东东呢,其实 Netty是一个NIO client-server(客户端服务器)框架,使用Netty能够快速开发网络应用,例如服务器和客户端协议。Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是数一数二的,它已经获得成百上千的商用项目验证,例如Hadoop的RPC框架(Remote Procedure Call Protocol ,远程过程调用协议,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议)avro使用Netty做为底层通讯框架。不少其它业界主流的RPC框架,也使用Netty来构建高性能的异步通讯能力。经过对Netty的分析,咱们将它的优势总结以下:java

    1. API使用简单,开发门槛低;
    1. 功能强大,预置了多种编解码功能,支持多种主流协议;
    1. 定制能力强,能够经过ChannelHandler对通讯框架进行灵活的扩展;
    1. 性能高,经过与其它业界主流的NIO框架对比,Netty的综合性能最优;
    1. 成熟、稳定,Netty修复了已经发现的全部JDK NIO BUG,业务开发人员不须要再为NIO的BUG而烦恼;
    1. 社区活跃,版本迭代周期短,发现的BUG能够被及时修复,同时,更多的新功能会被加入;
    1. 经历了大规模的商业应用考验,质量已经获得验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业获得成功商用,证实了它能够彻底知足不一样行业的商业应用。
  • 说到netty不得不说IO,如今通讯方式无非就是三种通讯模式,BIO,NIO,AIO。git

    BIO: 同步并阻塞,服务器实现模式为一个链接一个线程,即客户端有链接请求时服务器端就须要启动一个线程进行处理(一客户端一线程)。该模型最大的问题就是缺少弹性伸缩能力,当客户端并发访问量增长后,服务端的线程数与客户端并发访问数呈1:1的关系,系统性能将急剧降低,随着并发访问量的继续增长,系统会发生线程堆栈溢出、建立新线程失败等问题,并最终致使宕机或僵死。github

    NIO:同步非阻塞,服务器实现模式为一个请求一个线程,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有I/O请求时才启动一个线程进行处理。 对于NIO,有两点须要强调的: (1)关于概念有两种理解,New I/O(相对于以前的I/O库是新增的)和Non-block I/O(非阻塞的)。因为NIO的目标就是让java支持非阻塞I/O,全部更多人喜欢用Non-block I/O。 (2)不少人喜欢将NIO称为异步非阻塞I/O,可是,若是按照严格的NUIX网络编程模型和JDK的实现进行区分,实际上它只是非阻塞I/O,不能称之为异步非阻塞I/O。但因为NIO库支持非阻塞读和写,相对于以前的同步阻塞读和写,它是异步的,所以不少人习惯称NIO为异步非阻塞I/O。编程

AIO:JDK1.7升级了NIO库,升级后的NIO库被称为NIO2.0,正式引入了异步通道的概念。NIO2.0的异步套接字通道是真正的异步非阻塞I/O,此即AIO。其服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。bootstrap

NIO 就是New IO , NIO和IO有相同的做用和目的,但实现方式不一样,NIO主要用到的是块,因此NIO的效率要比IO高不少。NIO主要是经过buffer和channel传递数据,而原始的io 主要是经过字节流输入输出数据。浏览器

Netty就是采用了NIO模型,说了这么多,仍是先来个栗子看看吧:安全

首先构建服务端,SslContext主要是来构建安全套接字,不过这个不是重点, EventLoopGroup bossGroup = new NioEventLoopGroup(1)开始构建boss线程组, ServerBootstrap负责初始化netty服务器,而且开始监听端口的socket请求。ServerBootstrap用一个ServerSocketChannelFactory 来实例化。ServerSocketChannelFactory 有两种选择,一种是NioServerSocketChannelFactory,一种是OioServerSocketChannelFactory。 前者使用NIO,后则使用普通的阻塞式IO。它们都须要两个线程池实例做为参数来初始化,一个是boss线程池,一个是worker线程池。服务器

public final class EchoServer {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    public static void main(String[] args) throws Exception {
        // Configure SSL.构建安全套接字
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 [@Override](https://my.oschina.net/u/1162528)
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

ServerBootstrap.bind(int)负责绑定端口,当这个方法执行后,ServerBootstrap就能够接受指定端口上的socket链接了。一个ServerBootstrap能够绑定多个端口。也就是说ServerBootstrap监听的一个端口对应一个boss线程,它们一一对应,在boss线程接收了socket链接请求后,会产生一个channel(一个打开的socket对应一个打开的channel),并把这个channel交给ServerBootstrap初始化时指定的ServerSocketChannelFactory来处理,boss线程则继续处理socket的请求。ServerSocketChannelFactory则会从worker线程池中找出一个worker线程来继续处理这个请求。若是是OioServerSocketChannelFactory的话,那个这个channel上全部的socket消息,从开始到channel(socket)关闭,都只由这个特定的worker来处理,也就是说一个打开的socket对应一个指定的worker线程,这个worker线程在socket没有关闭的状况下,也只能为这个socket处理消息,没法服务其余socket。网络

若是是NioServerSocketChannelFactory的话则否则,每一个worker能够服务不一样的socket或者说channel,worker线程和channel再也不有一一对应的关系。 显然,NioServerSocketChannelFactory只须要少许活动的worker线程及能很好的处理众多的channel,而OioServerSocketChannelFactory则须要与打开channel等量的worker线程来服务。并发

线 程是一种资源,因此当netty服务器须要处理长链接的时候,最好选择NioServerSocketChannelFactory,这样能够避免建立大 量的worker线程。在用做http服务器的时候,也最好选择NioServerSocketChannelFactory,由于现代浏览器都会使用 http keepalive功能(可让浏览器的不一样http请求共享一个信道),这也是一种长链接。

worker线程池中的线程生命周期是怎么样的?当某个channel有消息到达或者有消息须要写入socket的时候,worker线程就会从线程池中取出一个。在worker线程中,消息会通过设定好 的ChannelPipeline处理。ChannelPipeline就是一堆有顺序的filter,它分为两部分:UpstreamHandler和 DownStreamHandler, 客户端送入的消息会首先由许多UpstreamHandler依次处理,处理获得的数据送入应用的业务逻辑handler,对 于Nio当messageReceived()方法执行后,若是没有产生异常,worker线程就执行完毕了,它会被线程池回收。业务逻辑hanlder 会经过一些方法,把返回的数据交给指定好顺序的DownStreamHandler处理,处理后的数据若是须要,会被写入channel,进而经过绑定的 socket发送给客户端。这个过程是由另一个线程池中的worker线程来完成的。 对于Oio来讲,从始到终,都是由一个指定的worker来处理。

减小worker线程的处理占用时间

worker 线程是由netty内部管理,统一调配的一种资源,因此最好应该尽快的把让worker线程执行完毕,返回给线程池回收利用。worker线程的大部分时 间消耗在在ChannelPipeline的各类handler中,而在这些handler中,通常是负责应用程序业务逻辑掺入的那个handler最占 时间,它一般是排在最后的UpstreamHandler。因此经过把这部分处理内容交给另一个线程来处理,能够有效的减小worker线程的周期循环 时间。messageReceived()方法中开启一个新的线程来处理业务逻辑,固然也可使用利用netty框架自带的ExecutionHandler,这个之后在说。。

下面要说明下载本栗子中用于进行业务逻辑处理的EchoServerHandler,实际的开发中更能够拥有多个handler作消息的处理,下面咱们来看下这个handler都作了那些工做。 EchoServerHandler主要是继承ChannelInboundHandlerAdapter,而且复写其中的某几个方法就能够实现本身定义的逻辑,例如咱们能够复写channelRead方法能够获取客户端传过来的消息。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    [@Override]
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf)msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String response = new String(req,"utf-8");
            System.out.println("收到客户端服务器的请求消息"+response);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端返回的消息hello".getBytes()))
                .addListener(ChannelFutureListener.CLOSE);
    }

    [@Override]
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
    [@Override](https://my.oschina.net/u/1162528)
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

下面来看看客户端是如何编写的,首先构建客户端线程组,只须要构建一个线程组便可,而且绑定服务器主机的ip和port,而且向pipeline中绑定本身的handler这个handler主要是客户端中须要自定义的业务处理逻辑。

public final class EchoClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 [@Override](https://my.oschina.net/u/1162528)
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }

下面看看handler的处理逻辑,只是在链接服务器的时候向服务器注册完毕时候及channelActive发送条消息,而且获得服务器返回消息的时候调用channelRead方法,获得消息而且将消息打印。

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf)msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String response = new String(req,"utf-8");
            System.out.println("收到服务端返回的消息"+response);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

总结一下Netty的核心包含如下几个部分:

  • channel:通讯框架的核心抽象
  • bootstrap:应用启动入口
  • buffer:数据交换的载体
  • handler:协议与事件的处理工具

channel下包含的核心概念有:

Channel:通讯通道,对应一个物理链接
ChannelPipeline:事件处理管道
ChannelHandler:事件处理器
ChannelHandlerContext:上下文环境,包含了handler的引用

完整代码连接:https://github.com/winstonelei/Smt

相关文章
相关标签/搜索