学习 Doug Lea 大神写的——Scalable IO in Javajava
Web services、分布式对象等等都具备相同的处理结构react
基础的网络设计
每个处理的 handler 都在各自的线程中处理。web
代码示例spring
public class Server01 implements Runnable { @Override public void run() { try { ServerSocket serverSocket = new ServerSocket(9898); while (!Thread.interrupted()) { // serverSocket.accept() 会阻塞到有客户端链接,以后 Handler 会处理 new Thread(new Handler(serverSocket.accept())).start(); } } catch (Exception e) { e.printStackTrace(); } } private static class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } @Override public void run() { try { byte[] input = new byte[1024]; // 假设能所有读取出来 socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (Exception e) { e.printStackTrace(); } } private byte[] process(byte[] input) { // 里面处理逻辑 return new byte[0]; } } }
这样作的好处是经过 accept 事件来触发任务的执行,将每一个任务单独的去执行。可是缺点也很明显若是客户端连接过大那么须要新建若干个线程去执行,每台服务器能够运行的线程数是有限的。那么多线程的上下文切换的消耗也是巨大的。设计模式
首先咱们先来看下什么是事件驱动,在 java AWT 包中普遍的获得了使用。用户在点击一个 button 按钮的时候就会触发一个事件,而后会使用观察者模式来触发 Listener 中的处理事件。
服务器
Reactor 设计模式是基于事件驱动的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor 会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了。相似 AWT 中的 Thread。网络
Handlers 执行非阻塞操做的具体类,相似 AWT 中的 ActionListeners。多线程
Reactor 单线程处理任务的设计
并发
代码示例框架
public class Reactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectionKey.attach(new Acceptor()); } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); selectionKeys.clear(); } } } catch (Exception e) { e.printStackTrace(); } } private void dispatch(SelectionKey selectionKey) { Runnable runnable = (Runnable) selectionKey.attachment(); if (null != runnable) { runnable.run(); } } private class Acceptor implements Runnable { @Override public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (null != socketChannel) { new Handler(selector, socketChannel); } } catch (IOException e) { e.printStackTrace(); } } } private class Handler implements Runnable { private final SocketChannel socketChannel; private final SelectionKey selectionKey; private ByteBuffer input = ByteBuffer.allocate(1024); private ByteBuffer output = ByteBuffer.allocate(1024); private static final int READING = 0, SENDING = 1; private int state = READING; Handler(Selector selector, SocketChannel socketChannel) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); selectionKey = this.socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } void process() { } @Override public void run() { try { if (state == READING) { socketChannel.read(input); process(); state = SENDING; selectionKey.interestOps(SelectionKey.OP_WRITE); } if (state == READING) { socketChannel.write(output); selectionKey.cancel(); } } catch (Exception e) { e.printStackTrace(); } } } }
这个程序的重点在于 selectionKey.attach(); 方法每次把须要的对象传入进去,以后在有事件触发的时候会在 dispatch 中 attachment() 获取到这个对象,以后直接调用 run 方法。
Reactor 多线程处理任务的设计
只须要稍微修改下 Handler 这个类
// 添加一个线程池开发时请使用自定义或 spring 的线程池 private final ExecutorService executorService = Executors.newCachedThreadPool(); // 修改 run 方法 @Override public void run() { try { if (state == READING) { socketChannel.read(input); executorService.execute(new Runnable() { @Override public void run() { process(); state = SENDING; selectionKey.interestOps(SelectionKey.OP_WRITE); } }); } if (state == READING) { socketChannel.write(output); selectionKey.cancel(); } } catch (Exception e) { e.printStackTrace(); } }
多个 Reactor
当看到这幅图的时候感受这不就是 Netty EventLoopGroup 的工做模式吗
至此粗略的看完了这篇文章,感受太 6 了,须要后面重复学习,此次只是了解大概。后面学习完会持续更新这篇文章!