BIO:在Java1.4以前,咱们创建网络链接均使用BIO,属于同步阻塞IO。默认状况下,当有一条请求接入就有一条线程专门接待。因此,在客户端向服务端请求时,会询问是否有空闲线程进行接待,如若没有则一直等待或拒接。当并发量小时还能够接受,当请求量一多起来则会有许多线程生成,在Java中,多线程的上下文切换会消耗计算机有限的资源和性能,形成资源浪费。java
NIO:NIO的出现是为了解决再BIO下的大并发量问题。其特色是能用一条线程管理全部链接。以下图所示: express
NIO是面向缓冲流的,即数据写入写出都是经过 Channel —— Buffer 这一途径。(双向流通)api
AIO:与以前两个IO模型不一样的是,AIO属于异步非阻塞模型。当进行读写操做时只须调用api的read方法和write方法,这两种方法均是异步。对于读方法来讲,当有流可读取时,操做系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操做而言,当操做系统将write方法传递的流写入完毕时,操做系统主动通知应用程序。换言之就是当调用完api后,操做系统完成后会调用回调函数。数组
总结:通常IO分为同步阻塞模型(BIO),同步非阻塞模型(NIO),异步阻塞模型,异步非阻塞模型(AIO)bash
同步阻塞模型指的是当调用io操做时必须等到其io操做结束服务器
同步非阻塞模型指当调用io操做时没必要等待能够继续干其余事,但必须不断询问io操做是否完成。网络
异步阻塞模型指应用调用io操做后,由操做系统完成io操做,但应用必须等待或去询问操做系统是否完成。多线程
异步非阻塞指应用调用io操做后,由操做系统完成io操做并调用回调函数,应用完成放手无论。并发
首先,先看下服务端的大致代码异步
public class ServerHandle implements Runnable{
//带参数构造函数
public ServerHandle(int port){
}
//中止方法
public void shop(){
}
//写方法
private void write(SocketChannel socketChannel, String response)throws IOException{
}
//当有链接进来时的处理方法
private void handleInput(SelectionKey key) throws IOException{
}
//服务端运行主体方法
@Override
public void run() {
}
}
复制代码
首先咱们先看看该服务端的构造函数的实现:
public ServerHandle(int port){
try {
//建立选择器
selector = Selector.open();
//打开监听通道
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//传入端口,并设定链接队列最大为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端请求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记启动标志
started = true;
System.out.println("服务器已启动,端口号为:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
复制代码
在这里建立了选择器和监听通道,并将该监听通道注册到选择器上并选择其感兴趣的事件(accept)。后续其余接入的链接都将经过该 监听通道 传入。
而后就是写方法的实现:
private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
//将写模式改成读模式
wirteBuffer.flip();
//写入管道
channel.write(wirteBuffer);
}
复制代码
其次是当由事件传入时,即对链接进来的连接的处理方法
private void handleInput(SelectionKey key) throws IOException{
//当该键可用时
if (key.isValid()){
if (key.isAcceptable()){
//返回该密钥建立的通道。
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
经过该通道获取连接进来的通道
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
//返回该密钥建立的通道。
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务器收到的信息:" + expression);
//此处是为了区别打印在工做台上的数据是由客户端产生仍是服务端产生
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if(readBytes == 0){
//无数据,忽略
}else if (readBytes < 0){
//资源关闭
key.cancel();
socketChannel.close();
}
}
}
}
复制代码
这里要说明的是,只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。 如在构造方法中有一通道serverSocketChannel注册了accept事件。当其就绪时就能够经过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道。
压轴方法:
@Override
public void run() {
//循环遍历
while (started) {
try {
//当没有就绪事件时阻塞
selector.select();
//返回就绪通道的键
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
//获取后必须移除,不然会陷入死循环
iterator.remove();
try {
//对就绪通道的处理方法,上述有描述
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}
复制代码
此方法为服务端的主体方法。大体流程以下:
public class ClientHandle implements Runnable{
//构造函数,构造时顺便绑定
public ClientHandle(String ip, int port){
}
//处理就绪通道
private void handleInput(SelectionKey key) throws IOException{
}
//写方法(与服务端的写方法一致)
private void doWrite(SocketChannel channel,String request) throws IOException{
}
//链接到服务端
private void doConnect() throws IOException{
}
//发送信息
public void sendMsg(String msg) throws Exception{
}
}
复制代码
首先先看构造函数的实现:
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//建立选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//若是为 true,则此通道将被置于阻塞模式;若是为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
复制代码
接下来看对就绪通道的处理办法:
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
//这里的做用将在后面的代码(doConnect方法)说明
if(sc.finishConnect()){
System.out.println("已链接事件");
}
else{
System.exit(1);
}
}
//读消息
if(key.isReadable()){
//建立ByteBuffer,并开辟一个1k的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
buffer.flip();
//根据缓冲区可读字节数建立字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}lse if(readBytes==0){
//忽略
}else if(readBytes<0){
//链路已经关闭,释放资源
key.cancel();
sc.close();
}
}
}
}
复制代码
在run方法以前需先看下此方法的实现:
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}
复制代码
当SocketChannel工做于非阻塞模式下时,调用connect()时会当即返回: 若是链接创建成功则返回的是true(好比链接localhost时,能当即创建起链接),不然返回false。
在非阻塞模式下,返回false后,必需要在随后的某个地方调用finishConnect()方法完成链接。 当SocketChannel处于阻塞模式下时,调用connect()时会进入阻塞,直至链接创建成功或者发生IO错误时,才从阻塞状态中退出。
因此该代码在connect服务端后返回false(但仍是有做用的),并在else语句将该通道注册在选择器上并选择connect事件。
客户端的run方法:
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
发送信息到服务端的方法:
public void sendMsg(String msg) throws Exception{
//覆盖其以前感兴趣的事件(connect),将其更改成OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
复制代码
服务端:
/**
* Created by innoyiya on 2018/8/20.
*/
public class Service {
private static int DEFAULT_POST = 12345;
private static ServerHandle serverHandle;
public static void start(){
start(DEFAULT_POST);
}
public static synchronized void start(int post) {
if (serverHandle != null){
serverHandle.shop();
}
serverHandle = new ServerHandle(post);
new Thread(serverHandle,"server").start();
}
}
复制代码
服务端主体:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by innoyiya on 2018/8/20.
*/
public class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean started;
public ServerHandle(int port){
try {
//建立选择器
selector = Selector.open();
//打开监听通道
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//断定端口,并设定链接队列最大为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端请求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记启动标志
started = true;
System.out.println("服务器已启动,端口号为:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void shop(){
started = false;
}
private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
wirteBuffer.flip();
channel.write(wirteBuffer);
}
private void handleInput(SelectionKey key) throws IOException{
if (key.isValid()){
if (key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务器收到的信息:" + expression);
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if (readBytes < 0){
key.cancel();
socketChannel.close();
}
}
}
}
@Override
public void run() {
//循环遍历
while (started) {
try {
selector.select();
//System.out.println(selector.select());
Set<SelectionKey> keys = selector.selectedKeys();
//System.out.println(keys.size());
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}
}
复制代码
客户端:
/**
* Created by innoyiya on 2018/8/20.
*/
public class Client {
private static String DEFAULT_HOST = "localhost";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
private static final String EXIT = "exit";
public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}
public static synchronized void start(String ip, int port) {
if (clientHandle != null){
clientHandle.stop();
}
clientHandle = new ClientHandle(ip, port);
new Thread(clientHandle, "Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception {
if (msg.equals(EXIT)){
return false;
}
clientHandle.sendMsg(msg);
return true;
}
}
复制代码
客户端主体代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by innoyiya on 2018/8/20.
*/
public class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//建立选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//若是为 true,则此通道将被置于阻塞模式;若是为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
System.out.println("已链接事件");
}
else{
System.exit(1);
}
}
//读消息
if(key.isReadable()){
//建立ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操做
buffer.flip();
//根据缓冲区可读字节数建立字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
} else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request) throws IOException{
byte[] bytes = request.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
//flip操做
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}
public void sendMsg(String msg) throws Exception{
//覆盖其以前感兴趣的事件,将其更改成OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
复制代码
测试类:
import java.util.Scanner;
/**
* Created by innoyiya on 2018/8/20.
*/
public class Test {
public static void main(String[] args) throws Exception {
Service.start();
Thread.sleep(1000);
Client.start();
while(Client.sendMsg(new Scanner(System.in).nextLine()));
}
}
复制代码
控制台打印:
服务器已启动,端口号为:12345
register
已链接事件
1234
服务器收到的信息:1234
客户端收到消息:+++++1234+++++
5678
服务器收到的信息:5678
客户端收到消息:+++++5678+++++
复制代码
若有不妥之处,请告诉我。