Netty实战四之传输(全章)

项目地址:请点击推文结尾阅读原文(码云免费下载、wiki完整阅读)java

流经网络的数据老是具备相同的类型:字节(网络传输——一个帮助咱们抽象底层数据传输机制的概念)编程

Netty为它全部的传输实现提供了一个通用的API,即咱们能够将时间花在其余更有成效的事情上。设计模式

咱们将经过一个案例来对传输进行学习,应用程序只简单地接收链接,向客户端写 “Hi!” ,而后关闭链接。安全

一、不经过Netty使用OIO和NIO服务器

先介绍JDK API的应用程序的阻塞(OIO)版本和异步(NIO)版本。网络

public class PlainNioServer {
    public void server(int port) throws IOException{        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);        ServerSocket ssocket = serverChannel.socket();        InetSocketAddress address = new InetSocketAddress(port);
        //将服务器绑定到选定的端口
        ssocket.bind(address);
        //打开Selector来处理Channel
        Selector selector = Selector.open();
        //将ServerSocket注册到Selector已接受链接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());        for(;;){            try {
                //等待须要处理的新事件;阻塞将一直持续到下一个传入事件
                selector.select();
            }catch (IOException e){
                e.printStackTrace();
                //handle exception                break;
            }
            //获取全部接收事件的SelectionKey实例            Set<SelectionKey> readyKeys = selector.selectedKeys();            Iterator<SelectionKey> iterator = readyKeys.iterator();            while (iterator.hasNext()){                SelectionKey key = iterator.next();                iterator.remove();                try {
                    //检查事件是不是一个新的已经就绪能够被接受的链接                    if (key.isAcceptable()){                        ServerSocketChannel server = (ServerSocketChannel)key.channel();                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        //接受客户端,并将它注册到选择器
                        client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate());                        System.out.println("Accepted connection from " + client);
                    }
                    //检查套接字是否已经准备好写数据                    if (key.isWritable()){                        SocketChannel client = (SocketChannel)key.channel();                        ByteBuffer buffer = (ByteBuffer)key.attachment();                        while(buffer.hasRemaining()){
                            //将数据写到已链接的客户端                            if (client.write(buffer) == 0){                                break;
                            }
                        }
                        //关闭链接
                        client.close();
                    }
                }catch (IOException e){
                    key.cancel();                    try {
                        key.channel().close();
                    }catch (IOException ex){
                        //ignore on close
                    }
                }
            }
        }
    }
}

这段代码彻底能够处理中等数量的并发客户端,可是随着应用程序变得流行起来,你会发现它并不能很好地伸缩到支撑成千上万的并发连入链接。你决定改用异步网络编程,可是很快就发现异步API是彻底不一样的,以致于如今你不得不重写你的应用程序。多线程

public class PlainOioServer {    public void server(int port) throws IOException{        //将服务器绑定到指定端口
        final ServerSocket socket = new ServerSocket(port);        try {            for (;;){                //接受链接
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);                //建立一个新的线程来处理该链接
                new Thread(new Runnable() {
                    @Override                    public void run() {
                        OutputStream out;                        try {                            out = clientSocket.getOutputStream();                            //将消息写给已链接的客户端
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            out.flush();                            //关闭链接
                            clientSocket.close();
                        }catch (IOException e){
                            e.printStackTrace();
                        }finally {                            try {
                                clientSocket.close();
                            }catch (IOException e){                                //ignore on close
                            }
                        }
                    }                    //启动线程
                }).start();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

}

虽然这段代码所作的事情与以前的版本彻底相同,可是代码却大相径庭,若是为了用于非阻塞I/O而从新实现这个简单的应用程序,都须要一次彻底重写的话,那么不难想象,移植真正复杂的应用程序须要付出什么样的努力!并发

二、经过Netty使用OIO和NIO框架

public class NettyOioServer {    public void server(int port) throws Exception{        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();        try {            //建立ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)                    //使用OioEventLoopGroup以容许阻塞模式(旧的I/O)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))                    //指定ChannelInitializer,对于每一个已接受的链接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(                                //添加一个ChannelInboundHandlerAdapter以拦截和处理事件
                                new ChannelInboundHandlerAdapter(){                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ctx.writeAndFlush(buf.duplicate())                                                //将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭链接
                                                .addListener(ChannelFutureListener.CLOSE);
                                    }
                                }
                            );
                        }
                    });            //绑定服务器以接受链接
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {            //释放全部的资源
            group.shutdownGracefully().sync();
        }
    }
}

三、非阻塞的Netty版本异步

public class NettyNioServer {    public void server(int port) throws Exception{        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));        //为非阻塞模式使用NioEventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();        try {            //建立ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))                    //指定ChannelInitializer,对于每一个已接受的链接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(                                    //添加一个ChannelInboundHandlerAdapter以拦截和处理事件
                                    new ChannelInboundHandlerAdapter(){                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())                                                    //将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭链接
                                                    .addListener(ChannelFutureListener.CLOSE);
                                        }
                                    }
                            );
                        }
                    });            //绑定服务器以接受链接
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {            //释放全部的资源
            group.shutdownGracefully().sync();
        }
    }
}

