第四章:Transports(传输)

本章内容java

Transports(传输)bootstrap

NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)安全

Use-case(用例)服务器

APIs(接口)网络

网络应用程序都是以字节码传输。Java开发网络程序传输数据的过程和方式是被抽象了的,咱们不须要关注底层接口,只须要使用Java API或其余网络框架如Netty就能达到传输数据的目的。发送数据和接收数据都是字节码。Nothing more,nothing less。多线程

咱们拿Netty的API和Java的API作比较来告诉你为何Netty能够更容易的使用?并发

4.1 案例研究:切换传输方式app

4.1.1 使用Java的I/O和NIO

  咱们将不用Netty实现这个例子,下面代码是使用阻塞IO实现的例子:框架

package netty.in.action;  
import java.io.IOException;  
import java.io.OutputStream;  
import java.net.ServerSocket;  
import java.net.Socket;  
import java.nio.charset.Charset;  
/** 
 * Blocking networking without Netty 
 * @author c.k 
 * 
 */  
public class PlainOioServer {  
      
    public void server(int port) throws Exception {  
        //bind server to port  
        final ServerSocket socket = new ServerSocket(port);  
        try {  
            while(true){  
                //accept connection  
                final Socket clientSocket = socket.accept();  
                System.out.println("Accepted connection from " + clientSocket);  
                //create new thread to handle connection  
                new Thread(new Runnable() {  
                    @Override  
                    public void run() {  
                        OutputStream out;  
                        try{  
                            out = clientSocket.getOutputStream();  
                            //write message to connected client  
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));  
                            out.flush();  
                            //close connection once message written and flushed  
                            clientSocket.close();  
                        }catch(IOException e){  
                            try {  
                                clientSocket.close();  
                            } catch (IOException e1) {  
                                e1.printStackTrace();  
                            }  
                        }  
                    }  
                }).start();//start thread to begin handling  
            }  
        }catch(Exception e){  
            e.printStackTrace();  
            socket.close();  
        }  
    }  
}

这种阻塞模式在大链接数的状况就会有很严重的问题,如客户端链接超时,服务器响应严重延迟。为了解决这种状况,咱们可使用异步网络处理全部的并发链接。less

问题在于NIO和OIO的API是彻底不一样的,因此一个用OIO开发的网络应用程序想要使用NIO重构代码几乎是从新开发。

下面代码是使用Java NIO实现的例子:

package netty.in.action;  
  
import java.net.InetSocketAddress;  
import java.net.ServerSocket;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
/** 
 * Asynchronous networking without Netty 
 * @author c.k 
 * 
 */  
public class PlainNioServer {  
    public void server(int port) throws Exception {  
        System.out.println("Listening for connections on port " + port);  
        //open Selector that handles channels  
        Selector selector = Selector.open();  
        //open ServerSocketChannel  
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  
        //get ServerSocket  
        ServerSocket serverSocket = serverChannel.socket();  
        //bind server to port  
        serverSocket.bind(new InetSocketAddress(port));  
        //set to non-blocking  
        serverChannel.configureBlocking(false);  
        //register ServerSocket to selector and specify that it is interested in new accepted clients  
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());  
        while (true) {  
            //Wait for new events that are ready for process. This will block until something happens  
            int n = selector.select();  
            if (n > 0) {  
                //Obtain all SelectionKey instances that received events  
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  
                while (iter.hasNext()) {  
                    SelectionKey key = iter.next();  
                    iter.remove();  
                    try {  
                        //Check if event was because new client ready to get accepted  
                        if (key.isAcceptable()) {  
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();  
                            SocketChannel client = server.accept();  
                            System.out.println("Accepted connection from " + client);  
                            client.configureBlocking(false);  
                            //Accept client and register it to selector  
                            client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());  
                        }  
                        //Check if event was because socket is ready to write data  
                        if (key.isWritable()) {  
                            SocketChannel client = (SocketChannel) key.channel();  
                            ByteBuffer buff = (ByteBuffer) key.attachment();  
                            //write data to connected client  
                            while (buff.hasRemaining()) {  
                                if (client.write(buff) == 0) {  
                                    break;  
                                }  
                            }  
                            client.close();//close client  
                        }  
                    } catch (Exception e) {  
                        key.cancel();  
                        key.channel().close();  
                    }  
                }  
            }  
        }  
    }  
}

如你所见,即便它们实现的功能是同样,可是代码彻底不一样。下面咱们将用Netty来实现相同的功能。

4.1.2 Netty中使用I/O和NIO

下面代码是使用Netty做为网络框架编写的一个阻塞IO例子:

package netty.in.action;  
import java.net.InetSocketAddress;  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.buffer.ByteBuf;  
import io.netty.buffer.Unpooled;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInboundHandlerAdapter;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.oio.OioServerSocketChannel;  
import io.netty.util.CharsetUtil;  
public class NettyOioServer {  
  
