经过上篇socke基础,咱们回顾了下socket的用法。上篇内容很简单,服务端也只是接收了一个客户端的链接,接下来咱们就升级下咱们的demo,使其像一个真正的服务器。java
首先咱们容许服务端接收多个客户端的链接。修改OioServer
以下服务器
代码2-1并发
public class OioServer { private ServerSocket serverSocket; public void start() { Socket socket = null; try { openServer(8081); if (Objects.isNull(serverSocket)) { return; } while (true) { socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } } private void handleSocket(Socket socket) { new Thread(() -> { while (!socket.isClosed()) { String msg = SocketUtils.read(socket); SocketUtils.write(socket, " I get you" + msg); } }).start(); } public void openServer(int port) throws IOException { // 1 建立ServerSocket serverSocket = new ServerSocket(); // 2 绑定端口 SocketAddress socketAddress = new InetSocketAddress(port); serverSocket.bind(socketAddress); // 3 accept客户端 } public Socket listenAccept() throws IOException { return serverSocket.accept(); } }
当调用start()方法后,咱们服务器就开始监听8081接口了。而后每次一个客户端链接进来,咱们就会获得一个socket,而后咱们建立一个线程去处理这个socket。app
为何要建立新的线程?由于socket读写都是阻塞的,若是不启动新线程,那主线程就会被阻塞。这个时候,有新的客户端链接进来将不会被处理。可是,咱们为每一个socket建立一个线程,这样是有代价的,而且咱们服务器是不可能建立无数个线程的。固咱们使用为每一个socket建立一个线程这种方法在高并发的状况下显然是不可行的。那么有什么方法改进吗?答案是确定的。如今java有了nio,可是我如今不急于把这个王炸展现出来,让咱们一步步靠近它,并揭开它的神秘面纱。socket
如今咱们知道了为每一个socket建立一个线程是由于,socket的操做(读或写)是阻塞的,那咱们不让它阻塞不就能够了?有办法吗?有。对于读,咱们可使用inputStream.available()
;来判断一下,是否可读,不可读咱们就不调用阻塞方法 inputStream.read(bytes)
。因而咱们再SocketUtils
中天加一个方法高并发
代码2-2学习
/** * 从socket中读数据 */ public static ReadResult readWithNoBlocking(Socket socket) { try { InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); if (inputStream.available() <= 0) { return ReadResult.unReadableResult(); } while ((len = inputStream.read(bytes)) != -1) { sb.append(new String(bytes, 0, len, "UTF-8")); if (inputStream.available() <= 0) { return ReadResult.readableResult(sb.toString()); } } return ReadResult.readableResult(sb.toString()); } catch (IOException e) { e.printStackTrace(); return ReadResult.unReadableResult(); } }
而后修改OioServer,ui
代码2-4this
public class OioServer { private ServerSocket serverSocket; private volatile List<Socket> socketList = new ArrayList<>(); ... public void start() { Socket socket = null; try { openServer(8081); // 开启处理socket链接的线程 startChildHandler(); // 主线程监听链接 while (true) { Socket socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } } // 添加socket到socketList中 private void handleSocket(Socket socket) { socketList.add(socket); } // 处理全部socket private void startChildHandler() { new Thread(() -> { while (true) { for (Socket socketToDeal : socketList) { ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal); if (readResult.readable()) { System.out.println("收到客户端消息" + socketToDeal.getInetAddress().toString() + " " + readResult.result()); SocketUtils.write(socketToDeal, "Get u:" + readResult.result()); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
首先咱们修改了handleSocket方法,是新建的socket添加到socketList中,由于咱们有了SocketUtils.readWithNoBlocking
方法,读操做不再会阻塞住线程了,这样咱们就能够在循环中不断坚持全部的socket是否有消息发过来,并处理。线程
虽然上述代码健壮性有待考证,可是咱们确实获得了一个只要一个线程就能够处理全部socket的服务器模型。也能够说,这是简易版的nio服务器。
如今咱们已经有一个nio 的server了,可是,彻底是没有章法的编写的,若是要增长功能,或者定制化一些东西,那必需要修改OioServer
,这违反了开闭原则。所以咱们须要提取一些通用逻辑,将逻辑的处理交给使用方,下面是以可读为例。
代码2-5
public class NioServer { private ServerSocket serverSocket; private volatile List<SocketContext> socketList = new ArrayList<>(); private volatile List<SocketContext> statusChangedContext = new ArrayList<>(); public void start(int port) { // 监听端口线程 new Thread(() ->{ Socket socket = null; try { openServer(port); startChildHandler(); while (true) { socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } }).start(); } // 监听全部socket private void startChildHandler() { new Thread(() -> { while (true) { for (SocketContext socketToDeal : socketList) { ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal.getSocket()); if (readResult.readable()) { // 若是socket可读,将其加入到statusChangedContext中,并唤醒调用线程 socketToDeal.setStatus(SocketContext.STATUS_READABLE); socketToDeal.setMsg(readResult.result()); statusChangedContext.add(socketToDeal); synchronized (statusChangedContext) { statusChangedContext.notifyAll(); } } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } private void handleSocket(Socket socket) { SocketContext socketContext = new SocketContext(); socketContext.setSocket(socket); socketList.add(socketContext); } private void openServer(int port) throws IOException { // 1 建立ServerSocket serverSocket = new ServerSocket(); // 2 绑定端口 SocketAddress socketAddress = new InetSocketAddress(port); serverSocket.bind(socketAddress); // 3 accept客户端 } private Socket listenAccept() throws IOException { return serverSocket.accept(); } public List<SocketContext> getStatusChangedContext() { if (statusChangedContext.size() == 0) { try { // 当statusChangedContext为空,也就是没有事件要处理的时候,咱们挂起调用方线程,这样能够节约资源 synchronized (statusChangedContext) { statusChangedContext.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } return statusChangedContext; } public static class SocketContext { public static final int STATUS_READABLE = 1; private Socket socket; private int status; private String msg; public Socket getSocket() { return socket; } public void setSocket(Socket socket) { this.socket = socket; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public String read() { return msg; } public void setMsg(String msg) { this.msg = msg; } public void write(String msg) { SocketUtils.write(this.socket, msg); } } }
而后咱们就能够这样使用它了
代码2-6
public class NioServerTest { @Test public void test() { NioSocket server = new NioSocket(); server.start(8081); while (true) { Iterator<SocketContext> socketContexts = server.getStatusChangedContext().iterator(); while (socketContexts.hasNext()) { SocketContext context = socketContexts.next(); socketContexts.remove(); if (context.getStatus() == SocketContext.STATUS_READABLE) { // 处理读 System.out.println(context.read()); context.write("Ok"); } } } } }
从代码2-4
到代码2-5
逻辑跨越应该不大,这里解释下2-5
的一些细节.
为了让NioSocket
在后台持续监听咱们设定的端口,咱们将 socket = listenAccept(); handleSocket(socket);
这两个步骤放入一个单独的线程。每次有客户端接入,便会获得一个新的socket,将这个新的socket加入到socketList
中,而后在startChildHandler
启动的线程中遍历全部socket,并判断其状态改变(可读)。
为了把业务控制权交于调用方,在本例中也就是NioSocketTest.test
。我定义看一个变量statusChangedContext
,若是有socket可读,则将其包装成SocketContext
加入到statusChangedContext
中取。这样,调用方直接拿到statusChangedContext
去遍历,就能够处理全部的socket的读事件。
当调用方调用getStatusChangedContext()
方法时,若是此时statusChangedContext
为空,则调用线程会被挂起,知道有可读事件出现,调用线程被唤醒(statusChangedContext.notifyAll()
)
若是看官老爷读了上面两部分,那么至少对nio的使用已经有所领悟了。上面咱们自制了一个nio 的socket,虽然只能对read事件做出反应,可是其余的事件,好比,可写、socket断开等事件也是能够按照这个思路去作的。那么咱们就能够无缝切入java nio了。
代码2-7
public class NioServer { private Selector selector; private Selector chiledSelector; public void start(int port) throws IOException { // 经过open()方法找到Selector selector = Selector.open(); chiledSelector = Selector.open(); // 打开服务器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务器配置为非阻塞 ssc.configureBlocking(false); // 进行服务的绑定 ssc.bind(new InetSocketAddress("localhost", port)); // 注册到selector,等待链接 SelectionKey selectionKey = ssc.register(selector, 0); selectionKey.interestOps(SelectionKey.OP_ACCEPT); while (!Thread.currentThread().isInterrupted()) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { SocketChannel clientChannel = ssc.accept(); handleSocket(clientChannel); } keyIterator.remove(); //该事件已经处理,能够丢弃 } } } public Set<SelectionKey> getStatusChangedContext() throws IOException { chiledSelector.select(); return chiledSelector.selectedKeys(); } private void handleSocket(SocketChannel clientChannel) throws IOException { clientChannel.configureBlocking(false); clientChannel.register(chiledSelector, SelectionKey.OP_READ); System.out.println("a new client connected " + clientChannel.getRemoteAddress()); } public void write(SelectionKey key, String msg) throws IOException, ClosedChannelException { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("write:" + msg); ByteBuffer sendBuffer = ByteBuffer.allocate(1024); sendBuffer.clear(); sendBuffer.put(msg.getBytes()); sendBuffer.flip(); channel.write(sendBuffer); channel.register(chiledSelector, SelectionKey.OP_READ); } public String read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); readBuffer.clear(); int numRead; try { numRead = socketChannel.read(readBuffer); } catch (IOException e) { key.cancel(); socketChannel.close(); return null; } return new String(readBuffer.array(), 0, numRead); } }
代码2-8
public class NioServerTest { @Test public void test() throws IOException { NioServer server = new NioServer(); server.start(8081); while (true) { Iterator<SelectionKey> socketContexts = server.getStatusChangedContext().iterator(); while (socketContexts.hasNext()) { SelectionKey key = socketContexts.next(); socketContexts.remove(); if ((key.readyOps() & SelectionKey.OP_READ) != 0) { System.out.println(server.read(key)); server.write(key, "Ok"); } } } } }
上面利用java nio写的server跟咱们本身实现的nio写的server效果是同样的。咱们本身建立监听客户端线程,还有处理socket线程的工做,交给了java nio内部(固然不是简单的起了两个线程而已,我只是简化了这个模型)。
在java nio中,socket不在是socket,而是SocketChannel,这里你们暂时理解他俩等价吧。而后一个Selector就至关于一个线程,而后咱们将channel与selector经过register
方法关联起来,并指定咱们感兴趣的事。注意:这里跟咱们本身实现的nio有区别,咱们没有提供注册兴趣事件,而是默认对可读事件感兴趣。而后咱们调selector.select()方法,一样,这个方法没有事件发生会阻塞。而后获得事件集合去遍历处理。
这篇文章,咱们经过bio的socket本身经过线程和循环实现了服务端,并有了事件的概念。而后咱们又用Nio的方式去实现了相同的功能。经过两种方式,咱们很天然的理解了Nio的使用及基本原理,下一章咱们将会更加细致的学习Java NIO.