Okio 主要是替代 java.io 和 java.nio 那一套复杂的 api 调用框架,让数据访问、存储和处理更加便捷。java
Okio 是 OkHttp 的基石,而 OkHttp 是 Retrofit 的基石。这三个框架合称『Square 安卓平台网络层三板斧』node
参考 OkioTestgit
……
@Test public void readWriteFile() throws Exception {
File file = temporaryFolder.newFile();
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
assertTrue(file.exists());
assertEquals(20, file.length());
BufferedSource source = Okio.buffer(Okio.source(file));
assertEquals("Hello, java.io file!", source.readUtf8());
source.close();
}
……
复制代码
经过 OkioTest 能够大体明白 Okio 主要有 『读』、『写』两大类操做。能够操做的对象为:github
1. 文件
2. 文件路径描述类 Path
3. Socket
4. OutputStream
5. InputStream
复制代码
Okio 经过 sink(xxx) 去写一个对象,经过 source(xxx)去读一个对象。api
经过 Okio.buffer() 得到用来读写的 BufferedSource、BufferedSink缓存
BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));
复制代码
进一步查看 buffer() 方法网络
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
复制代码
看下 RealBufferedSink 类框架
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}
……
}
复制代码
RealBufferedSink 实现了 BufferedSink 接口,BufferedSink 实现了 Sink 接口。异步
而 Sink 实现了 Closeable, Flushable 接口。socket
1. Flushable 接口只定义了一个方法 flush() ,用来实现把数据从缓存区写入到底层输入流。
2. Closeable 接口定义 close() ,用来关闭流。
3. Sink 接口又定义了一个 write(Buffer source, long byteCount) 和 timeout() 用来写入数据和设置超时。
4. BufferedSink 接口则定义了一堆 wirteXXX(……) 用来操做不一样类型的数据写入。
复制代码
在看 RealBufferedSink 的成员变量
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
复制代码
这里出现了一个 Buffer 对象,一个从构造函数里面传入的 Sink 对象,以及一个用来记录流是否关闭的 boolean 标志。
RealBufferedSink 的各类 wirteXXX(……)大都以下
@Override public BufferedSink writeXXX(……) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeXXX(……);
return emitCompleteSegments();
}
复制代码
可见写入数据的方法,都是有 buffer 对象实现。而在 emitXXX() 和 flush() 方法中则调用了 sink.write(buffer, byteCount) 和 sink.flush()
RealBufferedSource 和 RealBufferedSink 相似
final class RealBufferedSource implements BufferedSource {
public final Buffer buffer = new Buffer();
public final Source source;
boolean closed;
RealBufferedSource(Source source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}
}
复制代码
RealBufferedSource 实现了 BufferedSource 接口,BufferedSource 实现了 Source 接口。
Source 接口一样也实现了 Closeable 接口。
1. Source 集成了 Closeable 接口,表示 Source 提供了一个 close 方法关闭读取数据的流。
2. Source 定义了一个 read(Buffer sink, long byteCount) 用来读取数据,一个 timeout() 方法用来设置读取超时。
3. BufferedSource 定义了不少 readXXX(……) 用来读取数据。
复制代码
RealBufferedSource 中的 readXXX(……) 方法和 RealBufferedSink 中的 writeXXX(……) 相似,都是经过成员变量 buffer 和 构造对象时传入的 Source 对象配合起来读取数据。
总结一下整个读写框架的结构以下:
不管是 File 、Path、InputStream、OutputStream 、Socket ,在 Okio 框架中只要一个简单的 Okio.sink(……) 方法便可得到对应的输入流(RealBufferedSink)和输出流(RealBufferedSource)
并且 Okio 还给输入/输出流的都提供一个额外参数:Timeout,用来设置读写的超时设置。
全部的 sink 方法,都会调用到
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
复制代码
Okio.sink() 会建立一个匿名内部类的实例,实现了 write 方法,用来写入数据到 OutputStream(File 、Path、Socket 都会被转成成 OutputStream(),每次写入数据的时候,都会检测是否超时。(超时机制后面后讲)
Okio.Source() 相似会建立一个实现 Source 接口的匿名内部类实例。实现 read 方法 ,负责从 InputStream 中读取数据。
Okio 在读写数据的时候,里面都会用用一个 Segment 对象。Segment 是 Okio 定义的一个***链表结构***的数据片断,每一个 Segment 能够最多存放 8K 的字节。
写数据的时候 Okio 会先把数据写到 buffer 中
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
复制代码
Okio.buffer() 返回的是 RealBufferedSink
@Override public BufferedSink writeUtf8(String string) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeUtf8(string);
return emitCompleteSegments();
}
复制代码
查看 writeUtf8
@Override public Buffer writeUtf8(String string) {
return writeUtf8(string, 0, string.length());
}
复制代码
而后把 String 变成一个 Segment 链表
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
……
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
if (c < 0x80) {
Segment tail = writableSegment(1);
byte[] data = tail.data;
……
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
……
} else if (c < 0x800) {
// Emit a 11-bit character with 2 bytes.
writeByte(c >> 6 | 0xc0); // 110xxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} ……
}
return this;
}
复制代码
经过 writableSegment 是否是要开辟新的 Segment 到队列尾部
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
复制代码
在看 emitCompleteSegments()
@Override
public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
复制代码
buffer.completeSegmentByteCount() 用来计算 Segment 的缓存的字节长度
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
复制代码
sink.write(buffer, byteCount) 就是以前传入的集成的 Sink 匿名类。
总结一下整个流程
读数据的时候 Buffer 起到的做用相似,直接贴一下流程图
Okio 能够给他 OutputStream 、InputStream 增长一个超市设置。读写文件时会设置一个默认的 TimeOut ,这个方法是个空实现。
在读写 Socket 的时候,Okio 给咱们展现一个如何设置一个异步的超时机制,用来在 Socket 读写超时时关闭流。
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
复制代码
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override
protected IOException newTimeoutException(@Nullable IOException cause) {
……
}
@Override
protected void timedOut() {
try {
socket.close();
}……
}
};
}
复制代码
这里看出会返回一个 AsyncTimeout 的匿名对象,主要在 timeOut() 中关闭 Socket。
private static Sink sink(final OutputStream out, final Timeout timeout) {
……
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
……
while (byteCount > 0) {
timeout.throwIfReached();
……
}
}
……
};
}
复制代码
在看一下 throwIfReached 方法
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
复制代码
若是 hasDeadline 是 true,而且 deadlineNanoTime 大于 System.nanoTime() 来判断是否达超时。
public final Sink sink(final Sink sink) {
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
long toWrite = 0L;
for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
int segmentSize = s.limit - s.pos;
toWrite += segmentSize;
if (toWrite >= byteCount) {
toWrite = byteCount;
break;
}
}
// Emit one write. Only this section is subject to the timeout.
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
@Override
public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
@Override
public String toString() {
return "AsyncTimeout.sink(" + sink + ")";
}
};
}
复制代码
能够看出 timeout.sink(sink) 从新包装了 Sink 给 Sink 的每一个方法都增长一个 enter() 方法
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
复制代码
这里会发现若是知足了条件,会执行 scheduleTimeout 方法。可是默认状况下,条件不会被知足。
查看一下 SocketTimeoutTest
@Test
public void readWithoutTimeout() throws Exception {
Socket socket = socket(ONE_MB, 0);
BufferedSource source = Okio.buffer(Okio.source(socket));
source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
source.require(ONE_MB);
socket.close();
}
复制代码
这里能够看到,须要调用 source.timeout().timeout(5000, TimeUnit.MILLISECONDS)
public Timeout timeout(long timeout, TimeUnit unit) {
if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
if (unit == null) throw new IllegalArgumentException("unit == null");
this.timeoutNanos = unit.toNanos(timeout);
return this;
}
复制代码
这里能够看到 timeoutNanos 在这里赋值了。因此设置 timeout(5000, TimeUnit.MILLISECONDS) 后会出发 scheduleTimeout(this, timeoutNanos, hasDeadline)
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
……
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
复制代码
这里用到一个同步锁、启动一个 Watchdog 线程。而且根据 timeout 的超时时间,把 AsyncTimeout 添加到一个任务队列中。
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
复制代码
Watchdog 线程会一直同步遍历任务队列执行 awaitTimeout()
static @Nullable
AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
复制代码
} 这里会根据队列头部的 AsyncTimeout 节点,计算出剩余时间,而后执行 AsyncTimeout.class.wait(waitMillis, (int) waitNanos) 方法阻塞。
注意这个的 wait(timeout) 会被 AsyncTimeout.class.notify() 唤醒。
复制代码
若是任务队列为空会执行 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) ,等待一分钟。而后再判断是否有新的任务。