参考:https://www.jianshu.com/p/f5941bcf3a2djava
将JDK封装的IO操做再进行一层封装数组
好处:安全
(1)使用了segement片断管理的方式管理数据,以片断做为IO操做的单位,使IO操做吞吐率增长架构
(2)使用链表将Segment片断联系起来管理,对于移动指针就能够对数据进行管理,扩容方便异步
(3)使用SegmentPool对废弃的片断回收、复用和内存共享,从而减小内存的申请和减小了GC的频率,性能获得优化。socket
(4)封装了输出输入的超时处理,着在JDK原生io操做中没有ide
(5)具有了不少从字节到特定编码格式或者数据类型的转化方法,很方便函数
OKIO写流程图:性能
public static void main(String[] args) { File file = new File("hello.txt"); try { readString(new FileInputStream(file)); } catch (IOException e) { e.printStackTrace(); } } public static void readString(InputStream in) throws IOException { BufferedSource source = Okio.buffer(Okio.source(in)); //建立RealBufferedSource输入流 String s = source.readUtf8(); //以UTF-8读,就是经过执行回调(Okio.source(in)的read方法)往Buffer中写source输入流的数据,再返回Buffer存储size的所有字节 System.out.println(s); source.close(); } public static void writeString(OutputStream out){ BufferedSink sink = Okio.buffer(Okio.sink(out)); //建立RealBufferedSink输出流 sink.writeLone("test");//里面往Buffer里面写“test”,再经过执行回调(Okio.sink(out)的write方法)将Buffer里面的数据写到输出流sink里 sink.close(); }
3.okio架构优化
两接口分别定义了read()和wriete(),表明着输入流和输出流,
BufferedSource、BufferedSink接口:
分别继承了Source和Sink,维护者Buffer,分别定义了一堆针对各类类型的读和写操做方法。
至关于一个操做的中介代理,内部都维护着一个Buffer,分别定义了进行具体操做。
存储最大长度为8K字节数组,该数组为不可变字节序列,pos表明开始能够读的字节序号,limit表明能够写的字节序号,boolean shared表明该片断是否共享,boolean owner表明本身是否能够操做本片断(与shared互斥),经过per、next造成链表(Buffer构建出双向链表),方法固然有push()、pop()
因为存储的结果就是Head-new1-new2-new3-这样子,新建new的前提仅仅是上一片断(例如new3)的limit到Segment_size_max小于要存储的字节数,若是很长时间会有一个segment只存了小部分数据,可是有不少segment,形成内存浪费。所以须要压缩方法compact(),具体作法:先将上一片断的数据从pos往前移,使pos从序号0开始,而后将本片断的数据写到上一片断上,从limit开始,而后将本片断抛出。
public void compact() { if (prev == this) throw new IllegalStateException();//若是上一节点是本身表明没有上一节点 if (!prev.owner) return; // Cannot compact: prev isn't writable.//若是上一节点不能够操做,返回 int byteCount = limit - pos;//记录当前片断存储的字节长度L1 int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);//计算上一节点没有存储数据的长度L2(pos以前的长度+limit以后的长度,若是perv是共享的,那么就不能加入pos以前的长度) if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.//若是L1>L2,那么就不够位置存,返回 writeTo(prev, byteCount);//将本部分数据写入上一节点 pop();//弹出本节点 SegmentPool.recycle(this); }
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */ public void writeTo(Segment sink, int byteCount) {//往sink部分写入this(Segment)长度为byteCount的数据 if (!sink.owner) throw new IllegalArgumentException(); if (sink.limit + byteCount > SIZE) { // We can't fit byteCount bytes at the sink's current position. Shift sink first. if (sink.shared) throw new IllegalArgumentException(); if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException(); System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);//先将sink本来存储的数据往前移,pos设为1 sink.limit -= sink.pos; sink.pos = 0; } System.arraycopy(data, pos, sink.data, sink.limit, byteCount);//将this(Segment)数据写入sink中,从limit开始 sink.limit += byteCount; pos += byteCount; }
原理:将一个Segment分为[pos...pos+byteCount]和[pos+byteCount...limit]两部分,该方法只用在Buffer.wirte()上,注意的是byteCount参数的意义是两个Segment相同的部分,若是相同的部分超多SHARE_MINIMUM=1024字节,就共享一个Segment,看起来咱们是new了一个Segment,可是里面的data[]引用仍是原来的Segment,所以可以减小数据复制带来的内存开销,从而提升性能
public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }
该池经过一个segment变量将所有segment联系起来,存储最多8个segment的片断池,若是该池还未满,一旦segment废弃了就放到该池中,减小了GC。若是io操做须要新建segment的话就不用new申请内存空间了,直接从片断池中获取。
具体读写操做的执行类,内部维护着一个Segment head,经过该片断就能够获取整个链表的全部数据,long size表明已经存储的总字节数。实现了三个接口,读、写和Clone。
实现及好处:循环的双向链表实现,链表节点为Segment片断,而Segment维护着固定长度的字节数组。这样采起分片的连接方式,片中又经过数组存储,兼具读的连续性和写的可插入性,对比于单一的用链表或者数组,是比较折中的方式。还有个好处就是根据需求来改动分片的大小来权衡读写的业务操做。
/** Returns a deep copy of this buffer. */ @Override public Buffer clone() { Buffer result = new Buffer(); if (size == 0) return result; result.head = new Segment(head); result.head.next = result.head.prev = result.head;//1 for (Segment s = head.next; s != head; s = s.next) { result.head.prev.push(new Segment(s)); } result.size = size; return result; }
Segment(Segment shareFrom) { this(shareFrom.data, shareFrom.pos, shareFrom.limit); shareFrom.shared = true; } Segment(byte[] data, int pos, int limit) { this.data = data; this.pos = pos; this.limit = limit; this.owner = false; this.shared = true; }
该Clone拷贝方法为浅拷贝,可是咱们注意到虽然拷贝新建了一个Buffer,可是该Buffer的head变量的构造由new Segment(head)构建,细看该构造器中最重要的字节数组data引用指向了新的data,没有从新分配新的内存空间,性能获得优化。Segment的shared共享标示参数设为true,结合Segment类不少设计到shared的方法,有不少好处。
同时注意到语句1,result.head.next = result.head.prev = result.head,这样就造成了循环的链表中循环的特性了,以前分析Segment类不管增长数据add Segment或者读取数据 popSegment都没有造成循环闭环的特性,原来闭环(循环)的特性在Buffer类中造成。
除此以外,Buffer进行写操做每次都必须先获取segment,语句2也是造成闭环的缘由
Segment writableSegment(int minimumCapacity) { if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException(); if (head == null) { head = SegmentPool.take(); // Acquire a first segment. return head.next = head.prev = head;//2 } Segment tail = head.prev; if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {//须要的长度若是从limit上再追加大于了Segment的最大大小了 tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.//使得head.prev每次都指向新push的segment } return tail; }
具体分为int(4)、long(8)、short(2)的等等数据类型的读和写,须要注意的是各类数据类型须要多少个字节存储,读和写的参数分为直接是字节数组或者是source、sink、输入流、输出流等等,大同小异。
若是为String数据类型做为输入或者输出时,编码格式能够选择,utf-8
以md五、sha一、sha256数字摘要输出Buffer里面存储的数据。
其实就是一个不可变的字节数组,以特定的编码格式进行解码,支持的解码方式有utf-8,base64和hex。
该字段其实就是不可变的意思,在Segment存储data仍是ByteString里面data都设了该字段。
定义:
类不被拓展(继承或者实现)
全部域都是final的,全部域都是private
没有任何提供改变该类状态的方法
好处:固然是线程安全的,不用考虑同步的问题(final的语义:当构造函数结束时,final定义的值被保证当其余线程访问该对象时,该值是可见的)
坏处:须要建立大量类的对象(。。。)
超时机制:意义在于防止IO操做阻塞在某个异常上,okio的超时机制的基础就是采起同步超时TimeOut。
同步:TimeOut类
使用在以任何inputStream或者outPutStream做为输入流输出流参数构建时,使用同步超时机制。
其实经过对OKIO的简单使用,咱们知道读写操做的具体操做都在构建输入输出流的的回调方法。
如下sink构建输出流代码为例(source(final InPutStream in)构建输入流在超时处理上也同样执行了语句1,timeOut.throwInReached(),不拓展了)
OKIO:
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) { if (out == null) throw new IllegalArgumentException("out == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { timeout.throwIfReached();//1 Segment head = source.head; int toCopy = (int) Math.min(byteCount, head.limit - head.pos); out.write(head.data, head.pos, toCopy); head.pos += toCopy; byteCount -= toCopy; source.size -= toCopy; if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } }; }
TimeOut:
public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }
异步:AsyncTimeOut,
使用在socket做为输入输出流时,使用异步超时处理,之因此在socket上使用异步超时处理,由于socket自身性质决定,socket经常由于自身阻塞致使往下不执行。
okio:与其余inputStream不一样的是,他不直接返回source,而是做为AsyncTime.source()的参数再进行一次封装,返回source
public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); }
AsyncTimeOut:咱们发现这里才是执行的准确位置,语句1就是涉及到异步超时处理机制。
在开始读或者写以前执行enter(),该方法就是开启一个WatchDog线程,待分析。。。
public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; enter();//1 try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } @Override public String toString() { return "AsyncTimeout.source(" + source + ")"; } }; }