Scalable IO in Java 的简单解读

<!-- lang: java -->
	class Reactor implements Runnable {
	final Selector selector;
	//ServerSocketChannel
	//支持异步操做,对应于java.net.ServerSocket这个类,提供了TCP协议IO接口,支持OP_ACCEPT操做。
	final ServerSocketChannel serverSocket;
	
	Reactor(int port) throws IOException {
		selector = Selector.open();  //建立实例
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		// 全部channel建立的时候都是blocking模式,
		// 只有non-blocking的SelectableChannel才能够参与异步IO操做。
		serverSocket.configureBlocking(false); //设置non-blocking模式。
		/**
		*SelectionKey register(Selector sel, int ops)
		*将当前channel注册到一个Selector上并返回对应的SelectionKey。
		*在这之后,经过调用Selector的select()函数就能够监控这个channel。ops这个参数是一个bit mask,表明了须要监控的IO操做。
		*SelectionKey register(Selector sel, int ops, Object att)
		*这个函数和上一个的意义同样,多出来的att参数会做为attachment被存放在返回的SelectionKey中,这在须要存放一些session state的时候很是有用。
		*Selector定义了4个静态常量来表示4种IO操做,这些常量能够进行位操做组合成一个bit mask。
		*int OP_ACCEPT : 有新的网络链接能够accept,ServerSocketChannel支持这一异步IO。
		*int OP_CONNECT: 表明链接已经创建(或出错),SocketChannel支持这一异步IO。
		*/
		SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
		sk.attach(new Acceptor());	// 绑定attachment
	}
	/*
	Alternatively, use explicit SPI provider:
	SelectorProvider p = SelectorProvider.provider();
	selector = p.openSelector();
	serverSocket = p.openServerSocketChannel();
	*/


	public void run() { // normally in a new Thread
		try {
			while (!Thread.interrupted()) {
				/**
				*在一个Selector中,有3个SelectionKey的集合:
				*1. key set表明了全部注册在这个Selector上的channel,这个集合能够经过keys()方法拿到。
				*2. Selected-key set表明了全部经过select()方法监测到能够进行IO操做的channel,这个集合能够经过selectedKeys()拿到。
				*3. Cancelled-key set表明了已经cancel了注册关系的channel,在下一个select()操做中,这些channel对应的SelectionKey会从key set和cancelled-key set中移走。这个集合没法直接访问。
				*/
				//监控全部注册的channel,当其中有注册的IO操做能够进行时,该函数返回,
				//并将对应的SelectionKey加入selected-key set。
				selector.select();	
				Set selected = selector.selectedKeys();
				Iterator it = selected.iterator();
				while (it.hasNext())
					dispatch((SelectionKey)(it.next());
				selected.clear();
			}
		} catch (IOException ex) { /* ... */ }
	}
	
	void dispatch(SelectionKey k) {
		Runnable r = (Runnable)(k.attachment());
		if (r != null)
			r.run();
	}


	class Acceptor implements Runnable { // inner
		public void run() {
			try {
				//SocketChannel accept() :接受一个链接,返回表明这个链接的SocketChannel对象。
				SocketChannel c = serverSocket.accept();
				if (c != null)
					new Handler(selector, c);
			}catch(IOException ex) { /* ... */ }
		}
	}
}


final class Handler implements Runnable {
	final SocketChannel socket;
	final SelectionKey sk;
	ByteBuffer input = ByteBuffer.allocate(MAXIN);
	ByteBuffer output = ByteBuffer.allocate(MAXOUT);
	static final int READING = 0, SENDING = 1;
	int state = READING;
	
	Handler(Selector sel, SocketChannel c) throws IOException {
		socket = c; 
		c.configureBlocking(false);
		// Optionally try first read now
		/**
		* SocketChannel
		* 支持异步操做,对应于java.net.Socket这个类,提供了TCP协议IO接口,
		* 支持OP_CONNECT,OP_READ和OP_WRITE操做。
		*/
		// 为毛要拆成三句?而不是sk = socket.register(sel, SelectionKey.OP_READ, this)
		sk = socket.register(sel, 0);
		sk.attach(this);
		sk.interestOps(SelectionKey.OP_READ);
		// 使一个还未返回的select()操做马上返回。
		sel.wakeup();
	}
	
	boolean inputIsComplete() { /* ... */ }
	boolean outputIsComplete() { /* ... */ }
	void process() { /* ... */ }
	
	
	public void run() {
		try {
			if (state == READING) read();
			else if (state == SENDING) send();
		} catch (IOException ex) { /* ... */ }
	}
	
	void read() throws IOException {
		socket.read(input);
		if (inputIsComplete()) {
			process();
			state = SENDING;
			// Normally also do first write now
			sk.interestOps(SelectionKey.OP_WRITE);
		}
	}
	void send() throws IOException {
		socket.write(output);
		// void cancel() : cancel这个SelectionKey所对应的注册关系。
		if (outputIsComplete()) sk.cancel();
	}
}


/**
* 下面是变种
*/

/**
 * =========变种=============
 * GoF State-Object pattern 
 * 状态模式,适用于"状态切换"的情景
 * 例子:http://www.jdon.com/designpatterns/designpattern_State.htm
 *
*/

class Handler { 
	// ...
	
	public void run() { // initial state is reader
		socket.read(input);
		if (inputIsComplete()) {
			process();
			sk.attach(new Sender());
			sk.interest(SelectionKey.OP_WRITE);
			sk.selector().wakeup();
		}
	}
	class Sender implements Runnable {
		public void run(){ // ...
			socket.write(output);
			if (outputIsComplete()) sk.cancel();
		}
	}
}


/**
 * =========变种=============
 * Handler with Thread Pool
 *
*/

class Handler implements Runnable {
	// uses util.concurrent thread pool
	static PooledExecutor pool = new PooledExecutor(...);
	static final int PROCESSING = 3;
	// ...
	synchronized void read() { // ...
		socket.read(input);
		if (inputIsComplete()) {
			state = PROCESSING;
			pool.execute(new Processer());
		}
	}
	synchronized void processAndHandOff() {
		process();
		state = SENDING; // or rebind attachment
		sk.interest(SelectionKey.OP_WRITE);
	}
	class Processer implements Runnable {
		public void run() { processAndHandOff(); }
	}
}


/**
 * =========变种=============
 * Multiple Reactor Threads
 *
*/

	//Use to match CPU and IO rates
	//Static or dynamic construction
	//" Each with own Selector, Thread, dispatch loop
	//Main acceptor distributes to other reactors

	Selector[] selectors; // also create threads
	int next = 0;
	class Acceptor { // ...
		public synchronized void run() { ...
			Socket connection = serverSocket.accept();
			if (connection != null)
			new Handler(selectors[next], connection);
			if (++next == selectors.length) next = 0;
		}
	}
相关文章
相关标签/搜索