本篇文章主要使用IO和NIO的形式来实现一个简单的聊天室,而且说明IO方法存在的问题,而NIO又是如何解决的。java
大概的框架为,先提供思路和大概框架图——代码——问题及解决方式,这样会容易看一点。服务器
下面编写一个简单的聊天室,大概须要的功能就是服务端维护一个聊天室,里边的客户端发送消息以后服务将其消息转发给其余客户端,达到一个聊天室的效果。多线程
大体的思路:服务端区分职责,分红两部分,主线程负责接收链接并把链接放入到线程池中处理,维护一个线程池,全部对于socket的处理都交给线程池中的线程来处理。以下图。架构
下面贴上demo代码(代码中有几处为了方便并无采用最规范的定义方式,如线程池的建立和Map初始化的时候未设置初始容量等)并发
代码分五个类,服务端(ChatServer,监听做用,为服务端主线程)、客户端(ChatClient)、服务端处理器(ServerHandler,能够理解为线程池中要执行的事情)、客户端处理器(ClientHandler,客户端读写服务器消息的处理),工具类(SocketUtils,只有一个发送消息方法)。框架
服务端:socket
/** * 服务端启动类 * 主要负责监听客户端链接 */ public class ChatServer { public static void main(String[] args) { ServerSocket serverSocket = null; /*----------为了方便使用Executors建立线程-------------*/ ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100); try { serverSocket = new ServerSocket(8888); while (true) { System.out.println("-----------阻塞等待链接------------"); Socket socket = serverSocket.accept(); String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); System.err.println(key + "已链接"); // 主线程只接收,处理直接交给处理线程池 handlerThreadPool.execute(new ServerHandler(socket)); } } catch (IOException e) { e.printStackTrace(); if (Objects.nonNull(serverSocket)) { try { serverSocket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } } }
服务端处理类:ide
/** * 服务端socket事件处理类 * 负责处理对应socket中的读写操做 */ public class ServerHandler implements Runnable { /** * 链接到服务端的全部链接 socket的地址端口->socket */ private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>(); /** * 维护名称和地址的map */ private static final Map<String, String> nameMap = new ConcurrentHashMap<>(); private Socket socket; /** * 每一个socket的标识,使用地址+端口构成 */ private String key; public ServerHandler() { } public ServerHandler(Socket socket) { this.socket = socket; this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); } @Override public void run() { Socket s = socket; // 根据消息执行不一样操做 InputStream inputStream; // debug查看数据用 // Map<String, Socket> tmpMap = socketMap; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (line.startsWith("register")) { // 登记 String[] split = line.split(":"); String name = split[1]; String msg; // 校验是否存在 if (socketMap.containsKey(key)) { msg = "请勿重复登记"; sendMsg(s, msg); return; } if (nameMap.containsValue(name)) { msg = "名称已被登记,请换一个名称"; sendMsg(s, msg); return; } // 通知本身已链接 sendMsg(s, "已链接到服务器"); msg = name + "进入聊天室"; // 将消息转发给其余客户端 sendMsgToClients(msg); // 放入socket池 socketMap.put(key, s); nameMap.put(key, name); System.err.println(name + "已登记"); } else if (line.trim().equalsIgnoreCase("end")) { if (notPassRegisterValidate()) { continue; } // 断开链接 socketMap.remove(key); String name = nameMap.get(key); String msg = name + "离开聊天室"; System.err.println(msg); // 将消息转发给其余客户端 sendMsgToClients(msg); msg = "已断开链接"; // 发送给对应的链接断开信息 sendMsg(s, msg); inputStream.close(); break; } else { if (notPassRegisterValidate()) { continue; } // 正常通讯 String name = nameMap.get(key); String msg = name + ":" + line; // 将消息转发给其余客户端 sendMsgToClients(msg); } } } catch (IOException e) { e.printStackTrace(); } } /** * 是否已登陆校验 * * @return 是否已登陆 */ private boolean notPassRegisterValidate() { boolean hasRegister = nameMap.containsKey(key); if (hasRegister) { return false; } String msg = "您还未登陆,请先登陆"; sendMsg(socket, msg); return true; } /** * 往链接发送消息 * * @param socket 客户端链接 * @param msg 消息 */ private void sendMsg(Socket socket, String msg) { SocketUtils.sendMsg(socket, msg); if (socket.isClosed()) { socketMap.remove(key); } } /** * 发送给其余客户端信息 * * @param msg 信息 */ private void sendMsgToClients(String msg) { for (Map.Entry<String, Socket> entry : socketMap.entrySet()) { if (this.key.equals(entry.getKey())) { continue; } sendMsg(entry.getValue(), msg); } } }
工具类(一个发送消息的方法):函数
public class SocketUtils { private SocketUtils() { } public static void sendMsg(Socket socket, String msg) { Socket s = socket; OutputStream outputStream = null; msg += "\r\n"; try { outputStream = s.getOutputStream(); outputStream.write(msg.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); } catch (IOException e) { System.err.println("发送消息失败, 链接已断开"); try { if (Objects.nonNull(outputStream)) { outputStream.close(); } socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
客户端:高并发
/** * 客户端读和写各自使用一个线程 */ public class ChatClient { public static void main(String[] args) { Socket socket; ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2); try { socket = new Socket("localhost", 8888); // 写线程 clientHandlerPool.execute(new ClientHandler(socket, 1)); // 读线程 clientHandlerPool.execute(new ClientHandler(socket, 0)); } catch (IOException e) { e.printStackTrace(); } } }
客户端处理器:
/** * 客户端处理器 * 根据type来区分是作读工做仍是写工做 */ public class ClientHandler implements Runnable { private Socket socket; /** * 处理类型,0-读、1-写 */ private int type; public ClientHandler() { throw new IllegalArgumentException("不能使用没有参数的构造函数"); } public ClientHandler(Socket socket, int type) { this.socket = socket; this.type = type; } @Override public void run() { if (type == 1) { // 进行写操做 doWriteJob(); return; } // 默认读操做 doReadJob(); } /** * 读操做 */ private void doReadJob() { Socket s = socket; InputStream inputStream; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { System.err.println(line); } // 若是已退出了,那么关闭链接 if ("已断开链接".equals(line)) { socket.close(); break; } } } catch (IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } /** * 写线程 */ private void doWriteJob() { Socket s = socket; try { Scanner scanner = new Scanner(System.in); while (true) { String output = scanner.nextLine(); if (Objects.nonNull(output) && !"".equals(output)) { SocketUtils.sendMsg(s, output); } } } catch (Exception e) { e.printStackTrace(); System.err.println("错误发生了:" + e.getMessage()); } } }
结果:
思考:当前这样实现有什么瓶颈,可能会出现什么问题?
存在问题:
- 服务端使用accept阻塞接收线程,链接一个一个处理,在高并发下处理性能缓慢。
- 没有链接的时候线程一直处于阻塞状态形成资源的浪费(若是使用多线程接收处理并发,那么没链接的时候形成多个线程的资源浪费)。
那咱们来看下NIO是怎么解决上方的问题的,首先上这个demo总体的架构图。
大概的逻辑为
- 服务端将ServerSocketChannel注册到Selector中,客户端链接进来的时候事件触发,将客户端的链接注册到selector中。
- 主线程负责selector的轮询工做,发现有事件能够处理就将其交给线程池。
- 客户端同理分红两个部分,写操做和读操做,每一个操做由一个线程单独完成;可是若是读操做处理使用while循环不断轮询等待接收的话,CPU会飙升,因此须要客户端新建一个selector来解决这个问题,注意这个selector跟服务端不是同一个,没有啥关系。
代码分类大体跟IO写法同样,分红服务端、服务端处理器、客户端、客户端处理器,下面为demo。
服务端:
public class ChatServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100); public ChatServer() throws IOException { this.selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(9999)); // 将服务端的socket注册到selector中,接收客户端,并将其注册到selector中,其自己也是selector中的一个I/O事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.err.println("聊天室服务端初始化结束"); } /** * 启动方法 * 1.监听,拿到以后进行处理 */ public void start() throws IOException { int count; while (true) { // 可能出现select方法没阻塞,空轮询致使死循环的状况 count = selector.select(); if (count > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 交给线程池处理 handlerPool.execute(new ServerHandler(key, selector)); // 处理完成后移除 iterator.remove(); } } } } public static void main(String[] args) throws IOException { new ChatServer().start(); } }
服务端处理器:
public class ServerHandler implements Runnable { private SelectionKey key; private Selector selector; public ServerHandler() { } /** * 原本能够经过key拿到selector,这里为了图方便就这样写了 */ public ServerHandler(SelectionKey key, Selector selector) { this.key = key; this.selector = selector; } @Override public void run() { try { if (key.isAcceptable()) { // 说明是服务端的事件,注意这里强转换为的是ServerSocketChannel ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // 接收链接 SocketChannel socket = channel.accept(); if (Objects.isNull(socket)) { return; } socket.configureBlocking(false); // 接收客户端的socket而且将其注册到服务端这边的selector中,注意客户端在此时跟服务端selector产生关联 socket.register(selector, SelectionKey.OP_READ); System.err.println("服务端已接收链接"); } else if (key.isReadable()) { // 客户端发送信息过来了 doReadJob(); } } catch (IOException e) { e.printStackTrace(); // 错误处理 } } /** * 读取操做 */ private void doReadJob() throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int readCount = socketChannel.read(buffer); if (readCount > 0) { String msg = new String(buffer.array(), StandardCharsets.UTF_8); System.err.println(socketChannel.getRemoteAddress().toString() + "的信息为:" + msg); // 转发给其余客户端 sendMsgToOtherClients(msg); } } /** * 转发消息给其余客户端 * * @param msg 消息 */ private void sendMsgToOtherClients(String msg) throws IOException { SocketChannel self = (SocketChannel) key.channel(); Set<SelectionKey> keys = selector.keys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); SelectableChannel channel = selectionKey.channel(); // 若是是自己或者不是socketChannel类型则跳过 if (self.equals(channel) || channel instanceof ServerSocketChannel) { continue; } SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); socketChannel.write(byteBuffer); } } }
客户端:
public class ChatClient { private Selector selector; private SocketChannel socketChannel; private static ExecutorService dealPool = Executors.newFixedThreadPool(2); public ChatClient() throws IOException { /* * 说明一下: * 客户端这边的selector跟刚才在服务端定义的selector是不一样的两个selector * 客户端这边不须要selector也能实现功能,可是读取的时候必须不断的循环,会致使CPU飙升, * 因此使用selector是为了解决这个问题的,别跟服务端的selector搞混就好 */ selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } public void start() throws IOException, InterruptedException { // 链接 // socketChannel.connect(new InetSocketAddress("localhost", 9999)); while (!socketChannel.finishConnect()) { System.err.println("正在链接..."); TimeUnit.MILLISECONDS.sleep(200); } System.err.println("链接成功"); // 使用两个线程来分别处理读取和写操做 // 写数据 dealPool.execute(new ClientHandler(selector, socketChannel, 1)); // 读取数据 dealPool.execute(new ClientHandler(selector, socketChannel, 0)); } public static void main(String[] args) throws IOException, InterruptedException { new ChatClient().start(); } }
客户端处理器:
public class ClientHandler implements Runnable { private Selector selector; private SocketChannel socketChannel; /** * 0-读,1-写 */ private int type; public ClientHandler() { } public ClientHandler(Selector selector, SocketChannel socketChannel, int type) { // selector是为了解决读时候CPU飙升的问题,具体见客户端的启动类代码注释 this.selector = selector; this.socketChannel = socketChannel; this.type = type; } @Override public void run() { try { if (type == 0) { doClientReadJob(); return; } doClientWriteJob(); } catch (IOException e) { e.printStackTrace(); } } /** * 写操做 */ private void doClientWriteJob() throws IOException { SocketChannel sc = socketChannel; Scanner scanner = new Scanner(System.in); while (true) { if (scanner.hasNextLine()) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)); sc.write(buffer); } } } } /** * 读操做 */ private void doClientReadJob() throws IOException { SocketChannel sc = socketChannel; ByteBuffer buf = ByteBuffer.allocate(1024); while (true) { int select = selector.select(); if (select > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 这是必须的,否则下方的remove会出错 SelectionKey next = iterator.next(); // 这里由于只有自己这个客户端注册到客户端的selector中,因此有事件必定是它的,也就不用从key拿了,直接操做就行 buf.clear(); int read = sc.read(buf); if (read > 0) { String msg = new String(buf.array(), StandardCharsets.UTF_8); System.err.println(msg); } // 事件处理完以后要移除这个key,不然的话selector.select()方法不会再读到这个key,即使有新的时间到这个channel来 iterator.remove(); } } } } }
结果图:
在编写的过程当中发现了如下两点:
- select方法以后若是存在key,而且接下来的操做未对这个selectionKey作remove操做,那么下次的select不会再将其选入,即使有事件发生,也就是说,select方法不会选择以前已经选过的key。
- selector.select()方法中偶尔会出现不阻塞的状况。这就是NIO中的空轮询bug,也就是说,没有链接又不阻塞的话,while(true) ... 的写法就是一个死循环,会致使CPU飙升。
第二点问题在NIO框架(如netty)中都采用了比较好的解决方法,能够去查下如何解决的。接下来看下NIO的写法是否解决了IO写法中存在的问题:
服务端使用accept阻塞接收线程,链接一个一个处理,在高并发下处理性能缓慢。
答:上述写法中仍是使用一个ServerSocketChannel来接收客户端,没有解决这个问题;可是能够经过使用线程池的方式来解决。也就是说将服务端的事件分红两个部分,第一个部分为接收客户端,使用一个线程池来维护;第二个部分为客户端的事件处理操做,也维护一个线程池来执行这些事件。
这样性能上去了,因为selector的存在也不会出现资源浪费的事情,netty就是这么作的哦。
没有链接的时候线程一直处于阻塞状态形成资源的浪费(若是使用多线程接收处理并发,那么没链接的时候形成多个线程的资源浪费)。
答:解决。NIO写法主要有selector不断轮询,不会出现没链接不做为的状况,并且多个链接的话也没有问题(参考1的回答)。
两种写法都有Reactor模式的影子,可是IO写法有明显的缺点就是若是没有链接会形成资源浪费的问题(采用多个接收链接的话更甚),而NIO中selector轮询机制就很好的解决了无链接时无做为的状况,而且在性能方面能够经过职责分类和线程池来获得改善,因此,NIO,永远滴神。
须要压力,须要努力。