在java多线程通讯中管道通讯是一种重要的通讯方式,在java中咱们经过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通讯。多线程管道通讯的主要流程是在一个线程中向PipedOutputStream写入数据,这些数据会自动传送到对应的管道输入流PipedInputStream中,其余线程经过读取PipeInputStream中缓冲的数据实现多线程间通讯。java
PipeInputStream是管道输入流,继承自InputStream,链接到一个管道输出流PipedOutputStream。能够缓存链接的管道输出流PipedOutputStream写入的字节数据。一般在一个线程使用PipedInputStream读取数据,在其余线程使用PipedOutputStream写入字节数据。不推荐在一个线程中使用PipedInputStream和PipedOutputStream可能会在线程中形成死锁,管道输入流包含一个缓冲区buff用于读操做和写操做。数组
1)成员变量缓存
package java.io; public class PipedInputStream extends InputStream { //管道输出流是否关闭标记 boolean closedByWriter = false; //管道输入流是否标记 volatile boolean closedByReader = false; //管道输入流与管道输出流是否创建链接 boolean connected = false; //读取“管道”数据即PipedInputStream线程 Thread readSide; //向管道写入数据即PipedOutputStream线程 Thread writeSide; //管道默认大小 private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; // 缓冲区 protected byte buffer[]; //下一个写入字节的位置 protected int in = -1; //下一个读取字节的位置。若out==in说明管道输出流写入的数据所有被读取 protected int out = 0; }
2)构造方法数据结构
public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); }
从源码咱们能够知道管道输入流PipedInputStream构造方法作了两件事,按照指定大小pipeSize初始化缓冲区,若是还指定了关联的管道输出流PipedOutputStream,那么调用connect方法链接它。若是指定的pipeSize小于等于0那么抛出IllegalArgumentException异常,若是当前的管道输入流已经和指定的管道输出流创建链接那么抛出IOException异常提示链接已经创建。多线程
3)其余成员方法ide
//根据指定大小pipeSize初始化缓冲区 private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } //绑定管道输入流与管道输出流 public void connect(PipedOutputStream src) throws IOException { src.connect(this); } //接收绑定的管道输出流PipedOutputStream的write(int b)方法写入的int类型数据 protected synchronized void receive(int b) throws IOException { //检查管道状态 checkStateForReceive(); //获取“写入管道“即PipedOutputStream的线程 writeSide = Thread.currentThread(); //若管道输出流写入的数据所有被读取则等待 if (in == out) awaitSpace(); if (in < 0) { in = 0; out = 0; } //将读取的字节b保存到缓冲区 buffer[in++] = (byte)(b & 0xFF); //若是当前写入的字节数大于缓冲区大小那么从头覆盖以前写入的字节数据 if (in >= buffer.length) { in = 0; } } //接收管道输出流的write(byte b[],int off, int len)方法调用写入的字节数组b synchronized void receive(byte b[], int off, int len) throws IOException { //检查管道状态 checkStateForReceive(); //获取”写入管道“线程 writeSide = Thread.currentThread(); //获取写入字节长度 int bytesToTransfer = len; //循环将字节数组b写入管道输入流内部缓冲数组buffer while (bytesToTransfer > 0) { //若写入管道的字节长度in等于读取字节长度长度out,则等待 if (in == out) awaitSpace(); int nextTransferAmount = 0; //若管道中读取的字节数out小于写入的字节数in,nextTransferAmount等于buffer.length-in if (out < in) { nextTransferAmount = buffer.length - in; } //若管道中写入的字节数小于读取的字节数, else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); //将数据复制到缓冲区buffer中 System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; //缓冲区溢出继续写入则覆盖原有字节数据 if (in >= buffer.length) { in = 0; } } } //检查管道状态 private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } } //等待。若”写入管道“的数据被所有读取完,则唤醒”读取管道“线程继续读取字节数据以让缓冲区空出空间继续写入数据,等待 //1000ms private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } } //链接的管道输出流关闭时被调用用于更新管道输入流关闭状态,并唤醒读取管道线程 synchronized void receivedLast() { closedByWriter = true; notifyAll(); } //从管道输入流更确切的说缓冲区buffer中读取一个字节并转化为int值 public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; //若是写入字节数小于0,且非写入管道线程异常终止或者管道输出流写入结束正常关闭,那么唤醒写入管道等待1s while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } //从管道输入流缓冲数组buffer中读取字节数据并填入数组b中逻辑与read()相似 public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } // A byte is read beforehand outside the loop if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } //返回可不受阻塞地今后输入流中读取的字节数,注意available返回是缓冲数组中的可读字节长度 public synchronized int available() throws IOException { if(in < 0) return 0; else if(in == out) return buffer.length; else if (in > out) return in - out; else return in + buffer.length - out; } //关闭输出流 public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } }
PipedOutputStream是管道输出流,继承自OutputStream。经过与一个管道输入流PipedInputStream创建链接往管道输入流中写入数据,其实质是往绑定的管道输入流的内部缓存区中填入数据。PipedInputStream和PipedOutputStream主要用于线程间通讯,不建议在单线程中使用PipedInputStream和PipedOutputStream可能形成死锁。函数
public class PipedOutputStream extends OutputStream { //绑定的管道输入流对象 private PipedInputStream sink; }
1)构造函数oop
public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } public PipedOutputStream() { } public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
PipedOutputStream类内部定义了两个构造函数,一个无参构造函数没作啥,咱们分析下入参是管道输入流对象引用的构造函数。由源码可知与PipedInputStream构造函数作的同样其实PipedInputStream带参构造函数方法内部调用的也是本方法,参数判空,判断是否此前是否已创建链接,内部管道输入流对象引用指向指定对象,初始化对象的读取写入位置,设置链接状态为已链接。源码分析
2)void write(byte b[], int off, int len)写入字节数据this
public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
这个方法首先对当前绑定的管道输入流对象进行判空,为空抛出一个IO异常提示未创建管道链接,对写入的字节数组b判空为空抛出NullException异常,接着对待写入字节数组b写入起点off,写入长度len进行范围合法性校验,校验失败抛出IndexOutOfBoundsException数组越界异常,若是写入长度为0直接返回,下面就是调用绑定的管道输入流对象sink.receive(b, off, len)方法,源码解析在前面关于PipedInputStream源码的解析部分。从这边咱们知道PipedOutputStream写入数据其实就是调用绑定输入流的receive方法往内部缓冲数组buffer中填入字节数据。
3)其余成员方法
//写入单个字节,实质就是往绑定的管道输入流内部缓冲数组填入一个字节数据 public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } //释放输入流对象锁,继续读取缓冲区数据 public synchronized void flush() throws IOException { if (sink != null) { synchronized (sink) { sink.notifyAll(); } } } //调用管道输入流的receiveLast关闭通道 public void close() throws IOException { if (sink != null) { sink.receivedLast(); } } }