由于Netty为每种传输的实现都暴露了相同的API,因此不管选用哪种传输的实现,你的代码都仍然几乎不受影响,在全部的状况下,传输的实现都依赖于interface Channel、ChannelPipeline和ChannelHandler。

四、传输API

传输API的核心是interface Channel ,它被用于全部的I/O操做。Channel类的层次结构如图4-1(Channel接口的层次结构)所示。
Netty实战四之传输(全章)
如图所示,每一个Channel都将会将分配一个ChannelPipeline和ChannelConfig。ChannelConfig包含了该Channel的全部配置设置,而且支持热更新。

因为特定的传输可能具备独特的设置,因此它可能会实现一个ChannelConfig的子类型。

因为Channel是独一无二的,因此为了保证顺序将Channel声明为java.lang.Comparable的子接口。所以,若是两个不一样的Channel实例都返回相同的散列码,那么AbstractChannel中的compareTo()方法的实现将会抛出一个Error。

ChannelPiepeline持有全部将应用于入站和出站数据以及事件的ChannelHandler实例,这些ChannelHandler实现了应用程序用于处理状态变化以及数据处理的逻辑。

ChannelHandler的典型用途包括:

-将数据从一种格式转换为另外一种格式

-提供异常的通知

-提供Channel变为活动的或者非活动的通知

-提供当Channel注册到EventLoop或者从EventLoop注销时的通知

-提供有关用户自定义事件的通知

拦截过滤器 ChannelPipeline实现了一种常见的设计模式——拦截过滤器(InterceptingFilter)。UNIX管道是另一个熟悉的例子:多个命令被连接在一块儿,其中一个命令的输出端将链接到命令行中下一个命令的输入端。

你也能够根据须要经过添加或者移除ChannelHandler实例来修改ChannelPipeline。经过利用Netty的这项能力能够构建出高度灵活的应用程序。例如,每当STARTTLS协议被请求时,你能够简单地经过向ChannelPipeline添加一个适当的ChannelHandler(SslHandler)来按需地支持STARTTLS协议。
Netty实战四之传输(全章)
考虑一下写数据并将其冲刷到远程节点这样的常规任务,代码清单4-5演示了使用Channel.writeAndFlush()来实现这一目的。

Channel channel = ...        //建立持有要写数据的ByteBuf
        ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);        //写数据并冲刷它
        ChannelFuture cf = channel.writeAndFlush(buf);        //添加ChannelFutureListener以便在写操做完成后接收通知
        cf.addListener(new ChannelFutureListener() {            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {                //写操做完成,而且没有错误发生
                if (channelFuture.isSuccess()){
                    System.out.println("Write successful");
                } else {                    //记录错误
                    System.out.println("Write error");
                    channelFuture.cause().printStackTrace();
                }
            }
        });

Netty的Channel实现是线程安全的,所以你能够存储一个到Channel的引用,而且每当你须要向远程节点写数据时,均可以使用它,即便当时许多线程都在使用它。代码清单4-6展现了一个多线程写数据的简单例子,须要注意的是,消息将会被保证按顺序发送的。

final Channel channel = ...        //建立持有要写数据的ByteBuf
        final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8).retain();        //建立将数据写到Channel的Runable
        Runnable writer = new Runnable() {            @Override
            public void run() {
                channel.writeAndFlush(buf.duplicate());
            }
        };        //获取到线程池Executor的引用
        Executor executor = Executors.newCachedThreadPool();        //write in one thread
        //递交写任务给线程池以便在某个线程中执行
        executor.execute(writer);        //write in another thread
        //递交另外一个写任务以便在另外一个线程中执行
        executor.execute(writer);

五、内置的传输

Netty内置了一些可开箱即用的传输,由于并非它们全部的传输都支持每一种协议,因此你必须选择一个和你的应用程序所使用的协议相容的传输。 下表显示了全部Netty提供的传输

Netty实战四之传输(全章)

六、NIO——非阻塞I/O

NIO提供了一个全部I/O操做的全异步的实现。它利用了自NIO子系统被引入JDK1.4时即可用的基于选择器的API。

选择器背后的基本概念是充当一个注册表,在那里你将能够请求在Channel的状态发生变化时获得通知。

-新的Channel已被接受而且就绪

-Channel链接已经完成

-Channel有已经就绪的可供读取的数据

-Channel可用于写数据

选择器运行在一个检查状态变化并对其作出相应响应的线程上,在应用程序对状态的改变作出响应以后,选择器将会被重置,并将重复这个过程。

下表中的常量值表明了由class java.nio.channels.SelectionKey定义的位模式。这些位模式能够组合起来定义一组应用程序正在请求通知的状态变化集。
Netty实战四之传输(全章)
对于全部Netty的传输实现都共有的用户级别API彻底地隐藏了这些NIO的内部细节。下图展现了该处理流程。
Netty实战四之传输(全章)

零拷贝:

