这段时间看了看工做室的工具库的下载组件,发现其存在一些问题:git
1.下载核心逻辑有 bug,在暂停下载或下载失败等状况时有几率没法顺利完成下载。
2.虽然原来的设计是采用多线程断点续传的设计,但打了一下日志发现其实下载任务都是在同一个线程下串行执行,并无起到加快下载速度的做用。
考虑到原来的代码并不复杂,所以对这部分下载组件进行了重写。这里记录一下里面的多线程断点续传功能的实现。github
请查看完整的PDF版
(更多完整项目下载。未完待续。源码。图文知识后续上传github。)
能够点击关于我联系我获取完整PDF
(VX:mm14525201314)数据库
首先咱们谈一谈,多线程下载的意义。安全
在平常的场景下,网络中不可能只有下载方与服务器之间这样一条链接,为了不在这样的场景下的网络拥塞,TCP 协议经过调节窗口的大小来避免出现拥塞,但这个窗口的大小可能没办法达到咱们预期的效果:充分利用咱们的带宽。所以咱们能够采用多个 TCP 链接的形式来提升咱们带宽的利用率,从而加快下载速度。服务器
打个比喻就是咱们要从一个水缸中用抽水机经过水管抽水,因为管子的直径等等的限制,咱们单条管子没法彻底利用咱们的抽水机的抽水动力。所以咱们就将这些抽水的任务分红了多份,分摊到多个管子上,这样就能够更充分的利用咱们的抽水机动力,从而提升抽水的速度。网络
所以,咱们使用多线程下载的主要意义就是——提升下载速度。多线程
前面提到了咱们主要的目的是将一个总的下载任务分摊到多个子任务中,好比假设咱们用 5 个线程下载这个文件,那么咱们就能够对一个长度为 N 的任务进行以下图的均分:
但真实场景下每每 N 都不是恰好为 5 的倍数的,所以对于最后一个任务还须要加上剩余的任务量,也就是 N/5+N%5。框架
上面的任务分配咱们已经了解了,看起来很理想,但有一个问题,咱们如何实现向服务器只请求这个文件的某一段而不是所有呢?dom
咱们能够经过在请求头中加入 Range 字段来指定请求的范围,从而实现指定某一段的数据。ide
如:RANGE bytes=10000-19999
就指定了 10000-19999 这段字节的数据
因此咱们的核心思想就是经过它拿到文件对应字节段的 InputStream,而后对它读取并写入文件。
下面再讲讲文件写入问题,因为咱们是多线程下载,所以文件并非每次都是从前日后一个个字节写入的,随时可能在文件的任何一个地方写入数据。所以咱们须要可以在文件的指定位置写入数据。这里咱们用到了RandomAccessFile
来实现这个功能。
RandomAccessFile
是一个随机访问文件类,同时整合了 FileOutputStream
和 FileInputStream
,支持从文件的任何字节处读写数据。经过它咱们就能够在文件的任何字节处写入数据。
接下来简单讲讲咱们这里是如何使用 RandomAccessFile
的。咱们对于每一个子任务来讲都有一个开始和结束的位置。每一个任务均可以经过 RandomAccessFile::seek
跳转到文件的对应字节位置,而后从该位置开始读取 InputStream
并写入。
这样,就实现了不一样线程对文件的随机写入。
因为咱们在真正开始下载以前,咱们须要先将任务分配到各个线程,所以咱们须要先了解到文件的大小。
为了获取到文件的大小,咱们用到 Response Headers
中的 Content-Length
字段。
以下图所示,能够看到,打开该下载请求的连接后,Response Headers
中包含了咱们须要的 Content-Length
,也就是该文件的大小,单位是字节。
对于多个子任务,咱们如何实现它们的断点续传呢?
其实原理很简单,只须要保证每一个子任务的下载进度可以被即时地记录便可。这样继续下载时只须要读取这些下载记录,从上次下载结束的位置开始下载便可。
它的实现有不少方式,只要能作到数据持久化便可。这里我使用的是数据库来实现。
这样,咱们的子任务须要拥有一些必要的信息
completedSize
:当前下载完成大小taskSize
:子任务总大小startPos
:子任务开始位置currentPos
:子任务进行到的位置endPos
:子任务结束位置经过这些信息,咱们就可以记录子任务的下载进度从而恢复咱们以前的下载,实现断点续传。
下面咱们用代码来实现这样一个多线程下载功能。
首先,咱们定义一下下载中的各个状态:
public class DownloadStatus { public static final int IDLE = 233; // 空闲,默认状态 public static final int COMPLETED = 234; // 完成 public static final int DOWNLOADING = 235; // 下载中 public static final int PAUSE = 236; // 暂停 public static final int ERROR = 237; // 出错 }
能够看到,这里定义了如上的五种状态。
这里须要用到如数据库及 HTTP 请求的功能,咱们这里定义其接口以下,具体实现各位能够根据须要本身实现:
public interface DownloadDbHelper { /** * 从数据库中删除子任务记录 * @param task 子任务记录 */ void delete(SubDownloadTask task); /** * 向数据库中插入子任务记录 * @param task 子任务记录 */ void insert(SubDownloadTask task); /** * 在数据库中更新子任务记录 * @param task 子任务记录 */ void update(SubDownloadTask task); /** * 获取全部指定Task下的子任务记录 * @param taskTag Task的Tag * @return 子任务记录 */ List<SubDownloadTask> queryByTaskTag(String taskTag); }
public interface DownloadHttpHelper { /** * 获取文件总长度 * @param url 下载url * @param callback 获取文件长度CallBack */ void getTotalSize(String url, NetCallback<Long> callback); /** * 获取InputStream * @param url 下载url * @param start 开始位置 * @param end 结束位置 * @param callback 获取字节流的CallBack */ void getStreamByRange(String url, long start, long end, NetCallback<InputStream> callback); }
咱们先从上到下,从子任务开始实现。在个人设计中,它具备以下的成员变量:
@Entity public class SubDownloadTask implements Runnable { public static final int BUFFER_SIZE = 1024 * 1024; private static final String TAG = SubDownloadTask.class.getSimpleName(); @Id private Long id; private String url; // 文件下载的 url private String taskTag; // 父任务的 Tag private long taskSize; // 子任务大小 private long completedSize; // 子任务完成大小 private long startPos; // 开始位置 private long currentPos; // 当前位置 private long endPos; // 结束位置 private volatile int status; // 当前下载状态 @Transient private SubDownloadListener listener; // 子任务下载监听,主要用于提示父任务 @Transient private File saveFile; // 要保存到的文件 ... }
因为这里的数据库的操做是用 GreenDao
实现,所以这里有一些相关注解,各位能够忽略。
InputStream
获取能够看到,子任务是一个 Runnable,咱们能够经过其 run 方法开始下载,这样就能够经过如 ExecutorService 来开启多个线程执行子任务。
咱们看到其 run 方法:
@Override public void run() { status = DownloadStatus.DOWNLOADING; DownloadManager.getInstance() .getHttpHelper() .getStreamByRange(url, currentPos, endPos, new NetCallback<InputStream>() { @Override public void onResult(InputStream inputStream) { listener.onSubStart(); writeFile(inputStream); } @Override public void onError(String message) { listener.onSubError("文件流获取失败"); status = DownloadStatus.ERROR; } }); }
能够看到,咱们获取了其从 currentPos
到 endPos
端的字节流,经过其 Response Body 拿到了它的 InputStream
,而后调用了 writeFile(InputStream)
方法进行文件的写入。
文件写入
接下来看到 writeFile
方法:
private void writeFile(InputStream in) { try { RandomAccessFile file = new RandomAccessFile(saveFile, "rwd"); // 经过 saveFile 创建RandomAccessFile file.seek(currentPos); // 跳转到对应位置 byte[] buffer = new byte[BUFFER_SIZE]; while (true) { // 循环读取 InputStream,直到暂停或读取结束 if (status != DownloadStatus.DOWNLOADING) { // 状态不为 DOWNLOADING,中止下载 break; } int offset = in.read(buffer, 0, BUFFER_SIZE); if (offset == -1) { // 读取不到数据,说明读取结束 break; } // 将读取到的数据写入文件 file.write(buffer, 0, offset); // 下载数据并在数据库中更新 currentPos += offset; completedSize += offset; DownloadManager.getInstance() .getDbHelper() .update(this); // 通知父任务下载进度 listener.onSubDownloading(offset); } if(status == DownloadStatus.DOWNLOADING) { // 下载完成 status = DownloadStatus.COMPLETED; // 通知父任务下载完成 listener.onSubComplete(completedSize); } file.close(); in.close(); } catch (IOException e) { e.printStackTrace(); listener.onSubError("文件下载失败"); status = DownloadStatus.ERROR; resetTask(); } }
具体流程能够看代码中的注释。能够看到,子任务实际上就是循环读取 InputStream
,并写入文件,同时将下载进度同步到数据库。
父任务也就是咱们具体的下载任务,咱们一样先看到成员变量:
public class DownloadTask implements SubDownloadListener { private static final String TAG = DownloadTask.class.getSimpleName(); private String tag; // 下载任务的 Tag,用于区分不一样下载任务 private String url; // 下载 url private String savePath; // 保存路径 private String fileName; // 保存文件名 private DownloadListener listener; // 下载监听 private long completeSize; // 下载完成大小 private long totalSize; // 下载任务总大小 private int status; // 当前下载进度 private int threadNum; // 线程数(由外部设置的每一个任务的下载线程数) private File file; // 保存文件 private List<SubDownloadTask> subTasks; // 子任务列表 private ExecutorService mExecutorService; // 线程池,用于执行子任务 ... }
对于一个下载任务,能够经过 download 方法开始执行:
public void download() { listener.onStart(); subTasks = querySubTasks(); status = DownloadStatus.DOWNLOADING; if (subTasks.isEmpty()) { // 是新任务 downloadNewTask(); } else if (subTasks.size() == threadNum) { // 不是新任务 downloadExistTask(); } else { // 不是新任务,但下载线程数有误 listener.onError("断点数据有误"); resetTask(); } }
能够看到,咱们先将子任务列表从数据库中读取出来。
downloadNewTask
方法。downloadExistTask
方法。咱们先看到 downloadNewTask
方法:
DownloadManager.getInstance() .getHttpHelper() .getTotalSize(url, new NetCallback<Long>() { @Override public void onResult(Long total) { completeSize = 0L; totalSize = total; initSubTasks(); startAsyncDownload(); } @Override public void onError(String message) { error("获取文件长度失败"); } });
能够看到,获取到总长度后,经过调用 initSubTasks
方法,对子任务列表进行了初始化(计算子任务长度等),而后调用了 startAsyncDownload
方法后经过 ExecutorService
运行子任务进入子任务进行下载。
咱们看到 initSubTasks
方法:
private void initSubTasks() { long averageSize = totalSize / threadNum; for (int taskIndex = 0; taskIndex < threadNum; taskIndex++) { long taskSize = averageSize; if (taskIndex == threadNum - 1) { // 最后一个任务,则 size 还须要加入剩余量 taskSize += totalSize % threadNum; } long start = 0L; int index = taskIndex; while (index > 0) { start += subTasks.get(index - 1).getTaskSize(); index--; } long end = start + taskSize - 1; // 注意这里 SubDownloadTask subTask = new SubDownloadTask(); subTask.setUrl(url); subTask.setStatus(DownloadStatus.IDLE); subTask.setTaskTag(tag); subTask.setCompletedSize(0); subTask.setTaskSize(taskSize); subTask.setStartPos(start); subTask.setCurrentPos(start); subTask.setEndPos(end); subTask.setSaveFile(file); subTask.setListener(this); DownloadManager.getInstance() .getDbHelper() .insert(subTask); subTasks.add(subTask); } }
能够看到就是计算每一个任务的大小及开始及结束点的位置,这里要注意的是 endPos 须要 -1,不然各个任务的下载位置会重叠,而且最后一个任务会多下载一个字节致使如文件损坏等影响。具体缘由就是好比一个大小为 500 的文件,则应当是 0-499 而不是 0-500。
接下来咱们看看 downloadExistTask
方法:
private void downloadExistTask() { // 不是新任务,且下载线程数无误,计算已下载大小 completeSize = countCompleteSize(); totalSize = countTotalSize(); startAsyncDownload(); }
这里其实很简单,遍历子任务列表计算已下载量及总任务量,并调用 startAsyncDownload 开始多线程下载。
具体执行子任务咱们能够看到 startAsyncDownload
方法:
private void startAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getCompletedSize() < subTask.getTaskSize()) { // 只下载没有下载结束的子任务 mExecutorService.execute(subTask); } } }
能够看到,这里其实只是经过 ExecutorService 执行对应子任务(Runnable)而已。
咱们接下来看到 pause 方法:
public void pause() { stopAsyncDownload(); status = DownloadStatus.PAUSE; listener.onPause(); }
能够看到,这里只是调用了 stopAsyncDownload
方法中止子任务。
看到 stopAsyncDownload
方法:
private void stopAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getStatus() != DownloadStatus.COMPLETED) { // 下载完成的再也不取消 subTask.cancel(); } } }
能够看到,调用了子任务的 cancel
方法。
继续看到子任务的 cancel
方法:
void cancel() { status = DownloadStatus.PAUSE; listener.onSubCancel(); }
这里很简单,仅仅是将下载状态设置为了 PAUSE,这样在写入文件的下一次 while 循环时便会停止循环从而结束 Runnable
的执行。
看到 cancel
方法:
public void cancel() { stopAsyncDownload(); resetTask(); listener.onCancel(); }
能够看到和暂停的逻辑差很少,只是在暂停后还须要对子任务重置从而使得下次下载从头开始。
前面提到,外部能够经过 DownloadListener
监听下载的进度,下面是 DownloadListener
接口的定义:
public interface DownloadListener { default void onStart() {} default void onDownloading(long progress, long total) {} default void onPause() {} default void onCancel() {} default void onComplete() {} default void onError(String message) {} }
咱们实时的下载进度实际上是在子任务的保存文件过程当中才能体现出来的,一样,子任务的下载失败也须要通知到 DownloadListener
,这是怎么作到的呢?
前面提到了,咱们还定义了一个 SubDownloadListener
,其监听者就是子任务的父任务。经过监听咱们能够将子任务状态反馈到父任务,父任务再根据具体状况反馈数据给 DownloadListener
。
public interface SubDownloadListener { void onSubStart(); void onSubDownloading(int offset); void onSubCancel(); void onSubComplete(long completeSize); void onSubError(String message); }
好比以前看到,每次下载失败咱们都会调用 onSubError
,每次读取 offset 的数据都会调用 onSubDownload(offset)
,每一个任务下载失败都会调用 onSubComplete(completeSize)
。这样,咱们子任务的下载状态就成功返回给了上层。
咱们接着看看上层是如何处理的:
@Override public void onSubStart() {} @Override public void onSubDownloading(int offset) { synchronized (this) { completeSize = completeSize + offset; listener.onDownloading(completeSize, totalSize); } } @Override public void onSubCancel() {} @Override public void onSubComplete(long completeSize) { checkComplete(); } @Override public void onSubError(String message) { error(message); }
能够看到,每次下载到一段数据,它都会把数据量返回上来,此时 completeSize
就加上了对应的 offset,而后再将新的 completeSize
通知给监听者,这样就实现了下载进度的监听。这里之因此加锁是由于会有多个线程(子任务线程)对 completeSize
进行操做,加锁保证线程安全。
而每次有子任务完成,它都会调用 checkComplete
方法检查是否下载完成,若每一个子任务都下载完成,则说明任务下载完成,而后通知监听者。
一样的,每次子任务出现错误,都会通知监听者出现错误,并作一些错误状况下的处理。
到这里,这篇文章就结束了,咱们成功实现了多线程断点续传下载功能。基于这个原理,咱们能够作一些上层的封装实现一个文件下载框架。
请查看完整的PDF版
(更多完整项目下载。未完待续。源码。图文知识后续上传github。)
能够点击关于我联系我获取完整PDF
(VX:mm14525201314)