java nio的一点整理(二)

上一遍说了nio的 channel 和buffer 进行读写,这一遍整理一下 nio实现非阻塞式sooket的通讯。java

先看看传统的io 和socket实现tcp的通讯。安全

服务端代码。服务器

package com.cn.socket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created by ZC-16-012 on 2018/11/1.
 * socket 服务端
 */
public class MyServer {
    //将arrayList包装成线程安全的list
    public static List<Socket> socketList= Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(30000);

        while (true) {
            //阻塞式,若是客户端一直没有链接该服务端,该方法将一直阻塞下去
            Socket s = ss.accept();
            socketList.add(s);
            //每当客户端链接成功后启动一个线程为该客户端服务
            new Thread(new ServerThread(s), "服务端").start();
        }
    }
}
package com.cn.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 * 服务端的线程,负责读客户端的数据
 */
public class ServerThread implements Runnable {

    private Socket s =null;

    private BufferedReader br;

    public ServerThread(Socket s) {
        this.s = s;
        try {
            //初始化该线程的socket的输入流,读客户端的数据
            br= new BufferedReader(new InputStreamReader(s.getInputStream()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        String content = null;
        while ((content = readFromClient()) != null) {
              for (Socket s: MyServer.socketList){
                  try {
                      //将socket中读到的数据再输出给客户端
                      PrintStream ps= new PrintStream(s.getOutputStream());
                      System.out.println("服务端读到的客户端数据="+content);
                      ps.println(content);
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
        }
    }

    private String readFromClient(){
        try {
            return br.readLine();
        } catch (IOException e) {
            //若是捕获到异常,说客户端的socket已经关闭
            MyServer.socketList.remove(s);
            e.printStackTrace();
        }
        return null;
    }
}

客户端代码socket

package com.cn.socket.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 */
public class MyClient {

    public static void main(String[] args) throws IOException {
        Socket s= new Socket("127.0.0.1",30000);

        new Thread(new ClientThread(s),"客户端").start();

        //获取该socket输出流
        PrintStream ps= new PrintStream(s.getOutputStream());
        String line=null;
        BufferedReader br= new BufferedReader(new InputStreamReader(System.in));
        while ((line=br.readLine())!=null){
            ps.println(line);
        }
    }
}
package com.cn.socket.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 *
 * 客户端的线程,负责读取服务端输出的数据,也就是输入流
 */
public class ClientThread implements Runnable {
    Socket s;

    BufferedReader br;

    public ClientThread(Socket s) throws IOException {
        this.s = s;
        br= new BufferedReader(new InputStreamReader(s.getInputStream()));
    }

    @Override
    public void run() {
         String content= null;
        try {
            while ((content=br.readLine())!=null){
                System.out.print(content);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

传统的阻塞式socket主要用的对象是serverSocket和 socket对象,其中如上代码所示,serverSocket.accpet()方法是阻塞式的,假如没有客户端链接,该方法一直阻塞在这里。tcp

而后看一下非阻塞式的socket通讯ide

服务端代码:this

package com.cn.socket.nio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;

/**
 * Created by ZC-16-012 on 2018/11/1.
 */
public class NioServer {

    private  Selector selector = null;

    private Charset charset= Charset.forName("UTF-8");

    /**
    * 服务器上的全部channel(包括ServerSocketChannel和SocketChannel)都须要向selector注册,
     * selector负责监视这些socket的io状态,当其中任意一个或者多个channel具备可用的IO操做时,
     * 该selector的select()方法会返回大于0的整数。当selector上全部的channel没有须要处理的io时,
     * 则该selector的select()方法会阻塞
    * */
    public void init() throws IOException {
        //开启一个selector注册中心
        selector = Selector.open();
        //开启一个serverSocketChannel,相似于传统socket中的serverSocket
        ServerSocketChannel server = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000);
        server.bind(address);
        //设置socketServer以非阻塞式方式,默认是阻塞模式
        server.configureBlocking(false);
        //将socketChannel注册到selector中,ServerSocketChannel只支持OP_ACCEPT操做
        server.register(selector, SelectionKey.OP_ACCEPT);
        //select()监控全部注册的channel,当他们中间有须要处理的io时,将对应的SelectionKey加入被选择的selectedKey集合中
        //并返回该chanel的数量
        while (selector.select() > 0) {
            //selectedKeys()获取全部被选择的channel,SelectionKey表明全部selectableChannel和selector注册关系
            for (SelectionKey sk : selector.selectedKeys()) {
                selector.selectedKeys().remove(sk);
                //判断该key是否有对应的客户端链接
                if (sk.isAcceptable()) {
                    //调用accept()接收链接,产生服务端的SocketChannel
                    SocketChannel sc = server.accept();
                    sc.configureBlocking(false);
                    //注册到selector监控中心
                    sc.register(selector, SelectionKey.OP_READ);
                    //将sk对应的channel设置成准备接收其余请求
                    sk.interestOps(SelectionKey.OP_ACCEPT);
                }
                //判断服务端是否有数据可读,也就是sk对应的channel
                if (sk.isReadable()) {
                    SocketChannel sc = (SocketChannel) sk.channel();
                    ByteBuffer buff = ByteBuffer.allocate(1024);
                    String content = "";
                    try {
                        while ((sc.read(buff)) > 0) {
                            buff.flip();
                            //将子节解码成字符
                            content += charset.decode(buff);
                            System.out.println("服务端读取数据:" + content);
                            //将sk对应的channel设置成准备下一读取
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        //若是读取的出现异常,说明该channel对应的client出现问题
                        sk.cancel();
                        if (sk.channel() != null) {
                            sk.channel().close();
                        }

                    }


                    if (content.length() > 0) {
                        for (SelectionKey key : selector.keys()) {
                            Channel channel = key.channel();
                            if (channel instanceof SocketChannel) {
                                SocketChannel socketChannel = (SocketChannel) channel;
                                socketChannel.write(charset.encode(content));
                            }
                        }
                    }
                }

            }
        }

    }

    public static void main(String[] args){
        try {
            new NioServer().init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端代码:编码

package com.cn.socket.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * Created by ZC-16-012 on 2018/11/5.
 * nio 客户端
 */
public class NioClient {

    private Selector selector = null;

    private Charset charset = Charset.forName("UTF-8");

    private SocketChannel sc = null;


    public void init() throws IOException {
        selector = Selector.open();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000);
        sc = SocketChannel.open(address);
        //将该socket以非阻塞式方式工做
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
        //开启客户端读取数据线程
        new ClientThread().start();
        //键盘输入,客户端写的数据
        Scanner scan = new Scanner(System.in);
        while (scan.hasNextLine()) {
            String content = scan.nextLine();
            //编码 写出去
            sc.write(charset.encode(content));
        }
    }

    public static void main(String[] args){
        try {
            new NioClient().init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    private class ClientThread extends Thread{
        @Override
        public void run() {
            try {
                while (selector.select()>0){
                    for (SelectionKey sk:selector.selectedKeys()){
                       //删除正在处理的SelectionKey
                        selector.selectedKeys().remove(sk);
                        //若是sk对应的channel有可读的数据
                        if (sk.isReadable()){
                           SocketChannel sc= (SocketChannel) sk.channel();
                            ByteBuffer buff= ByteBuffer.allocate(1024);
                            String content="";
                            while (sc.read(buff)>0){
                                sc.read(buff);
                                buff.flip();
                                content+=charset.decode(buff);
                            }
                            System.out.println("聊天信息:" + content);
                            //为下一次读取作准备
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

这里用到selector注册中心以及channel的概念。利用nio的 charset 编码解码,buffer读取数据。.net

相关文章
相关标签/搜索