Java NIO服务器端开发

1、NIO类库简介

  一、缓冲区Buffer

  Buffer是一个对象,包含一些要写入和读出的数据。java

  在NIO中,全部的数据都是用缓冲区处理的,读取数据时,它是从通道(Channel)直接读到缓冲区中,在写入数据时,也是从缓冲区写入到通道。node

  缓冲区实质上是一个数组,一般是一个字节数组(ByteBuffer),也能够是其它类型的数组,此外缓冲区还提供了对数据的结构化访问以及维护读写位置等信息。数组

  Buffer类的继承关系以下图所示:服务器

  

  二、通道Channel

  Channel是一个通道,网络数据经过Channel读取和写入。通道和流的不一样之处在于通道是双向的(通道能够用于读、写后者两者同时进行),流只是在一个方向上移动。网络

  Channel大致上能够分为两类:用于网络读写的SelectableChannel(ServerSocketChannel和SocketChannel就是其子类)、用于文件操做的FileChannel。dom

  下面的例子给出经过FileChannel来向文件中写入数据、从文件中读取数据,将文件数据拷贝到另外一个文件中:socket

public class NioTest
{
    public static void main(String[] args) throws IOException
    {
        copyFile();
    }
    //拷贝文件
    private static void copyFile()
    {
        FileInputStream in=null;
        FileOutputStream out=null;
        try
        {
            in=new FileInputStream("src/main/java/data/in-data.txt");
            out=new FileOutputStream("src/main/java/data/out-data.txt");
            FileChannel inChannel=in.getChannel();
            FileChannel outChannel=out.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            int bytesRead = inChannel.read(buffer);
            while (bytesRead!=-1)
            {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
                bytesRead = inChannel.read(buffer);
            }
        }
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }    
    }
    //写文件
    private static void writeFileNio()
    {
        try
        {
            RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");
            FileChannel fc=fout.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            buffer.put("hi123".getBytes());
            buffer.flip();
            try
            {
                fc.write(buffer);
            } catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } 
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //读文件
    private static void readFileNio()
    {
        FileInputStream fileInputStream;
        try
        {
            fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");
            FileChannel fileChannel=fileInputStream.getChannel();//从 FileInputStream 获取通道
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//建立缓冲区
            int bytesRead=fileChannel.read(byteBuffer);//将数据读到缓冲区
            while(bytesRead!=-1)
            {
                /*limit=position
                 * position=0;
                 */
                byteBuffer.flip();
                //hasRemaining():告知在当前位置和限制之间是否有元素
                while (byteBuffer.hasRemaining())
                {
                    System.out.print((char) byteBuffer.get());
                }
                /*
                 * 清空缓冲区
                 * position=0;
                 * limit=capacity;
                 */
                byteBuffer.clear();
                bytesRead = fileChannel.read(byteBuffer);
            }
        } catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

  三、多路复用器Selector

  多路复用器提供选择已经就绪的任务的能力。Selector会不断的轮询注册在其上的Channel,若是某个Channel上面发送读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,而后经过SelectionKey能够获取就绪Channel的集合,进行后续的I/O操做。ide

  一个多路复用器Selector能够同时轮询多个Channel,因为JDK使用了epoll代替了传统的select实现,因此它没有最大链接句柄1024/2048的限制,意味着只须要一个线程负责Selector的轮询,就能够接入成千上万的客户端。其模型以下图所示:url

  

  用单线程处理一个Selector。要使用Selector,得向Selector注册Channel,而后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就能够处理这些事件,事件的例子有如新链接进来,数据接收等。 spa

  注:

  一、什么select模型?

  select是事件触发机制,当等待的事件发生就触发进行处理,多用于Linux实现的服务器对客户端的处理。

  能够阻塞地同时探测一组支持非阻塞的IO设备,是否有事件发生(如可读、可写,有高优先级错误输出等),直至某一个设备触发了事件或者超过了指定的等待时间。也就是它们的职责不是作IO,而是帮助调用者寻找当前就绪的设备。

  二、什么是epoll模型?

  epoll的设计思路,是把select/poll单个的操做拆分为1个epoll_create+多个epoll_ctrl+一个wait。此外,内核针对epoll操做添加了一个文件系统”eventpollfs”,每个或者多个要监视的文件描述符都有一个对应的eventpollfs文件系统的inode节点,主要信息保存在eventpoll结构体中。而被监视的文件的重要信息则保存在epitem结构体中。因此他们是一对多的关系。

2、NIO服务器端开发

  功能说明:开启服务器端,对每个接入的客户端都向其发送hello字符串。

  使用NIO进行服务器端开发主要有如下几个步骤:

  一、建立ServerSocketChannel,配置它为非阻塞模式

    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);

  二、绑定监听,配置TCP参数,如backlog大小

    serverSocketChannel.socket().bind(new InetSocketAddress(8080));

  三、建立一个独立的I/O线程,用于轮询多路复用器Selector

  四、建立Selector,将以前建立的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT

  selector=Selector.open();
   serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  五、启动I/O线程,在循环体内执行Selector.select()方法,轮询就绪的Channel

  while(true)
   {
        try
        {
           //select()阻塞到至少有一个通道在你注册的事件上就绪了
           //若是没有准备好的channel,就在这一直阻塞
           //select(long timeout)和select()同样,除了最长会阻塞timeout毫秒(参数)。
           selector.select();
        } 
        catch (IOException e)
        {
           // TODO Auto-generated catch block
           e.printStackTrace();
           break;
         }
 }

  六、当轮询到了处于就绪状态的Channel时,需对其进行判断,若是是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端

      //返回已经就绪的SelectionKey,而后迭代执行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    } 
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    
                }
            }    

  七、设置新接入的客户端链路SocketChannel为非阻塞模式,配置其余的一些TCP参数

  if(key.isAcceptable())
    {
        ServerSocketChannel server=(ServerSocketChannel) key.channel();
        SocketChannel client=server.accept();
        client.configureBlocking(false);
        ...
    }

  八、将SocketChannel注册到Selector,监听OP_WRITE

  client.register(selector,SelectionKey.OP_WRITE);

  九、若是轮询的Channel为OP_WRITE,则说明要向SockChannel中写入数据,则构造ByteBuffer对象,写入数据包

  else if(key.isWritable())
    {
        SocketChannel client=(SocketChannel) key.channel();
        ByteBuffer buffer=ByteBuffer.allocate(20);
        String str="hello";
        buffer=ByteBuffer.wrap(str.getBytes());
        client.write(buffer);
        key.cancel();
    } 

  完整代码以下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerSocketChannelDemo
{
    public static void main(String[] args)
    {
        ServerSocketChannel serverSocketChannel;
        Selector selector=null;
        try
        {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            selector=Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        while(true)
        {
            try
            {
                //select()阻塞到至少有一个通道在你注册的事件上就绪了
                //若是没有准备好的channel,就在这一直阻塞
                //select(long timeout)和select()同样,除了最长会阻塞timeout毫秒(参数)。
                selector.select();
            } 
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
                break;
            }
            //返回已经就绪的SelectionKey,而后迭代执行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    } 
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    
                }
            }    
        }
    }
}
View Code

  咱们用telnet localhost 8080模拟出多个客户端:

  

  程序运行结果以下:

  

3、参考资料

  一、netty权威指南(李林峰)

相关文章
相关标签/搜索