    public void server(int port) throws Exception {  
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));  
        //事件循环组  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            //用来引导服务器配置  
            ServerBootstrap b = new ServerBootstrap();  
            //使用OIO阻塞模式  
            b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
            //指定ChannelInitializer初始化handlers  
                    .childHandler(new ChannelInitializer<Channel>() {  
                        @Override  
                        protected void initChannel(Channel ch) throws Exception {  
                            //添加一个“入站”handler到ChannelPipeline  
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
                                @Override  
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {  
                                    //链接后,写消息到客户端,写完后便关闭链接  
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);  
                                }  
                            });  
                        }  
                    });  
            //绑定服务器接受链接  
            ChannelFuture f = b.bind().sync();  
            f.channel().closeFuture().sync();  
        } catch (Exception e) {  
            //释放全部资源  
            group.shutdownGracefully();  
        }  
    }  
}

面代码实现功能同样,但结构清晰明了,这只是Netty的优点之一。

4.1.3 Netty中实现异步支持

下面代码是使用Netty实现异步,能够看出使用Netty由OIO切换到NIO是很是的方便。

package netty.in.action;  
  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.buffer.ByteBuf;  
import io.netty.buffer.Unpooled;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInboundHandlerAdapter;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioServerSocketChannel;  
import io.netty.util.CharsetUtil;  
import java.net.InetSocketAddress;  
public class NettyNioServer {  
    public void server(int port) throws Exception {  
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));  
        // 事件循环组  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            // 用来引导服务器配置  
            ServerBootstrap b = new ServerBootstrap();  
            // 使用NIO异步模式  
            b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
            // 指定ChannelInitializer初始化handlers  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            // 添加一个“入站”handler到ChannelPipeline  
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
                                @Override  
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {  
                                    // 链接后,写消息到客户端,写完后便关闭链接  
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);  
                                }  
                            });  
                        }  
                    });  
            // 绑定服务器接受链接  
            ChannelFuture f = b.bind().sync();  
            f.channel().closeFuture().sync();  
        } catch (Exception e) {  
            // 释放全部资源  
            group.shutdownGracefully();  
        }  
    }  
}

由于Netty使用相同的API来实现每一个传输,它并不关心你使用什么来实现。Netty经过操做Channel接口和ChannelPipeline、ChannelHandler来实现传输。

4.2 Transport API

 传输API的核心是Channel接口,它用于全部出站的操做。Channel接口的类层次结构以下

如上图所示,每一个Channel都会分配一个ChannelPipeline和ChannelConfig。

ChannelConfig负责设置并存储配置,并容许在运行期间更新它们。传输通常有特定的配置设置,只做用于传输,没有其余的实现。

ChannelPipeline容纳了使用的ChannelHandler实例,这些ChannelHandler将处理通道传递的“入站”和“出站”数据。ChannelHandler的实现容许你改变数据状态和传输数据。

如今咱们可使用ChannelHandler作下面一些事情:

    a. 传输数据时,将数据从一种格式转换到另外一种格式

    b. 异常通知

    c. Channel变为有效或无效时得到通知

    d. Channel被注册或从EventLoop中注销时得到通知

    e. 通知用户特定事件

这些ChannelHandler实例添加到ChannelPipeline中,在ChannelPipeline中按顺序逐个执行。它相似于一个链条,有使用过Servlet的读者可能会更容易理解。

ChannelPipeline实现了拦截过滤器模式,这意味着咱们链接不一样的ChannelHandler来拦截并处理通过ChannelPipeline的数据或事件。

能够把ChannelPipeline想象成UNIX管道,它容许不一样的命令链(ChannelHandler至关于命令)。

你还能够在运行时根据须要添加ChannelHandler实例到ChannelPipeline或从ChannelPipeline中删除,这能帮助咱们构建高度灵活的Netty程序。

此外,访问指定的ChannelPipeline和ChannelConfig,你能在Channel自身上进行操做。

Channel提供了不少方法,以下列表:

    eventLoop(),返回分配给Channel的EventLoop

    pipeline(),返回分配给Channel的ChannelPipeline

    isActive(),返回Channel是否激活,已激活说明与远程链接对等

    localAddress(),返回已绑定的本地SocketAddress

    remoteAddress(),返回已绑定的远程SocketAddress

    write(),写数据到远程客户端,数据经过ChannelPipeline传输过去

写数据到远程已链接客户端能够调用Channel.write()方法,以下代码:

Channel channel = ...  
//Create ByteBuf that holds data to write  
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);  
//Write data  
ChannelFuture cf = channel.write(buf);  
//Add ChannelFutureListener to get notified after write completes  
cf.addListener(new ChannelFutureListener() {  
    @Override  
    public void operationComplete(ChannelFuture future) {  
        //Write operation completes without error  
        if (future.isSuccess()) {  
            System.out.println(.Write successful.);  
        } else {  
            //Write operation completed but because of error  
            System.err.println(.Write error.);  
            future.cause().printStacktrace();  
        }  
    }  
});

