<!-- lang: java --> class Reactor implements Runnable { final Selector selector; //ServerSocketChannel //支持异步操做,对应于java.net.ServerSocket这个类,提供了TCP协议IO接口,支持OP_ACCEPT操做。 final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); //建立实例 serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); // 全部channel建立的时候都是blocking模式, // 只有non-blocking的SelectableChannel才能够参与异步IO操做。 serverSocket.configureBlocking(false); //设置non-blocking模式。 /** *SelectionKey register(Selector sel, int ops) *将当前channel注册到一个Selector上并返回对应的SelectionKey。 *在这之后,经过调用Selector的select()函数就能够监控这个channel。ops这个参数是一个bit mask,表明了须要监控的IO操做。 *SelectionKey register(Selector sel, int ops, Object att) *这个函数和上一个的意义同样,多出来的att参数会做为attachment被存放在返回的SelectionKey中,这在须要存放一些session state的时候很是有用。 *Selector定义了4个静态常量来表示4种IO操做,这些常量能够进行位操做组合成一个bit mask。 *int OP_ACCEPT : 有新的网络链接能够accept,ServerSocketChannel支持这一异步IO。 *int OP_CONNECT: 表明链接已经创建(或出错),SocketChannel支持这一异步IO。 */ SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); // 绑定attachment } /* Alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serverSocket = p.openServerSocketChannel(); */ public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { /** *在一个Selector中,有3个SelectionKey的集合: *1. key set表明了全部注册在这个Selector上的channel,这个集合能够经过keys()方法拿到。 *2. Selected-key set表明了全部经过select()方法监测到能够进行IO操做的channel,这个集合能够经过selectedKeys()拿到。 *3. Cancelled-key set表明了已经cancel了注册关系的channel,在下一个select()操做中,这些channel对应的SelectionKey会从key set和cancelled-key set中移走。这个集合没法直接访问。 */ //监控全部注册的channel,当其中有注册的IO操做能够进行时,该函数返回, //并将对应的SelectionKey加入selected-key set。 selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey)(it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable)(k.attachment()); if (r != null) r.run(); } class Acceptor implements Runnable { // inner public void run() { try { //SocketChannel accept() :接受一个链接,返回表明这个链接的SocketChannel对象。 SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); }catch(IOException ex) { /* ... */ } } } } final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now /** * SocketChannel * 支持异步操做,对应于java.net.Socket这个类,提供了TCP协议IO接口, * 支持OP_CONNECT,OP_READ和OP_WRITE操做。 */ // 为毛要拆成三句?而不是sk = socket.register(sel, SelectionKey.OP_READ, this) sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); // 使一个还未返回的select()操做马上返回。 sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); // void cancel() : cancel这个SelectionKey所对应的注册关系。 if (outputIsComplete()) sk.cancel(); } } /** * 下面是变种 */ /** * =========变种============= * GoF State-Object pattern * 状态模式,适用于"状态切换"的情景 * 例子:http://www.jdon.com/designpatterns/designpattern_State.htm * */ class Handler { // ... public void run() { // initial state is reader socket.read(input); if (inputIsComplete()) { process(); sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE); sk.selector().wakeup(); } } class Sender implements Runnable { public void run(){ // ... socket.write(output); if (outputIsComplete()) sk.cancel(); } } } /** * =========变种============= * Handler with Thread Pool * */ class Handler implements Runnable { // uses util.concurrent thread pool static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3; // ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } } /** * =========变种============= * Multiple Reactor Threads * */ //Use to match CPU and IO rates //Static or dynamic construction //" Each with own Selector, Thread, dispatch loop //Main acceptor distributes to other reactors Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = serverSocket.accept(); if (connection != null) new Handler(selectors[next], connection); if (++next == selectors.length) next = 0; } }