JAVA NIO的服务器端实现java
package com.flyer.cn.javaIO; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Date; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class EchoServer { public static SelectorLoop connectionBell; public static SelectorLoop readBell; public boolean isReadBellRunning=false; private ExecutorService thdPool=Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { new EchoServer().startServer(); } // 启动服务器 public void startServer() throws IOException { // 准备好一个闹钟.当有连接进来的时候响. connectionBell = new SelectorLoop(); // 准备好一个闹装,当有read事件进来的时候响. readBell = new SelectorLoop(); // 开启一个server channel来监听 ServerSocketChannel ssc = ServerSocketChannel.open(); // 开启非阻塞模式 ssc.configureBlocking(false); ServerSocket socket = ssc.socket(); socket.bind(new InetSocketAddress("localhost",7878)); // 给闹钟规定好要监听报告的事件,这个闹钟只监听新链接事件. ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT); new Thread(connectionBell,"connectionBell").start(); } // Selector轮询线程类 public class SelectorLoop implements Runnable { private Selector selector; private ByteBuffer temp = ByteBuffer.allocate(1024); public SelectorLoop() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return this.selector; } @Override public void run() { while(true) { try { // 阻塞,只有当至少一个注册的事件发生的时候才会继续. this.selector.select(); Set<SelectionKey> selectKeys = this.selector.selectedKeys(); Iterator<SelectionKey> it = selectKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { // 这是一个connection accept事件, 而且这个事件是注册在serversocketchannel上的. ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受一个链接. SocketChannel sc = ssc.accept(); // 对新的链接的channel注册read事件. 使用readBell闹钟. sc.configureBlocking(false); sc.register(readBell.getSelector(), SelectionKey.OP_READ); // 若是读取线程尚未启动,那就启动一个读取线程. synchronized(EchoServer.this) { if (!EchoServer.this.isReadBellRunning) { EchoServer.this.isReadBellRunning = true; new Thread(readBell,"readBell").start(); } } } else if (key.isReadable()){ // 这是一个read事件,而且这个事件是注册在socketchannel上的. SocketChannel sc = (SocketChannel) key.channel(); // 写数据到buffer int count = sc.read(temp); if (count < 0) { // 客户端已经断开链接. key.cancel(); sc.close(); return; } // 切换buffer到读状态,内部指针归位. temp.flip(); String msg = Charset.forName("UTF-8").decode(temp).toString(); System.out.println(new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + sc.getRemoteAddress()); // 清空buffer temp.clear(); thdPool.submit(new Dispatch(sc,msg)); } } } catch (IOException e) { e.printStackTrace(); } } } public class Dispatch implements Runnable{ private SocketChannel sc; private String msg; public Dispatch(SocketChannel _sc,String _msg){ this.sc=_sc; this.msg=_msg; } public void run() { try{ Thread.sleep(1000); // echo back. sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); } catch(Exception ex){ ex.printStackTrace(); } } } } }
JAVA NIO的客户端实现服务器
package com.flyer.cn.javaIO; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Date; import java.util.Iterator; import java.util.Set; public class Client implements Runnable { // 空闲计数器,若是空闲超过10次,将检测server是否中断链接. private String clientName; private static int idleCounter = 0; private Selector selector; private SocketChannel socketChannel; private ByteBuffer temp = ByteBuffer.allocate(1024); public static void main(String[] args) throws IOException { for(int i=0;i<100;i++){ Client client= new Client("client"+i); new Thread(client).start(); //client.sendFirstMsg(); } } public Client(String name) throws IOException { this.clientName=name; // 一样的,注册闹钟. this.selector = Selector.open(); // 链接远程server socketChannel = SocketChannel.open(); // 若是快速的创建了链接,返回true.若是没有创建,则返回false,并在链接后出发Connect事件. Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878)); socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); if (isConnected) { this.sendFirstMsg(); } else { // 若是链接还在尝试中,则注册connect事件的监听. connect成功之后会出发connect事件. key.interestOps(SelectionKey.OP_CONNECT); } } public void sendFirstMsg() throws IOException { String msg = "Hello NIO.From "+this.clientName; socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); } @Override public void run() { while (true) { try { // 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量. int num = this.selector.select(1000); if (num ==0) { idleCounter ++; if(idleCounter >10) { // 若是server断开了链接,发送消息将失败. try { this.sendFirstMsg(); } catch(ClosedChannelException e) { e.printStackTrace(); this.socketChannel.close(); return; } } continue; } else { idleCounter = 0; } Set<SelectionKey> keys = this.selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isConnectable()) { // socket connected SocketChannel sc = (SocketChannel)key.channel(); if (sc.isConnectionPending()) { sc.finishConnect(); } // send first message; this.sendFirstMsg(); } if (key.isReadable()) { // msg received. SocketChannel sc = (SocketChannel)key.channel(); this.temp = ByteBuffer.allocate(1024); int count = sc.read(temp); if (count<0) { sc.close(); continue; } // 切换buffer到读状态,内部指针归位. temp.flip(); String msg = Charset.forName("UTF-8").decode(temp).toString(); System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()+new Date().toLocaleString()); Thread.sleep(1000); // echo back. sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); // 清空buffer temp.clear(); } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这个实例在高并发下会产生粘包和段包的问题,解决方法参考下一篇并发