Channel是线程安全(thread-safe)的,它能够被多个不一样的线程安全的操做,在多线程环境下,全部的方法都是安全的。正由于Channel是安全的,咱们存储对Channel的引用,并在学习的时候使用它写入数据到远程已链接的客户端,使用多线程也是如此。

下面的代码是一个简单的多线程例子:

final Channel channel = ...  
//Create ByteBuf that holds data to write  
final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);  
//Create Runnable which writes data to channel  
Runnable writer = new Runnable() {  
    @Override  
    public void run() {  
        channel.write(buf.duplicate());  
    }  
};  
//Obtain reference to the Executor which uses threads to execute tasks  
Executor executor = Executors.newChachedThreadPool();  
// write in one thread  
//Hand over write task to executor for execution in thread  
executor.execute(writer);  
// write in another thread  
//Hand over another write task to executor for execution in thread  
executor.execute(writer);

此外,这种方法保证了写入的消息以相同的顺序经过写入它们的方法。想了解全部方法的使用能够参考Netty API文档。

4.3 Netty包含的传输实现

Netty中的传输方式有以下几种:

    NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器做为基础的方法。

    OIO,io.netty.channel.socket.oio,基于java.net的工具包,使用阻塞流。

    Local,io.netty.channel.local,用来在虚拟机之间本地通讯。

    Embedded,io.netty.channel.embedded,嵌入传输,它容许在没有真正网络的运输中使用ChannelHandler,能够很是有用的来测试ChannelHandler的实现。

4.3.1 NIO - Nonblocking I/O

NIO传输是目前最经常使用的方式,它经过使用选择器提供了彻底异步的方式操做全部的I/O,NIO从Java 1.4才被提供。NIO中,咱们能够注册一个通道或得到某个通道的改变的状态,通道状态有下面几种改变:

    一个新的Channel被接受并已准备好

    Channel链接完成

    Channel中有数据并已准备好读取

    Channel发送数据出去

处理完改变的状态后需从新设置他们的状态,用一个线程来检查是否有已准备好的Channel,若是有则执行相关事件。在这里可能只同时一个注册的事件而忽略其余的。选择器所支持的操做在SelectionKey中定义,具体以下:

    OP_ACCEPT,有新链接时获得通知

    OP_CONNECT,链接完成后获得通知

    OP_READ,准备好读取数据时获得通知

    OP_WRITE,写入数据到通道时获得通知

Netty中的NIO传输就是基于这样的模型来接收和发送数据,经过封装将本身的接口提供给用户使用,这彻底隐藏了内部实现。如前面所说,Netty隐藏内部的实现细节,将抽象出来的API暴露出来供使用,下面是处理流程图:

NIO在处理过程也会有必定的延迟,若链接数不大的话,延迟通常在毫秒级,可是其吞吐量依然比OIO模式的要高。Netty中的NIO传输是“zero-file-copy”,也就是零文件复制,这种机制可让程序速度更快,更高效的从文件系统中传输内容,零复制就是咱们的应用程序不会将发送的数据先复制到JVM堆栈在进行处理,而是直接从内核空间操做。接下来咱们将讨论OIO传输,它是阻塞的。

4.3.2 OIO - Old blocking I/O

OIO就是java中提供的Socket接口,java最开始只提供了阻塞的Socket,阻塞会致使程序性能低。下面是OIO的处理流程图,若想详细了解,能够参阅其余相关资料。

4.3.3 Local - In VM transport

Netty包含了本地传输,这个传输实现使用相同的API用于虚拟机之间的通讯,传输是彻底异步的。每一个Channel使用惟一的SocketAddress,客户端经过使用SocketAddress进行链接,在服务器会被注册为长期运行,一旦通道关闭,它会自动注销,客户端没法再使用它。

链接到本地传输服务器的行为与其余的传输实现几乎是相同的,须要注意的一个重点是只能在本地的服务器和客户端上使用它们。Local未绑定任何Socket,值提供JVM进程之间的通讯。

4.3.4 Embedded transport

Netty还包括嵌入传输,与以前讲述的其余传输实现比较,它是否是一个真的传输呢?若不是一个真的传输,咱们用它能够作什么呢?Embedded transport容许更容易的使用不一样的ChannelHandler之间的交互,这也更容易嵌入到其余的ChannelHandler实例并像一个辅助类同样使用它们。它通常用来测试特定的ChannelHandler实现,也能够在ChannelHandler中从新使用一些ChannelHandler来进行扩展,为了实现这样的目的,它自带了一个具体的Channel实现,即:EmbeddedChannel。

4.4 每种传输方式在何时使用?

很少加赘述,看下面列表:

    OIO,在低链接数、须要低延迟时、阻塞时使用

    NIO,在高链接数时使用

    Local,在同一个JVM内通讯时使用

    Embedded,测试ChannelHandler时使用

相关文章
相关标签/搜索