PipedReader继承自Reader类,是字符管道输入流,它的功能与管道字节输出流PipedInputStream极为类似,经过绑定一个管道输出流PipedWriter实现了相似管道的功能,实现线程间通讯,一个线程在字符管道输出流中写入数据,基于管道特性这些数据实际会传送到(其实保存可能更恰当)它链接的字符管道输入流的内置字符缓冲区buffer中,其余线程从管道字符输入流的字符缓冲区中读取数据如此实现了多线程环境下的线程间通讯。java
package java.io; public class PipedReader extends Reader { //链接的管道输出流的关闭状态 boolean closedByWriter = false; //链接的管道输入流的关闭状态 boolean closedByReader = false; //当前字符管道输入流的链接状态 boolean connected = false; //从管道中读取数据的线程 Thread readSide; //向管道中写入数据的线程 Thread writeSide; /** * 内置缓冲区的默认大小 */ private static final int DEFAULT_PIPE_SIZE = 1024; /** * 存储字符数据的内部缓冲区 */ char buffer[]; /** * 缓冲区的位置,标识缓冲区数组存储下一次从管道输出流PipedWriter读取的字符的位置,若in<0标识缓冲区目前是空的, * 还未放入数据,若in==full则表示缓冲区是满的 */ int in = -1; /** * 缓冲区位置,用于标识管道输入流PipedReader下一次从缓冲区读取字符的位置 */ int out = 0; }
/** * 构造函数,指定了链接的字符管道输出流,并使用默认缓冲区容量初始化内部缓冲区数组 */ public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /** * 构造函数,指定了链接的字符管道输出流PipedWrite和内部字符缓冲区buffer的容量,基于参数与字符管道输出流创建链接 * 造成管道,并初始化内部缓冲区 */ public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /** * 无参构造函数,建立一个字符管道输入流,但还未与PipedWriter创建链接,在使用以前必须与指定字符管道输出流 * PipedWriter创建链接 */ public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); }
咱们选择第二个构造函数PipedReader(PipedWriter src, int pipeSize)分析下在调用构造函数实例化PipedWriter对象的时候作了什么。在该构造函数内部首先调用了initPipe方法咱们首先进入该方法源码进行分析:数组
private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; }
没作啥事无非是对指定的缓冲区容量参数pipeSize作了合法性校验,在经过校验后为内部缓冲区分配了一个大小为pipeSize的缓冲区。咱们接着往下分析,接着方法构造函数调用了connect方法,进入该方法数据结构
public void connect(PipedWriter src) throws IOException { src.connect(this); }
该方法调用了PipedWriter的src方法,进入该方法源码多线程
public synchronized void connect(PipedReader snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } else if (snk.closedByReader || closed) { throw new IOException("Pipe closed"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
这里方法主要作了如下事情:判断方法传入的字符管道输入流对象是否为空若是为空抛出空指针异常、判断字符输入流对象是否已经创建链接若是已创建链接抛出IO异常提示“已经创建链接“、判断管道两端的字符流是否有任意一端已经关闭,若是有则抛出IO异常提示“管道已关闭“,若经过上述校验则经过指定当前字符管道输入流链接的字符管道输出流对象内部成员变量sink(链接的管道输入流),初始化字符管道输入流的读写位置和链接状态,完成管道的链接。ide
到这里咱们能够总结PipedReader的构造函数主要作了两件事:1.初始化内部字符缓冲区;2基于方法传入的管道字符输出流对象PipedWriter创建管道链接函数
/** * 接收一个字符,将其插入到字符缓冲区数组,若是当前没有可用的输入,方法会阻塞 * 该方法一般由链接的字符管道输出流写入方法调用将字符数据写入到当前缓冲区 */ synchronized void receive(int c) throws IOException { //检查当前管道的链接状态 if (!connected) { throw new IOException("Pipe not connected"); //判断管道任意一端是否已关闭 } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); //判断当前读取字符数据线程即便用字符管道输出流读取数据的线程状态 } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } //获取调用方即管道写入端(字符管道输出流)线程 writeSide = Thread.currentThread(); //若是写入管道的数据已经读取完 while (in == out) { //若管道读取端线程不活跃,抛出IO异常提示管道已损坏 if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } 唤醒其余线程继续读取管道字符数据 notifyAll(); try { //等待释放锁 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //若in<0重置字符写入位置in和读取位置out if (in < 0) { in = 0; out = 0; } //在缓冲区对应位置插入字符c buffer[in++] = (char) c; //若插入字符后写入位置超出字符缓冲区的限制,则会重置写入位置in到缓冲区数组开始位置,覆盖缓冲区原有的数据 if (in >= buffer.length) { in = 0; } }
receive方法是字符管道输入流PipedReader用于接收字符管道输出流PipedWriter写入的数据,也是PipedReader与PipedWrite组合使用实现线程间通讯的核心方法。管道写入端线程经过输出流PipedWriter对象调用本方法往PipedReader的缓冲区写入字符数据,其余管道读取端线程经过读取该缓冲区数据以此实现线程间的数据传递和通讯。能够参照本方法分析本类的void receive(char c[], int off, int len)方法源码分析
read()方法用于从当前管道读取下一个字符,若是当前管道没有字符数据可读取即已经到达字符管道输入流的末尾,那么返回-1,在输入数据能够,到达流末尾或者抛出异常前,该方法会一直阻塞this
public synchronized int read() throws IOException { //检查链接状态 if (!connected) { throw new IOException("Pipe not connected"); } //检查管道链接的字符输入流流状态 else if (closedByReader) { throw new IOException("Pipe closed"); } //检查管道写入端状态(包括使用字符输出流线程和字符输出流状态) else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //获取当前管道使用字符输入流的线程 readSide = Thread.currentThread(); int trials = 2; //当前缓冲区无数据 while (in < 0) { //若链接的字符管道输出流已经关闭,返回-1读取结束 if (closedByWriter) { /* closed by writer, return EOF */ return -1; } //写入端线程未处于活跃状态,管道损坏 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /** 唤醒管道等待写入的线程 **/ notifyAll(); try { //阻塞等待 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //当前缓冲区有数据 //从缓冲区获取单个字符,并更新读取位置out指向下一次读取位置 int ret = buffer[out++]; //若下一次读取位置超出缓冲区限制则重置到缓冲区开头,参照接收字符数据超出缓冲区的处理 if (out >= buffer.length) { out = 0; } //当前缓冲区数据已所有读取完 if (in == out) { /* now empty */ in = -1; } return ret; }
read()方法的逻辑较为简单,方法开始首先基于管道、管道链接的字符输入流、管道链接的字符输出流三者的状态信息判断,若是状态不符结束抛出IO异常,经过上述检测以后若读取的缓冲区存储的字符数据为空,那么判断是否是管道写入端使用PipedWrite写入字符数据所在的线程关闭了或者线程未处于活跃状态,若是排除了上述两种状况那么当前读取线程阻塞等待,唤醒其余等待写入的线程往缓冲区写入字符数据。接着阻塞直到当缓冲区存在可读字符数据时,从缓冲区读取单个字符并返回。spa
/** * 将字符数组c从下标off开始最多len个字符读入当前字符管道输入流的内部缓冲区。该方法将一直阻塞直到字符数据所有写入 * 缓冲区,或者发生异常。 */ synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } } /** * 通知全部等待线程最后一个字符已接收,一般用于管道字符输出流关闭通知写入结束 */ synchronized void receivedLast() { closedByWriter = true; notifyAll(); } /** * 将最多len个字符从管道输入流读取到方法指定字符数组cbuf,从off下标出开始填入读取字符 * 若是提早到达输入流末尾(即缓冲区暂时没有那么多字符数据可读取)那么读取的字符数可能小于len * 在检测到达流末尾、抛出异常或者至少有一个字符可读以前该方法将一直阻塞 */ public synchronized int read(char cbuf[], int off, int len) throws IOException { //检测链接状态 if (!connected) { throw new IOException("Pipe not connected"); } //管道字符输入流是否关闭 else if (closedByReader) { throw new IOException("Pipe closed"); } //管道输出流以及使用它进行写入的线程状态判断 else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //方法写入位置参数off的范围合法性校验 if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* 读取单个字符可能会阻塞 */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; //循环读取len个字符,直到流末尾或者读取结束 while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } /** * 返回当前字符管道输入流是否已准备好被读取。当内部缓冲区字符数据不为空则该字符管道输入流已准备好被读取 * 当管道未链接、损坏、关闭方法将抛出IO异常 */ public synchronized boolean ready() throws IOException { //检测管道链接状态 if (!connected) { throw new IOException("Pipe not connected"); } //当前管道字符输出流关闭 else if (closedByReader) { throw new IOException("Pipe closed"); } //管道字符输出流已关闭或所在线程不活跃 else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //写入缓冲区数据为空 if (in < 0) { return false; } else { return true; } } /** * 关闭当前管道字符输入流,释放相关系统资源,其实就是更新输入流状态和管道写入位置 */ public void close() throws IOException { in = -1; closedByReader = true; }