源码解析,如需转载,请注明做者:Yuloran (t.cn/EGU6c76)html
造轮子者:Season_zlcjava
本文主要讲述 RxDownload2
的多线程断点下载技术服务器
服务器必须支持按 byte-range
下载,也就是支持 Range: bytes=xxx-xxx
请求头。详见 Http 协议 rfc2616 - Range。多线程
很简单,先读取 Content-Length
响应头,获取文件大小,而后用文件大小除以线程数就可计算出每条线程的下载范围。app
好比,假设文件大小是 100 bytes
,下载线程数为 3
。由于 100 / 3 = 33
,因此:框架
线程 0
的下载范围是 0 ~32
即 [0 * 33 ~ (0 + 1) * 33 - 1]
线程 1
的下载范围是 33~65
即 [1 * 33 ~ (1 + 1) * 33 - 1]
线程 2
的下载范围是 66~99
即 [2 * 33 ~ 100 - 1]
上代码:dom
public void prepareDownload(File lastModifyFile, File tempFile, File saveFile, long fileLength, String lastModify) throws IOException, ParseException {
// 将响应头中的上次修改时间转为 long 类型的 unix 时间戳,而后保存到文件中
writeLastModify(lastModifyFile, lastModify);
// 设置下载文件的大小、计算每条线程的下载范围并保存到 tempFile 中
prepareFile(tempFile, saveFile, fileLength);
}
复制代码
private void prepareFile(File tempFile, File saveFile, long fileLength) throws IOException {
RandomAccessFile rFile = null;
RandomAccessFile rRecord = null;
FileChannel channel = null;
try {
rFile = new RandomAccessFile(saveFile, ACCESS);
rFile.setLength(fileLength);//设置下载文件的长度
rRecord = new RandomAccessFile(tempFile, ACCESS);
// 下载范围在文件中的记录方式:|start|end|start|end|start|end|...
// 数据类型是 long,long类型在 java 中占 8 个字节,因此每一个线程的下载范围都占 16 字节
// 因此 tempFile 的长度 RECORD_FILE_TOTAL_SIZE = 16 * 线程数
rRecord.setLength(RECORD_FILE_TOTAL_SIZE); //设置指针记录文件的大小
// NIO 内存映射文件的方式读写二进制文件,速度更快
channel = rRecord.getChannel();
// 注意映射方式为读写
MappedByteBuffer buffer = channel.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);
long start;
long end;
// 计算并保存每条线程的下载范围,计算方法同上面举的例子
int eachSize = (int) (fileLength / maxThreads);
for (int i = 0; i < maxThreads; i++) {
if (i == maxThreads - 1) {
start = i * eachSize;
end = fileLength - 1;
} else {
start = i * eachSize;
end = (i + 1) * eachSize - 1;
}
buffer.putLong(start);
buffer.putLong(end);
}
} finally {
closeQuietly(channel);
closeQuietly(rRecord);
closeQuietly(rFile);
}
}
复制代码
很简单,上面已经将每条线程的下载范围保存到了 tempFile
中,只要再从 tempFile
中按位置读出来就好了。工具
public DownloadRange readDownloadRange(File tempFile, int i) throws IOException {
RandomAccessFile record = null;
FileChannel channel = null;
try {
// 入参 i 表示线程序号
record = new RandomAccessFile(tempFile, ACCESS);
channel = record.getChannel();
MappedByteBuffer buffer = channel
.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE);
long startByte = buffer.getLong();
long endByte = buffer.getLong();
return new DownloadRange(startByte, endByte);
} finally {
closeQuietly(channel);
closeQuietly(record);
}
}
复制代码
注意 MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE);
这句代码是有坑的,可是表现不出来,由于这里的文件打开方式为 READ_WRITE
。要是改为 READ_ONLY
就有致使读取最后一条线程的下载范围时抛出IllegalArgumentException
(代码静态检查工具 Fortify
提示要以合适的权限打开文件,我将其改成了 READ_ONLY
,发现了这一问题)。源码分析
错误缘由:map() 方法的最后一个参数表示要映射的字节数,以只读方式打开时,若参数大小超过了文件剩余可读字节数,就会抛出 IllegalArgumentException
。而以读写方式打开文件时,会自动扩展文件长度,因此不会抛出异常。post
由于每段下载范围的长度都是 EACH_RECORD_SIZE = 16 bytes
,因此,上述代码应修改成: MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, EACH_RECORD_SIZE);
本身写了个示例代码,测试了一下:
RandomAccessFile file = new RandomAccessFile("temp.txt", "rw");
file.setLength(48);
FileChannel channel = file.getChannel();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 48);
for (int i = 0; i < 3; i++) {
if (i == 2) {
buffer.putLong(i * 33).putLong(99);
} else {
buffer.putLong(i * 33).putLong((i + 1) * 33 - 1);
}
}
channel.close();
RandomAccessFile file1 = new RandomAccessFile("temp.txt", "r");
FileChannel channel1 = file1.getChannel();
for (int i = 0; i < 3; i++) {
MappedByteBuffer buffer1 = channel1.map(FileChannel.MapMode.READ_ONLY, i * 16, 16);
System.out.println(String.format("long1: %d", buffer1.getLong()));
System.out.println(String.format("long2: %d", buffer1.getLong()));
}
channel1.close();
复制代码
给 Notepad++
装个十六进制查看器,查看生成的 temp.txt
中的内容是否和咱们代码写的同样:
上面是十六进制,换算成十进制就是上面示例代码写的内容。
很简单,利用 RandomAccessFile
可从任意位置读写的属性,分别将每条线程下载的数据写到同一个文件的不一样位置。
public void saveFile(FlowableEmitter<DownloadStatus> emitter, int i, File tempFile, File saveFile, ResponseBody response) {
RandomAccessFile record = null;
FileChannel recordChannel = null;
RandomAccessFile save = null;
FileChannel saveChannel = null;
InputStream inStream = null;
try {
try {
// 1.映射 tempFile 到内存中
record = new RandomAccessFile(tempFile, ACCESS);
recordChannel = record.getChannel();
MappedByteBuffer recordBuffer = recordChannel
.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);
// i 表明线程序号,startIndex 表明该线程下载范围的 start 字段在文件中的指针位置
int startIndex = i * EACH_RECORD_SIZE;
// start 表示该线程的起始下载位置
long start = recordBuffer.getLong(startIndex);
// 新建一个下载状态对象,用于发射下载进度
DownloadStatus status = new DownloadStatus();
// totalSize 表明文件总大小,也能够从 saveFile 中读出
long totalSize = recordBuffer.getLong(RECORD_FILE_TOTAL_SIZE - 8) + 1;
status.setTotalSize(totalSize);
int readLen;
byte[] buffer = new byte[2048];
inStream = response.byteStream();
save = new RandomAccessFile(saveFile, ACCESS);
saveChannel = save.getChannel();
while ((readLen = inStream.read(buffer)) != -1 && !emitter.isCancelled()) {
MappedByteBuffer saveBuffer = saveChannel.map(READ_WRITE, start, readLen);
saveBuffer.put(buffer, 0, readLen);
// 成功下载一段数据后,将已下载位置写回 start 字段
start += readLen;
recordBuffer.putLong(startIndex, start);
// 计算已下载字节数 = 文件长度 - 每条线程剩余未下载字节数
status.setDownloadSize(totalSize - getResidue(recordBuffer));
// 发射下载进度
emitter.onNext(status);
}
// 发射下载完成
emitter.onComplete();
} finally {
closeQuietly(record);
closeQuietly(recordChannel);
closeQuietly(save);
closeQuietly(saveChannel);
closeQuietly(inStream);
closeQuietly(response);
}
} catch (IOException e) {
emitter.onError(e);
}
}
复制代码
下载流程就不分析了,只要熟练使用下图所示两个快捷键,什么源码分析都是手到擒来:
RxDownload2
源码解析系列至此结束,虽然框架比较简单,可是仍是有不少值得学习的东西。尤为是做者对 RxJava2
的使用,能够说很是之六了。他写的十篇 Rxjava2
教程也很是的通俗易懂,感兴趣的能够看一看。
RxDownload2 系列文章: