服务端对于每一个到达的客户端都从新开启一个线程专门处理它们之间的交互。 这种交互在逻辑上是客户端与服务端直接进行通讯。 随着高并发的场景到来,服务器处理上下文切换,建立和销毁线程的代价,将会让服务器不堪重负。java
Channel --> 区别于单向的InputStream/OutputStream,它是双向的。 Selector --> 主要的控制器 Buffer --> 读写两种模式能够经过flip切换。编程
ServerSocketChannel先建立,后绑定一个端口。 设置为非阻塞模式。 将channel注册到selector上,监听链接事件。 开始循环等待新接入的链接。安全
循环内: 每次调用selector.select()将会阻塞地等待至少一个channel准备就绪,返回准备就绪地channel数量。 若是数量为零,开始下一轮select(); 数量不为零,则将这些准备就绪地channel取出来。 根据这些channel对应的当初向selector注册的类型(accept/read),执行对应的业务逻辑。服务器
public void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动成功!");
for (;;) {
int readyChannels = selector.select();
if (readyChannels == 0) continue;
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
if (selectionKey.isAcceptable()) {
acceptHandler(serverSocketChannel, selector);
}
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
}
复制代码
创建链接成功后,设置非阻塞模式,而且将这个刚刚创建的channel,注册到服务端的Selector。并发
private void acceptHandler(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
socketChannel.write(Charset.forName("UTF-8")
.encode("你与聊天室里其余人都不是朋友关系,请注意隐私安全"));
}
复制代码
private void readHandler(SelectionKey selectionKey, Selector selector) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String request = "";
while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
request += Charset.forName("UTF-8").decode(byteBuffer);
}
// 将channel再次注册到selector上,监听他的可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
if (request.length() > 0) {
broadCast(selector, socketChannel, request);
}
}
复制代码
private void broadCast(Selector selector, SocketChannel sourceChannel, String request) {
// 获取到全部已经接入的客户端channel
Set<SelectionKey> selectionKeySet = selector.keys();
selectionKeySet.forEach(selectionKey -> {
Channel targetChannel = selectionKey.channel();
// 剔除发消息的客户端
if (targetChannel instanceof SocketChannel
&& targetChannel != sourceChannel) {
try {
// 将信息发送到targetChannel客户端
((SocketChannel) targetChannel).write(
Charset.forName("UTF-8").encode(request));
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
复制代码
public void start(String nickname) throws IOException {
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 8000));
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
//开启一个线程专门处理服务端发来的消息
new Thread(new NioClientHandler(selector)).start();
//向服务端发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String request = scanner.nextLine();
if (request != null && request.length() > 0) {
socketChannel.write(
Charset.forName("UTF-8")
.encode(nickname + " : " + request));
}
}
}
复制代码
与服务端的start方法相似。 但客户端的Selector只注册了一个读事件的SocketChannel。 所以该Selector,实际上就只是不断地监听服务端有没有消息传过来。 若是有消息传来那么该Selector中绑定的这个惟一的channel就会编程已经就绪的状态,将会执行它的readHadler()。 因此客户端使用NIO和BIO的性能影响差异不大。socket
@Override
public void run() {
try {
for (;;) {
int readyChannels = selector.select();
if (readyChannels == 0) continue;
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
复制代码
readHandleride
private void readHandler(SelectionKey selectionKey, Selector selector) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String response = "";
while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
response += Charset.forName("UTF-8").decode(byteBuffer);
}
socketChannel.register(selector, SelectionKey.OP_READ);
if (response.length() > 0) {
System.out.println(response);
}
}
复制代码