阻塞IO,在accept和IO读写时当前线程阻塞。 java
Socket clientScoket=server.accept(); //阻塞等待客户端Socket连接服务器
取得链接后,把clientScoket封装到Runable中交给ThreadPool线程池中线程去处理读写。socket
clientScoket.getInputStream();//取得客户端Socket输入流this
阻塞的进行IO读写操做.net
可是当线程池占满时,其余连接必须等待有连接释放线程。线程
只用一个线程经过Selector,就能够控制客户端Channel连接注册,状态监控,以及读写操做,实现多路复用。rest
此时多个客户端能够同时链接,和服务器进行IO读写操做。code
由于如今是一个线程关注多个Channel,并且每一个Channel的数据发送都是不连贯的。因此增长了Buffer做为缓冲区。server
此线程无线循环,Selector只会监控Channel注册时绑定的事件。通常ServerSocketChannel 绑定OP_ACCEPT事件,事件
客户端SocketChannel绑定OP_READ。
public class NioServer { //通道管理器 private Selector selector; //获取一个ServerSocket通道,并初始化通道 public NioServer init(int port) throws IOException{ //获取一个ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); //获取通道管理器 selector=Selector.open(); //将通道管理器与ServerSocketChannel绑定,并为该通道注册SelectionKey.OP_ACCEPT事件, //只有当该事件到达时,Selector.select()会返回,不然一直阻塞。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); return this; } public void listen() throws IOException{ System.out.println("服务器端启动成功"); //使用轮询访问selector while(true){ //当有注册的事件触发时,方法返回,不然阻塞。 selector.select(); //获取selector中的迭代器,选中项为注册的事件 Iterator<SelectionKey> ite=selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = ite.next(); //删除已选key,防止重复处理 ite.remove(); //服务端接收客户端请求链接事件 if(key.isAcceptable()){ //服务端 ServerSocketChannel server = (ServerSocketChannel)key.channel(); //得到客户端链接通道 SocketChannel channel = server.accept(); channel.configureBlocking(false); //向客户端发消息 channel.write(ByteBuffer.wrap(new String("send message to client").getBytes())); //在与客户端链接成功后,为客户端通道注册SelectionKey.OP_READ事件。 channel.register(selector, SelectionKey.OP_READ); System.out.println("客户端请求链接事件"); }else if(key.isReadable()){//客户端有可读数据事件 //获取客户端传输数据可读取消息通道。 SocketChannel channel = (SocketChannel)key.channel(); //建立读取数据缓冲器 ByteBuffer buffer = ByteBuffer.allocate(10); int read = channel.read(buffer); byte[] data = buffer.array(); String message = new String(data); System.out.println("receive message from client, size:" + buffer.position() + " msg: " + message); // ByteBuffer outbuffer = ByteBuffer.wrap(("server.".concat(msg)).getBytes()); // channel.write(outbuffer); } } } } public static void main(String[] args) throws IOException { new NioServer().init(9981).listen(); } }
虽然NIO是非阻塞的,一个线程就能够经过多路复用器完成对多个客户端的读写操做。可是当业务复杂时,推荐用ThreadPool来解决业务问题,从而解放IO线程。因此区分BossThreadPool和WorkerThreadPool。
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; public class ServerSocketThreadPool{ private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors(); private ThreadPool pool = new ThreadPool(MAX_THREAD); private static int PORT_NUMBER = 1234; public static void main(String[] args) throws Exception { new ServerSocketThreadPool().go(); } public void go() throws Exception { int port = PORT_NUMBER; System.out.println("Listenning on port:" + port); // 建立通道 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 绑定监听端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 设置为非阻塞方式 serverSocketChannel.configureBlocking(false); // 建立选择器 Selector selector = Selector.open(); // 通道注册到选择器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 一直阻塞,直到有数据请求 int n = selector.select(); if (n == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel socket = server.accept(); registerChannel(selector,socket, SelectionKey.OP_READ); sayHello(socket); } if (key.isReadable()) { readDataFromSocket(key); } it.remove(); } } } public void registerChannel(Selector selector,SelectableChannel channel,int ops)throws Exception{ if(channel==null){ return; } channel.configureBlocking(false); channel.register(selector, ops); } public void sayHello(SocketChannel socket) throws Exception{ ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.clear(); buffer.put("hello client".getBytes()); buffer.flip(); socket.write(buffer); } public void readDataFromSocket(SelectionKey key) throws Exception { WorkThread thread=pool.getWork(); if(thread==null){ return; } thread.serviceChannel(key); } private class ThreadPool { List idle=new LinkedList(); public ThreadPool(int poolSize) { for(int i=0;i<poolSize;i++){ WorkThread thread=new WorkThread(this); thread.setName("worker"+(i+1)); thread.start(); idle.add(thread); } } public WorkThread getWork(){ WorkThread thread=null; synchronized (idle) { if(idle.size()>0){ thread=(WorkThread) idle.remove(0); } } return thread; } public void returnWorker(WorkThread workThread) { synchronized (idle) { idle.add(workThread); } } } private class WorkThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; public WorkThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupt(); } if (key == null) { continue; } System.out.println(this.getName() + " has been awaken"); try{ drainChannel(key); }catch(Exception e){ System.out.println("caught '"+e+"' closing channel"); try{ key.channel().close(); }catch(IOException ioe){ ioe.printStackTrace(); } key.selector().wakeup(); } key=null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key){ this.key=key; key.interestOps(key.interestOps()&(~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key)throws Exception{ SocketChannel channel=(SocketChannel) key.channel(); buffer.clear(); int count; while((count=channel.read(buffer))>0){ buffer.flip(); /*while(buffer.hasRemaining()){ channel.write(buffer); }*/ byte[] bytes; bytes=new byte[count]; buffer.get(bytes); System.out.println(new String(bytes)); buffer.clear(); } if(count<0){ channel.close(); return; } key.interestOps(key.interestOps()|SelectionKey.OP_READ); key.selector().wakeup(); } } }