此篇文章会详细解读由BIO到NIO的逐步演进的心灵路程,为Reactor-Netty 库的讲解铺平道路。java
关于Java编程方法论-Reactor与Webflux
的视频分享,已经完成了Rxjava 与 Reactor,b站地址以下:linux
Rxjava源码解读与分享:www.bilibili.com/video/av345…编程
Reactor源码解读与分享:www.bilibili.com/video/av353…数组
咱们经过一个BIO的Demo来展现其用法:bash
//服务端
public class BIOServer {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
serverSocket = new ServerSocket(port);
System.out.println(stringNowTime() + ": serverSocket started");
while(true)
{
socket = serverSocket.accept();
System.out.println(stringNowTime() + ": id为" + socket.hashCode()+ "的Clientsocket connected");
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " "+inputContent);
count++;
}
System.out.println("id为" + socket.hashCode()+ "的Clientsocket "+stringNowTime()+"读取结束");
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOServer server = new BIOServer();
server.initBIOServer(8888);
}
}
// 客户端
public class BIOClient {
public void initBIOClient(String host, int port) {
BufferedReader reader = null;
BufferedWriter writer = null;
Socket socket = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(System.in));
socket = new Socket(host, port);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
System.out.println("clientSocket started: " + stringNowTime());
while (((inputContent = reader.readLine()) != null) && count < 2) {
inputContent = stringNowTime() + ": 第" + count + "条消息: " + inputContent + "\n";
writer.write(inputContent);//将消息发送给服务端
writer.flush();
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOClient client = new BIOClient();
client.initBIOClient("127.0.0.1", 8888);
}
}
复制代码
经过上面的例子,咱们能够知道,不管是服务端仍是客户端,咱们关注的几个操做有基于服务端的serverSocket = new ServerSocket(port)
serverSocket.accept()
,基于客户端的Socket socket = new Socket(host, port);
以及二者都有的读取与写入Socket数据的方式,即经过流来进行读写,这个读写难免经过一个中间字节数组buffer来进行。服务器
因而,咱们经过源码来看这些相应的逻辑。咱们先来看ServerSocket.java
这个类的相关代码。 咱们查看ServerSocket.java
的构造器能够知道,其最后依然会调用它的bind
方法:数据结构
//java.net.ServerSocket#ServerSocket(int)
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
复制代码
按照咱们的Demo和上面的源码可知,这里传入的参数endpoint并不会为null,同时,属于InetSocketAddress
类型,backlog大小为50,因而,咱们应该关注的主要代码逻辑也就是getImpl().bind(epoint.getAddress(), epoint.getPort());
:多线程
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
// 咱们应该关注的主要逻辑
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
复制代码
这里getImpl()
,由上面构造器的实现中,咱们有看到setImpl();
,可知,其factory
默认为null,因此,这里咱们关注的是SocksSocketImpl
这个类,建立其对象,并将当前ServerSocket
对象设定其中,这个设定的源码请在SocksSocketImpl
的父类java.net.SocketImpl
中查看。 那么getImpl也就明了了,其实就是咱们Socket的底层实现对应的实体类了,由于不一样的操做系统内核是不一样的,他们对于Socket的实现固然会各有不一样,咱们这点要注意下,这里针对的是win下面的系统。异步
/** * The factory for all server sockets. */
private static SocketImplFactory factory = null;
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
/** * Get the {@code SocketImpl} attached to this socket, creating * it if necessary. * * @return the {@code SocketImpl} attached to that ServerSocket. * @throws SocketException if creation fails. * @since 1.4 */
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
/** * Creates the socket implementation. * * @throws IOException if creation fails * @since 1.4 */
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
复制代码
咱们再看SocksSocketImpl
的bind方法实现,而后获得其最后无非是调用本地方法bind0
。socket
//java.net.AbstractPlainSocketImpl#bind
/** * Binds the socket to the specified address of the specified local port. * @param address the address * @param lport the port */
protected synchronized void bind(InetAddress address, int lport) throws IOException {
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
socketBind(address, lport);
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
//java.net.PlainSocketImpl#socketBind
@Override
void socketBind(InetAddress address, int port) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (address == null)
throw new NullPointerException("inet address argument is null.");
if (preferIPv4Stack && !(address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
bind0(nativefd, address, port, useExclusiveBind);
if (port == 0) {
localport = localPort0(nativefd);
} else {
localport = port;
}
this.address = address;
}
//java.net.PlainSocketImpl#bind0
static native void bind0(int fd, InetAddress localAddress, int localport, boolean exclBind) throws IOException;
复制代码
这里,咱们还要了解的是,使用了多线程只是可以实现对"业务逻辑处理"的多线程,可是对于数据报文的接收仍是须要一个一个来的,也就是咱们上面Demo中见到的accept以及read方法阻塞问题,多线程是根本解决不了的,那么首先咱们来看看accept为何会形成阻塞,accept方法的做用是询问操做系统是否有新的Socket套接字信息从端口XXX处发送过来,注意这里询问的是操做系统,也就是说Socket套接字IO模式的支持是基于操做系统的,若是操做系统没有发现有套接字从指定端口XXX链接进来,那么操做系统就会等待,这样accept方法就会阻塞,他的内部实现使用的是操做系统级别的同步IO。
因而,咱们来分析下ServerSocket.accept
方法的源码过程:
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
复制代码
首先进行的是一些判断,接着建立了一个Socket对象(为何这里要建立一个Socket对象,后面会讲到),执行了implAccept方法,来看看implAccept方法:
/** * Subclasses of ServerSocket use this method to override accept() * to return their own subclass of socket. So a FooServerSocket * will typically hand this method an <i>empty</i> FooSocket. On * return from implAccept the FooSocket will be connected to a client. * * @param s the Socket * @throws java.nio.channels.IllegalBlockingModeException * if this socket has an associated channel, * and the channel is in non-blocking mode * @throws IOException if an I/O error occurs when waiting * for a connection. * @since 1.1 * @revised 1.4 * @spec JSR-51 */
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
si.fd = new FileDescriptor();
getImpl().accept(si); // <1>
SocketCleanable.register(si.fd); // raw fd has been set
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
复制代码
上面执行了<1>处getImpl的accept方法以后,咱们在AbstractPlainSocketImpl找到accept方法:
//java.net.AbstractPlainSocketImpl#accept
/** * Accepts connections. * @param s the connection */
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
socketAccept(s);
} finally {
releaseFD();
}
}
复制代码
能够看到他调用了socketAccept方法,由于每一个操做系统的Socket地实现都不一样,因此这里Windows下就执行了咱们PlainSocketImpl里面的socketAccept方法:
// java.net.PlainSocketImpl#socketAccept
@Override
void socketAccept(SocketImpl s) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (s == null)
throw new NullPointerException("socket is null");
int newfd = -1;
InetSocketAddress[] isaa = new InetSocketAddress[1];
if (timeout <= 0) { //<1>
newfd = accept0(nativefd, isaa); // <2>
} else {
configureBlocking(nativefd, false);
try {
waitForNewConnection(nativefd, timeout);
newfd = accept0(nativefd, isaa); // <3>
if (newfd != -1) {
configureBlocking(newfd, true);
}
} finally {
configureBlocking(nativefd, true);
}
} // <4>
/* Update (SocketImpl)s' fd */
fdAccess.set(s.fd, newfd);
/* Update socketImpls remote port, address and localport */
InetSocketAddress isa = isaa[0];
s.port = isa.getPort();
s.address = isa.getAddress();
s.localport = localport;
if (preferIPv4Stack && !(s.address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
}
//java.net.PlainSocketImpl#accept0
static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
复制代码
这里<1>到<4>之间是咱们关注的代码,<2>和<3>执行了accept0方法,这个是native方法,具体来讲就是与操做系统交互来实现监听指定端口上是否有客户端接入,正是由于accept0在没有客户端接入的时候会一直处于阻塞状态,因此形成了咱们程序级别的accept方法阻塞,固然对于程序级别的阻塞,咱们是能够避免的,也就是咱们能够将accept方法修改为非阻塞式,可是对于accept0形成的阻塞咱们暂时是无法改变的,操做系统级别的阻塞其实就是咱们一般所说的同步异步中的同步了。 前面说到咱们能够在程序级别改变accept的阻塞,具体怎么实现?其实就是经过咱们上面socketAccept方法中判断timeout的值来实现,在第<1>处判断timeout的值若是小于等于0,那么直接执行accept0方法,这时候将一直处于阻塞状态,可是若是咱们设置了timeout的话,即timeout值大于0的话,则程序会在等到咱们设置的时间后返回,注意这里的newfd若是等于-1的话,表示此次accept没有发现有数据从底层返回;那么到底timeout的值是在哪设置?咱们能够经过ServerSocket的setSoTimeout方法进行设置,来看看这个方法:
/** * Enable/disable {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT} with the * specified timeout, in milliseconds. With this option set to a non-zero * timeout, a call to accept() for this ServerSocket * will block for only this amount of time. If the timeout expires, * a <B>java.net.SocketTimeoutException</B> is raised, though the * ServerSocket is still valid. The option <B>must</B> be enabled * prior to entering the blocking operation to have effect. The * timeout must be {@code > 0}. * A timeout of zero is interpreted as an infinite timeout. * @param timeout the specified timeout, in milliseconds * @exception SocketException if there is an error in * the underlying protocol, such as a TCP error. * @since 1.1 * @see #getSoTimeout() */
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);
}
复制代码
其执行了getImpl的setOption方法,而且设置了timeout时间,这里,咱们从AbstractPlainSocketImpl中查看:
//java.net.AbstractPlainSocketImpl#setOption
public void setOption(int opt, Object val) throws SocketException {
if (isClosedOrPending()) {
throw new SocketException("Socket Closed");
}
boolean on = true;
switch (opt) {
/* check type safety b4 going native. These should never * fail, since only java.Socket* has access to * PlainSocketImpl.setOption(). */
case SO_LINGER:
if (val == null || (!(val instanceof Integer) && !(val instanceof Boolean)))
throw new SocketException("Bad parameter for option");
if (val instanceof Boolean) {
/* true only if disabling - enabling should be Integer */
on = false;
}
break;
case SO_TIMEOUT: //<1>
if (val == null || (!(val instanceof Integer)))
throw new SocketException("Bad parameter for SO_TIMEOUT");
int tmp = ((Integer) val).intValue();
if (tmp < 0)
throw new IllegalArgumentException("timeout < 0");
timeout = tmp;
break;
case IP_TOS:
if (val == null || !(val instanceof Integer)) {
throw new SocketException("bad argument for IP_TOS");
}
trafficClass = ((Integer)val).intValue();
break;
case SO_BINDADDR:
throw new SocketException("Cannot re-bind socket");
case TCP_NODELAY:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for TCP_NODELAY");
on = ((Boolean)val).booleanValue();
break;
case SO_SNDBUF:
case SO_RCVBUF:
if (val == null || !(val instanceof Integer) ||
!(((Integer)val).intValue() > 0)) {
throw new SocketException("bad parameter for SO_SNDBUF " +
"or SO_RCVBUF");
}
break;
case SO_KEEPALIVE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_KEEPALIVE");
on = ((Boolean)val).booleanValue();
break;
case SO_OOBINLINE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_OOBINLINE");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEADDR:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEADDR");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEPORT:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEPORT");
if (!supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT))
throw new UnsupportedOperationException("unsupported option");
on = ((Boolean)val).booleanValue();
break;
default:
throw new SocketException("unrecognized TCP option: " + opt);
}
socketSetOption(opt, on, val);
}
复制代码
这个方法比较长,咱们仅看与timeout
有关的代码,即<1>处的代码。其实这里仅仅就是将咱们setOption里面传入的timeout值设置到了AbstractPlainSocketImpl的全局变量timeout里而已。
这样,咱们就能够在程序级别将accept方法设置成为非阻塞式的了,可是read方法如今仍是阻塞式的,即后面咱们还须要改造read方法,一样将它在程序级别上变成非阻塞式。
在正式改造前,咱们有必要来解释下Socket下同步/异步和阻塞/非阻塞:
同步/异步是属于操做系统级别的,指的是操做系统在收到程序请求的IO以后,若是IO资源没有准备好的话,该如何响应程序的问题,同步的话就是不响应,直到IO资源准备好;而异步的话则会返回给程序一个标志,这个标志用于当IO资源准备好后经过事件机制发送的内容应该发到什么地方。
阻塞/非阻塞是属于程序级别的,指的是程序在请求操做系统进行IO操做时,若是IO资源没有准备好的话,程序该怎么处理的问题,阻塞的话就是程序什么都不作,一直等到IO资源准备好,非阻塞的话程序则继续运行,可是会时不时的去查看下IO到底准备好没有呢;
咱们一般见到的BIO是同步阻塞式的,同步的话说明操做系统底层是一直等待IO资源准备直到ok的,阻塞的话是程序自己也在一直等待IO资源准备直到ok,具体来说程序级别的阻塞就是accept和read形成的,咱们能够经过改造将其变成非阻塞式,可是操做系统层次的阻塞咱们无法改变。
咱们的NIO是同步非阻塞式的,其实它的非阻塞实现原理和咱们上面的讲解差很少的,就是为了改善accept和read方法带来的阻塞现象,因此引入了Channel
和Buffer
的概念。 好了,咱们对咱们的Demo进行改进,解决accept带来的阻塞问题(为多个客户端链接作的异步处理,这里就很少解释了,读者可自行思考,实在不行可到本人相关视频中找到对应解读):
public class BIOProNotB {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里能够作一些其余事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count++;
}
System.out.println("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotB server = new BIOProNotB();
server.initBIOServer(8888);
}
}
复制代码
为咱们的ServerSocket设置了timeout时间,这样的话调用accept方法的时候每隔1s他就会被唤醒一次,而再也不是一直在那里,只有有客户端接入才会返回信息;咱们运行一下看看结果:
2019-01-02 17:28:43:362: serverSocket started
now time is: 2019-01-02 17:28:44:363
now time is: 2019-01-02 17:28:45:363
now time is: 2019-01-02 17:28:46:363
now time is: 2019-01-02 17:28:47:363
now time is: 2019-01-02 17:28:48:363
now time is: 2019-01-02 17:28:49:363
now time is: 2019-01-02 17:28:50:363
now time is: 2019-01-02 17:28:51:364
now time is: 2019-01-02 17:28:52:365
now time is: 2019-01-02 17:28:53:365
now time is: 2019-01-02 17:28:54:365
now time is: 2019-01-02 17:28:55:365
now time is: 2019-01-02 17:28:56:365 // <1>
2019-01-02 17:28:56:911: id为1308927845的Clientsocket connected
now time is: 2019-01-02 17:28:57:913 // <2>
now time is: 2019-01-02 17:28:58:913
复制代码
能够看到,咱们刚开始并无客户端接入的时候,是会执行System.out.println("now time is: " + stringNowTime());
的输出,还有一点须要注意的就是,仔细看看上面的输出结果的标记<1>与<2>,你会发现<2>处时间值不是17:28:57:365,缘由就在于若是accept正常返回值的话,是不会执行catch语句部分的。
这样的话,咱们就把accept部分改形成了非阻塞式了,那么read部分能够改造么?固然能够,改造方法和accept很相似,咱们在read的时候,会调用 java.net.AbstractPlainSocketImpl#getInputStream
:
/** * Gets an InputStream for this socket. */
protected synchronized InputStream getInputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_rd)
throw new IOException("Socket input is shutdown");
if (socketInputStream == null)
socketInputStream = new SocketInputStream(this);
}
return socketInputStream;
}
复制代码
这里面建立了一个SocketInputStream
对象,会将当前AbstractPlainSocketImpl
对象传进去,因而,在读数据的时候,咱们会调用以下方法:
public int read(byte b[], int off, int length) throws IOException {
return read(b, off, length, impl.getTimeout());
}
int read(byte b[], int off, int length, int timeout) throws IOException {
int n;
// EOF already encountered
if (eof) {
return -1;
}
// connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// bounds check
if (length <= 0 || off < 0 || length > b.length - off) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException("length == " + length
+ " off == " + off + " buffer length == " + b.length);
}
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
impl.setConnectionReset();
} finally {
impl.releaseFD();
}
/* * If we get here we are at EOF, the socket has been closed, * or the connection has been reset. */
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
}
private int socketRead(FileDescriptor fd, byte b[], int off, int len, int timeout) throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
复制代码
这里,咱们看到了socketRead一样设定了timeout,并且这个timeout就是咱们建立这个SocketInputStream
对象时传入的AbstractPlainSocketImpl
对象来控制的,因此,咱们只须要设定serverSocket.setSoTimeout(1000)
便可。 咱们再次修改服务端代码(代码总共两次设定,第一次是设定的是ServerSocket级别的,第二次设定的客户端链接返回的那个Socket,二者不同):
public class BIOProNotBR {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里能够作一些其余事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
socket.setSoTimeout(1000);
} catch (SocketException e1) {
e1.printStackTrace();
}
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
try {
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count++;
}
} catch (Exception e) {
//执行到这里表示read方法没有获取到任何数据,线程能够执行一些其余的操做
System.out.println("Not read data: " + stringNowTime());
continue;
}
//执行到这里表示读取到了数据,咱们能够在这里进行回复客户端的工做
System.out.println("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotBR server = new BIOProNotBR();
server.initBIOServer(8888);
}
}
复制代码
执行以下:
2019-01-02 17:59:03:713: serverSocket started
now time is: 2019-01-02 17:59:04:714
now time is: 2019-01-02 17:59:05:714
now time is: 2019-01-02 17:59:06:714
2019-01-02 17:59:06:932: id为1810132623的Clientsocket connected
now time is: 2019-01-02 17:59:07:934
Not read data: 2019-01-02 17:59:07:935
now time is: 2019-01-02 17:59:08:934
Not read data: 2019-01-02 17:59:08:935
now time is: 2019-01-02 17:59:09:935
Not read data: 2019-01-02 17:59:09:936
收到id为1810132623 2019-01-02 17:59:09: 第0条消息: ccc // <1>
now time is: 2019-01-02 17:59:10:935
Not read data: 2019-01-02 17:59:10:981 // <2>
收到id为1810132623 2019-01-02 17:59:11: 第1条消息: bbb
now time is: 2019-01-02 17:59:11:935
Not read data: 2019-01-02 17:59:12:470
now time is: 2019-01-02 17:59:12:935
id为1810132623的Clientsocket 2019-01-02 17:59:13:191读取结束
now time is: 2019-01-02 17:59:13:935
id为1810132623的Clientsocket 2019-01-02 17:59:14:192读取结束
复制代码
其中,Not read data输出部分解决了咱们的read阻塞问题,每隔1s会去唤醒咱们的read操做,若是在1s内没有读到数据的话就会执行System.out.println("Not read data: " + stringNowTime())
,在这里咱们就能够进行一些其余操做了,避免了阻塞中当前线程的现象,当咱们有数据发送以后,就有了<1>处的输出了,由于read获得输出,因此再也不执行catch语句部分,所以你会发现<2>处输出时间是和<1>处的时间相差1s而不是和以前的17:59:09:936相差一秒;
这样的话,咱们就解决了accept以及read带来的阻塞问题了,同时在服务端为每个客户端都建立了一个线程来处理各自的业务逻辑,这点其实基本上已经解决了阻塞问题了,咱们能够理解成是最第一版的NIO,可是,为每一个客户端都建立一个线程这点确实让人头疼的,特别是客户端多了的话,很浪费服务器资源,再加上线程之间的切换开销,更是雪上加霜,即便你引入了线程池技术来控制线程的个数,可是当客户端多起来的时候会致使线程池的BlockingQueue队列愈来愈大,那么,这时候的NIO就能够为咱们解决这个问题,它并不会为每一个客户端都建立一个线程,在服务端只有一个线程,会为每一个客户端建立一个通道。
accept()本地方法,咱们能够来试着看一看Linux这块的相关解读:
#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);
复制代码
accept()系统调用主要用在基于链接的套接字类型,好比SOCK_STREAM和SOCK_SEQPACKET。它提取出所监听套接字的等待链接队列中第一个链接请求,建立一个新的套接字,并返回指向该套接字的文件描述符。新创建的套接字不在监听状态,原来所监听的套接字也不受该系统调用的影响。
备注:新创建的套接字准备发送send()和接收数据recv()。
参数:
sockfd, 利用系统调用socket()创建的套接字描述符,经过bind()绑定到一个本地地址(通常为服务器的套接字),而且经过listen()一直在监听链接;
addr, 指向struct sockaddr的指针,该结构用通信层服务器对等套接字的地址(通常为客户端地址)填写,返回地址addr的确切格式由套接字的地址类别(好比TCP或UDP)决定;若addr为NULL,没有有效地址填写,这种状况下,addrlen也不使用,应该置为NULL;
备注:addr是个指向局部数据结构sockaddr_in的指针,这就是要求接入的信息本地的套接字(地址和指针)。
addrlen, 一个值结果参数,调用函数必须初始化为包含addr所指向结构大小的数值,函数返回时包含对等地址(通常为服务器地址)的实际数值;
备注:addrlen是个局部整形变量,设置为sizeof(struct sockaddr_in)。
若是队列中没有等待的链接,套接字也没有被标记为Non-blocking,accept()会阻塞调用函数直到链接出现;若是套接字被标记为Non-blocking,队列中也没有等待的链接,accept()返回错误EAGAIN或EWOULDBLOCK。
备注:通常来讲,实现时accept()为阻塞函数,当监听socket调用accept()时,它先到本身的receive_buf中查看是否有链接数据包;如有,把数据拷贝出来,删掉接收到的数据包,建立新的socket与客户发来的地址创建链接;若没有,就阻塞等待;
为了在套接字中有到来的链接时获得通知,可使用select()或poll()。当尝试创建新链接时,系统发送一个可读事件,而后调用accept()为该链接获取套接字。另外一种方法是,当套接字中有链接到来时设定套接字发送SIGIO信号。
返回值 成功时,返回非负整数,该整数是接收到套接字的描述符;出错时,返回-1,相应地设定全局变量errno。
因此,咱们在咱们的Java部分的源码里(java.net.ServerSocket#accept)会new 一个Socket出来,方便链接后拿到的新Socket的文件描述符的信息给设定到咱们new出来的这个Socket上来,这点在java.net.PlainSocketImpl#socketAccept
中看到的尤其明显,读者能够回顾相关源码。