知识点:java
学习NIO咱们先了解前置概念:
1)阻塞和非阻塞
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候
阻塞:每每须要等待缓冲区中的数据准备好事后才处理其余事情,不然就一直等待。 非阻塞:当咱们的进程范文咱们的数据缓冲区的编程
2)同步 异步区别
基于应用程序和操做系统处理IO事件采起的方式来区分:
异步:同一时刻能够处理多个io读写,应用程序等待操做系统通知
同步:同一时间只能处理一条io读写,应用程序直接参与io读写数组
咱们接着看下图bash
咱们正式开始学习NIO服务器
Java NIO 是 java 1.4, 以后新出的一套IO接口NIO中的N能够理解为Non-blocking,有些人会认为是new,其实也没错。 BIO(Block IO)和Nio(Non-Block IO)的对比网络
Nio主要用到的是块,因此nio效率比io高。
JavaAPI中有俩套nio:
1)针对标准输入输出nio
2)网络编程nio
Io以流的形式处理数据,nio以块的形式处理数据。面向流的io一次处理一个字节,一个输入流产生了一个字节,一个输出流就消费一个字节。
面向块的io,每一个操做都在一步中产生或者消费一个数据块。
它读取数据方式和写数据的方式否必需要经过通道来操做缓冲区实现。
核心组件包括 Channels Buffers Selectorssession
1) Buffer介绍: 缓冲区,本质就是一个数组,可是它是特殊的数组,缓冲区对象内置了一些机制,可以追踪和记录缓冲区的状态变化状况,若是咱们使用get方法从缓冲区中获取数据或者用put方法吧数据写入缓冲区,都会引发缓冲区的状态变化
在缓冲区中,最重要的属性是以下三个,他们一块儿合做完成了对缓冲区内容状态的变化跟踪
1)position:指定了下一个将要被写入或者读取的元素索引,它的值由get()/put() 方法自动更新,在新建立一个Buffer对象时,position被初始化为0
2)limit:操做缓冲区的可操做空间和可操做范围,指定还有多少数据须要去除,或者还有多少空间能够放入数据
3)capacity:指定了能够存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许咱们使用的底层数组的容量。多线程
以上三个属性值之间有一些相对的大小的关系:0<=position<=limit<=capacity
若是咱们建立了一个新的容量为10的bytebuffer对象,在初始化的时候。position设置为0,limit和capacity被设置为10,在之后使用bytebuffer 对象过程当中,capacity的值不会再发生变化,而其余俩个值会顺着使用而变化 以下图:app
如今咱们能够从通道中读取一些数据到缓冲区,注意从通道读取数据,至关于往缓冲区中写入数据。若是读取四个本身的数据,则此时的position为4,即下一个将要被写入的字节索引为4,而limit依旧是10异步
package com.Allen.buffer;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class testBufferDemo01 {
public static void main(String[] args) throws IOException {
String fileURL="F://a.txt";
FileInputStream fis=new FileInputStream(fileURL);
//获取通路
FileChannel channel=fis.getChannel();
//定义缓冲区大小
ByteBuffer buffer=ByteBuffer.allocate(10);
output("init", buffer);
//先读
channel.read(buffer);
output("read", buffer);
buffer.flip();
output("flip", buffer);
while (buffer.hasRemaining()) {
byte b=buffer.get();
}
output("get", buffer);
buffer.clear();
output("clear", buffer);
fis.close();
}
public static void output(String string,ByteBuffer buffer){
System.out.println(string);
System.out.println(buffer.capacity()+":"+buffer.position()+":"+buffer.limit());
}
}
复制代码
结果
通道是个对象,经过它能够读取和写入数据,全部的数据都是经过buffer对象来处理。咱们永远不会把字节直接写入通道,相反是吧数据写入包含一个或者多个字节的缓冲区。一样不会直接读取字节,而是把数据从通道读入缓冲区,再从缓冲区获取这个字节,nio中提供了多种通道对象,而全部的通道对象都实现了channel接口。
使用nIo读取数据】
任什么时候候读取数据,都不是直接从通道中读取,而是从通道读取到缓冲区,因此使用NIO读取数据能够分红下面三个步骤
1)从FileInputStream获取Channel
2)建立Buffer
3)将数据从Channel 读取到Buffer中
下面就是一个nio读复制文件的实例
package com.allen.test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class testNio {
public static void main(String[] args) throws IOException {
String oldFileUrl="E://1.txt";
String newFileUrl="E://2.txt";
FileInputStream fis=new FileInputStream(oldFileUrl);
FileChannel inChannel=fis.getChannel();
ByteBuffer bf=ByteBuffer.allocate(1024);
FileOutputStream fos=new FileOutputStream(newFileUrl);
FileChannel outChannel=fos.getChannel();
while(true){
int eof=inChannel.read(bf);
if(eof==-1){
break;
}else{
bf.flip();
outChannel.write(bf);
bf.clear();
}
}
inChannel.close();
fis.close();
outChannel.close();
fos.close();
}
}
复制代码
Selector 通常称 为选择器 ,固然你也能够翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此能够实现单线程管理多个channels,也就是能够管理多个网络连接。 使用Selector的好处在于: 使用更少的线程来就能够来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
有了selector,能够用一个线程处理全部的channel。线程之间的切换对操做系统来讲,代建是很高的,而且每一个线程也会占用必定的系统资源,因此对于系统而言,线程越少越好(可是也不是绝对的,若cpu有多个内核,不使用多任务是在浪费CPU能力)
Selector selector=Selector.open();
注册channel到selector上
Channel.configureBlocking(false)
SelectionKey key=channel.register(selector,SelectionKey.OP_READ)
注册到server上的channel必须设置成异步模式,不然异步io没法工做,这就意味着咱们不能够把一个Filechannel注册到selector,由于filechannel没有异步模式,可是socketchannel有异步模式
Register方法的第二个参数,它是一个interst set ,意思是注册的selector对channel中的那些事务感兴趣。事件分红四种:read write connect accept,通道触发一个时间指该事件已经Read,全部某个channel成功链接到另外一个服务器称之为connect ready。一个serversocketchanel准备好接受新的链接称为connect ready。一个数据可读的通道能够说read ready。等待写数据的通道write ready。
Wirte:SelectionKey.OP_WRITE
Read:SelectionKey.OP_READ
Accept:SelectionKey.OP_ACCEPT
Connect:SelectionKey.OP_CONNECT
如果对多个事件感情求,能够写为(用or)
Int interest=SelectionKey.OP_READ|SelectionKey.OP_ACCEPT
SelectionKey表示通道在selector上这个注册,经过SelectionKey能够获得selector和注册的channel.selector感兴趣的事。 一旦向selector注册了一个或者多个通道,能够调用重载的select方法返回你所感兴趣的事件已经准备就绪的通道。
阻塞/IO通讯模型
服务器
package com.allen.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* 网络多客户端聊天室
* 功能1: 客户端经过Java NIO链接到服务端,支持多客户端的链接
* 功能2:客户端初次链接时,服务端提示输入昵称,若是昵称已经有人使用,提示从新输入,若是昵称惟一,则登陆成功,以后发送消息都须要按照规定格式带着昵称发送消息
* 功能3:客户端登陆后,发送已经设置好的欢迎信息和在线人数给客户端,而且通知其余客户端该客户端上线
* 功能4:服务器收到已登陆客户端输入内容,转发至其余登陆客户端。
*
* TODO 客户端下线检测
*/
public class NIOServer {
private int port = 8080;
private Charset charset = Charset.forName("UTF-8");
//用来记录在线人数,以及昵称
private static HashSet<String> users = new HashSet<String>();
private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称";
//至关于自定义协议格式,与客户端协商好
private static String USER_CONTENT_SPILIT = "#@#";
private Selector selector = null;
public NIOServer(int port) throws IOException{
this.port = port;
//要想富,先修路
//先把通道打开
ServerSocketChannel server = ServerSocketChannel.open();
//设置高速公路的关卡
server.bind(new InetSocketAddress(this.port));
server.configureBlocking(false);
//开门迎客,排队叫号大厅开始工做
selector = Selector.open();
//告诉服务叫号大厅的工做人员,你能够接待了(事件)
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务已启动,监听端口是:" + this.port);
}
public void listener() throws IOException{
//死循环,这里不会阻塞
//CPU工做频率可控了,是可控的固定值
while(true) {
//在轮询,咱们服务大厅中,到底有多少我的正在排队
int wait = selector.select();
if(wait == 0) continue; //若是没有人排队,进入下一次轮询
//取号,默认给他分配个号码(排队号码)
Set<SelectionKey> keys = selector.selectedKeys(); //能够经过这个方法,知道可用通道的集合
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//处理一个,号码就要被消除,打发他走人(别在服务大厅占着茅坑不拉屎了)
//过号不候
iterator.remove();
//处理逻辑
process(key);
}
}
}
public void process(SelectionKey key) throws IOException {
//判断客户端肯定已经进入服务大厅而且已经能够实现交互了
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
//非阻塞模式
client.configureBlocking(false);
//注册选择器,并设置为读取模式,收到一个链接请求,而后起一个SocketChannel,并注册到selector上,以后这个链接的数据,就由这个SocketChannel处理
client.register(selector, SelectionKey.OP_READ);
//将此对应的channel设置为准备接受其余客户端请求
key.interestOps(SelectionKey.OP_ACCEPT);
// System.out.println("有客户端链接,IP地址为 :" + sc.getRemoteAddress());
client.write(charset.encode("请输入你的昵称"));
}
//处理来自客户端的数据读取请求
if(key.isReadable()){
//返回该SelectionKey对应的 Channel,其中有数据须要读取
SocketChannel client = (SocketChannel)key.channel();
//往缓冲区读数据
ByteBuffer buff = ByteBuffer.allocate(1024);
StringBuilder content = new StringBuilder();
try{
while(client.read(buff) > 0)
{
buff.flip();
content.append(charset.decode(buff));
}
// System.out.println("从IP地址为:" + sc.getRemoteAddress() + "的获取到消息: " + content);
//将此对应的channel设置为准备下一次接受数据
key.interestOps(SelectionKey.OP_READ);
}catch (IOException io){
key.cancel();
if(key.channel() != null)
{
key.channel().close();
}
}
if(content.length() > 0) {
String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
//注册用户
if(arrayContent != null && arrayContent.length == 1) {
String nickName = arrayContent[0];
if(users.contains(nickName)) {
client.write(charset.encode(USER_EXIST));
} else {
users.add(nickName);
int onlineCount = onlineCount();
String message = "欢迎 " + nickName + " 进入聊天室! 当前在线人数:" + onlineCount;
broadCast(null, message);
}
}
//注册完了,发送消息
else if(arrayContent != null && arrayContent.length > 1) {
String nickName = arrayContent[0];
String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
message = nickName + "说 : " + message;
if(users.contains(nickName)) {
//不回发给发送此内容的客户端
broadCast(client, message);
}
}
}
}
}
//TODO 要是能检测下线,就不用这么统计了
public int onlineCount() {
int res = 0;
for(SelectionKey key : selector.keys()){
Channel target = key.channel();
if(target instanceof SocketChannel){
res++;
}
}
return res;
}
public void broadCast(SocketChannel client, String content) throws IOException {
//广播数据到全部的SocketChannel中
for(SelectionKey key : selector.keys()) {
Channel targetchannel = key.channel();
//若是client不为空,不回发给发送此内容的客户端
if(targetchannel instanceof SocketChannel && targetchannel != client) {
SocketChannel target = (SocketChannel)targetchannel;
target.write(charset.encode(content));
}
}
}
public static void main(String[] args) throws IOException {
new NIOServer(8080).listener();
}
}
复制代码
客户端
package com.allen.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class NIOClient {
private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
private Selector selector = null;
private SocketChannel client = null;
private String nickName = "";
private Charset charset = Charset.forName("UTF-8");
private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称";
private static String USER_CONTENT_SPILIT = "#@#";
public NIOClient() throws IOException{
//无论三七二十一,先把路修好,把关卡开放
//链接远程主机的IP和端口
client = SocketChannel.open(serverAdrress);
client.configureBlocking(false);
//开门接客
selector = Selector.open();
client.register(selector, SelectionKey.OP_READ);
}
public void session(){
//开辟一个新线程从服务器端读数据
new Reader().start();
//开辟一个新线程往服务器端写数据
new Writer().start();
}
private class Writer extends Thread{
@Override
public void run() {
try{
//在主线程中 从键盘读取数据输入到服务器端
Scanner scan = new Scanner(System.in);
while(scan.hasNextLine()){
String line = scan.nextLine();
if("".equals(line)) continue; //不容许发空消息
if("".equals(nickName)) {
nickName = line;
line = nickName + USER_CONTENT_SPILIT;
} else {
line = nickName + USER_CONTENT_SPILIT + line;
}
// client.register(selector, SelectionKey.OP_WRITE);
client.write(charset.encode(line));//client既能写也能读,这边是写
}
scan.close();
}catch(Exception e){
}
}
}
private class Reader extends Thread {
public void run() {
try {
//轮询
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys(); //能够经过这个方法,知道可用通道的集合
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
keyIterator.remove();
process(key);
}
}
}
catch (IOException io){
}
}
private void process(SelectionKey key) throws IOException {
if(key.isReadable()){
//使用 NIO 读取 Channel中的数据,这个和全局变量client是同样的,由于只注册了一个SocketChannel
//client既能写也能读,这边是读
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
while(sc.read(buff) > 0)
{
buff.flip();
content += charset.decode(buff);
}
//若系统发送通知名字已经存在,则须要换个昵称
if(USER_EXIST.equals(content)) {
nickName = "";
}
System.out.println(content);
key.interestOps(SelectionKey.OP_READ);
}
}
}
public static void main(String[] args) throws IOException
{
new NIOClient().session();
}
}
复制代码