上一遍说了nio的 channel 和buffer 进行读写,这一遍整理一下 nio实现非阻塞式sooket的通讯。java
先看看传统的io 和socket实现tcp的通讯。安全
服务端代码。服务器
package com.cn.socket; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Created by ZC-16-012 on 2018/11/1. * socket 服务端 */ public class MyServer { //将arrayList包装成线程安全的list public static List<Socket> socketList= Collections.synchronizedList(new ArrayList<>()); public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(30000); while (true) { //阻塞式,若是客户端一直没有链接该服务端,该方法将一直阻塞下去 Socket s = ss.accept(); socketList.add(s); //每当客户端链接成功后启动一个线程为该客户端服务 new Thread(new ServerThread(s), "服务端").start(); } } }
package com.cn.socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. * 服务端的线程,负责读客户端的数据 */ public class ServerThread implements Runnable { private Socket s =null; private BufferedReader br; public ServerThread(Socket s) { this.s = s; try { //初始化该线程的socket的输入流,读客户端的数据 br= new BufferedReader(new InputStreamReader(s.getInputStream())); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { String content = null; while ((content = readFromClient()) != null) { for (Socket s: MyServer.socketList){ try { //将socket中读到的数据再输出给客户端 PrintStream ps= new PrintStream(s.getOutputStream()); System.out.println("服务端读到的客户端数据="+content); ps.println(content); } catch (IOException e) { e.printStackTrace(); } } } } private String readFromClient(){ try { return br.readLine(); } catch (IOException e) { //若是捕获到异常,说客户端的socket已经关闭 MyServer.socketList.remove(s); e.printStackTrace(); } return null; } }
客户端代码socket
package com.cn.socket.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. */ public class MyClient { public static void main(String[] args) throws IOException { Socket s= new Socket("127.0.0.1",30000); new Thread(new ClientThread(s),"客户端").start(); //获取该socket输出流 PrintStream ps= new PrintStream(s.getOutputStream()); String line=null; BufferedReader br= new BufferedReader(new InputStreamReader(System.in)); while ((line=br.readLine())!=null){ ps.println(line); } } }
package com.cn.socket.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. * * 客户端的线程,负责读取服务端输出的数据,也就是输入流 */ public class ClientThread implements Runnable { Socket s; BufferedReader br; public ClientThread(Socket s) throws IOException { this.s = s; br= new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() { String content= null; try { while ((content=br.readLine())!=null){ System.out.print(content); } } catch (IOException e) { e.printStackTrace(); } } }
传统的阻塞式socket主要用的对象是serverSocket和 socket对象,其中如上代码所示,serverSocket.accpet()方法是阻塞式的,假如没有客户端链接,该方法一直阻塞在这里。tcp
而后看一下非阻塞式的socket通讯ide
服务端代码:this
package com.cn.socket.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; /** * Created by ZC-16-012 on 2018/11/1. */ public class NioServer { private Selector selector = null; private Charset charset= Charset.forName("UTF-8"); /** * 服务器上的全部channel(包括ServerSocketChannel和SocketChannel)都须要向selector注册, * selector负责监视这些socket的io状态,当其中任意一个或者多个channel具备可用的IO操做时, * 该selector的select()方法会返回大于0的整数。当selector上全部的channel没有须要处理的io时, * 则该selector的select()方法会阻塞 * */ public void init() throws IOException { //开启一个selector注册中心 selector = Selector.open(); //开启一个serverSocketChannel,相似于传统socket中的serverSocket ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000); server.bind(address); //设置socketServer以非阻塞式方式,默认是阻塞模式 server.configureBlocking(false); //将socketChannel注册到selector中,ServerSocketChannel只支持OP_ACCEPT操做 server.register(selector, SelectionKey.OP_ACCEPT); //select()监控全部注册的channel,当他们中间有须要处理的io时,将对应的SelectionKey加入被选择的selectedKey集合中 //并返回该chanel的数量 while (selector.select() > 0) { //selectedKeys()获取全部被选择的channel,SelectionKey表明全部selectableChannel和selector注册关系 for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); //判断该key是否有对应的客户端链接 if (sk.isAcceptable()) { //调用accept()接收链接,产生服务端的SocketChannel SocketChannel sc = server.accept(); sc.configureBlocking(false); //注册到selector监控中心 sc.register(selector, SelectionKey.OP_READ); //将sk对应的channel设置成准备接收其余请求 sk.interestOps(SelectionKey.OP_ACCEPT); } //判断服务端是否有数据可读,也就是sk对应的channel if (sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; try { while ((sc.read(buff)) > 0) { buff.flip(); //将子节解码成字符 content += charset.decode(buff); System.out.println("服务端读取数据:" + content); //将sk对应的channel设置成准备下一读取 sk.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); //若是读取的出现异常,说明该channel对应的client出现问题 sk.cancel(); if (sk.channel() != null) { sk.channel().close(); } } if (content.length() > 0) { for (SelectionKey key : selector.keys()) { Channel channel = key.channel(); if (channel instanceof SocketChannel) { SocketChannel socketChannel = (SocketChannel) channel; socketChannel.write(charset.encode(content)); } } } } } } } public static void main(String[] args){ try { new NioServer().init(); } catch (IOException e) { e.printStackTrace(); } } }
客户端代码:编码
package com.cn.socket.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; /** * Created by ZC-16-012 on 2018/11/5. * nio 客户端 */ public class NioClient { private Selector selector = null; private Charset charset = Charset.forName("UTF-8"); private SocketChannel sc = null; public void init() throws IOException { selector = Selector.open(); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000); sc = SocketChannel.open(address); //将该socket以非阻塞式方式工做 sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); //开启客户端读取数据线程 new ClientThread().start(); //键盘输入,客户端写的数据 Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { String content = scan.nextLine(); //编码 写出去 sc.write(charset.encode(content)); } } public static void main(String[] args){ try { new NioClient().init(); } catch (IOException e) { e.printStackTrace(); } } private class ClientThread extends Thread{ @Override public void run() { try { while (selector.select()>0){ for (SelectionKey sk:selector.selectedKeys()){ //删除正在处理的SelectionKey selector.selectedKeys().remove(sk); //若是sk对应的channel有可读的数据 if (sk.isReadable()){ SocketChannel sc= (SocketChannel) sk.channel(); ByteBuffer buff= ByteBuffer.allocate(1024); String content=""; while (sc.read(buff)>0){ sc.read(buff); buff.flip(); content+=charset.decode(buff); } System.out.println("聊天信息:" + content); //为下一次读取作准备 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } } } }
这里用到selector注册中心以及channel的概念。利用nio的 charset 编码解码,buffer读取数据。.net