零拷贝(zero-copy)是一种目前只有在使用NIO和Epoll传输时才可以使用的特性。它使你能够快速高效地将数据从文件系统移动到网络接口,而不须要将其从内核空间复制到用户空间,其在像FTP或者HTTP这样的协议中能够显著地提高性能。可是,并非全部的操做系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。反过来讲,传输已被加密的文件则不是问题。

七、Epoll——用于Linux的本地非阻塞传输

Linux做为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其中包括Epoll——一个高度可扩展的I/O事件通知特性,这个API自Linux内核版本2.5.44被引入,提供了比旧的POSIX select和poll系统调用更好的性能,同时如今也是Linux上非阻塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。

Netty为Linux提供了一组NIO API,其以一种和它自己的设计更加一致的方式使用epoll,而且以一种更加轻量的方式使用中断,若是你的应用程序旨在运行于Linux系统,那么请考虑利用这个版本的传输,你将发如今高负载下它的性能要优于JDK的NIO实现。

八、OIO——旧的阻塞I/O

Netty的OIO传输实现表明了一种折中:它能够经过常规的传输API使用,可是因为它是创建在java.net包的阻塞实现上的,因此他不是异步的。

例如,你可能须要移植使用了一些进行阻塞调用的库(如JDBC)的遗留代码,而将逻辑转换为非阻塞的可能也是不切实际。相反,你能够在短时间内使用Netty的OIO传输,而后再将你的代码移植到纯粹的异步传输上。

在 java.net API中,你一般会有一个用来接受到达正在监听的ServerSocket的新链接的线程。会建立一个新的和远程节点进行交互的套接字,而且会分配一个新的用于处理响应通讯流量的线程。这是必需的,由于某个指定套接字上的任何I/O操做在任意的时间点上均可能会阻塞。使用单个线程来处理多个套接字,很容易致使一个套接字上的阻塞操做也捆绑了全部其余的套接字。

Netty是如何可以使用和用于异步传输相同的API支持OIO的呢?Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操做完成的最大毫秒数。若是操做在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试,这也是Netty这样的异步框架可以支持OIO的惟一方式。
Netty实战四之传输(全章)

九、用于JVM内部通讯的Local传输

Netty提供了一个Local传输,用于在同一个JVM中运行的客户端和服务器程序之间的异步通讯,且也支持对于全部Netty传输实现都共同的API。

在这个传输中,和服务器Channel相关联的SocketAddress并无绑定物理网络地址;相反,只要服务器还在运行,它就会被存储在注册表里,并在Channel关闭时注销。由于这个传输并不接受真正的网络流量,因此它并不可以和其余传输实现进行互操做。所以,客户端但愿链接到(在同一个JVM中)使用了这个传输的服务器端时也必须使用它。除了这个限制,它的使用方式和其余传输如出一辙。

十、Embedded传输

Netty提供了一种额外的传输,使得你能够将一组ChannelHandler做为帮助器类嵌入到其余的ChannelHandler内部。经过这种方式,你将能够扩展一个CHannelHandler的功能,而又不须要修改其内部代码。

十一、传输的用例

在Linux上启用SCTP

SCTP须要内核的支持,而且须要安装用户库

例如,对于Ubuntn,可使用下面的命令

sudo apt-get install libsctpl
对于Fedora,可使用yum

sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64
有关如何启用SCTP的详细信息,请参考你的Linux发行版的文档。

虽然只有SCTP传输有这些特殊要求,可是其余传输可能也有它们本身的配置选项须要考虑。此外,若是只是为了支持更高的并发链接数,服务器平台可能须要配置得和客户端不同。

——非阻塞代码库:若是你的代码库中没有阻塞调用(或者你可以限制它们的范围),那么在Linux上使用NIO或者epoll始终是个好主意。虽然NIO/Epoll旨在处理大量的并发链接,可是在处理较小数目的并发链接时,它也能很好地工做,尤为是考虑到它在链接之间共享线程的方式。

——阻塞代码库:若是你的代码库严重地依赖于阻塞I/O,并且你的应用程序也有一个相应的设计,那么在你尝试将其直接转换为Netty的NIO传输时,你将可能会遇到和阻塞操做相关的问题。不要为此而重写你的代码,能够考虑分阶段迁移:先从OIO开始,等你的代码修改好以后,在迁移到NIO(或者EPoll,若是你在使用Linux)

——在同一个JVM内部的通讯:同一个JVM内部的通讯,不须要经过网络暴露服务,是Local传输的完美用例。这将消除全部真实网络操做的开销,同时仍然使用你的Netty代码库。若是随后须要经过网络暴露服务,那么你将只须要把传输改成NIO或者OIO便可。

——测试你的ChannelHandler实现:若是你想要为本身的ChannelHandler实现编写单元测试,那么请考虑使用Embedded传输。这既便于测试你的代码,而又不须要建立大量的模拟对象。你的类将仍然符合常规API事件流,保证该Channelhandler在和真实的传输一块儿使用时可以正确地工做。

Netty实战四之传输(全章)

相关文章
相关标签/搜索