访问:https://github.com/yangbajing/scala-applications/tree/master/file-upload 获取本文所述完整源码,包括Akka HTTP后端和HTML5实现的前端。html
在不少应用里面都会有相似大文件上传的需求,但不少时候咱们程序员都会以不支持或很差实现将其推脱掉^_^。前端
此次由于公司项目须要,另外一个组的同事使用Spring实现了一版。他们是采用在前端对大文件分块上传,后端在全部分块上传完成后合并的方式实现了。这个方案有一个弊端,若文件很大,后期对分块作合并时会很是的耗磁盘资源。git
本文,我使用Akka HTTP和Akka Stream作为后端服务,能够很优雅的实现大文件的断点续传。原理其实很是的简单,前端计算文件的hash(使用sha256),将hash传到后端查询是否有相同文件已上传,如有将返回已上传文件及文件长度(bytes)。这时候前端就能够知道文件的上传进度,进而判断还须要断点续传的偏移量或者已上传完成(这就是秒传)。程序员
这里有一个设计取舍:客户端对单个文件不作分片,从文件头开始上传。这样的一个好处是可简化服务端的实现,同时也能够优化服务端对文件的存储 (同一个文件将一直使用APPEND的方式写入文件,这样能够更高效的利用磁盘IO。同时,不须要分块合并,若文件很大,生成的大量分块在文件上传完成后再次合并会是一个很是大的资源开销) 。github
这个怎么说呢?断点下载Akka HTTP原生支持^_^。你只须要使用以下代码:web
private def downloadRoute: Route = path("download" / Segment) { hash => getFromFile(FileUtils.getLocalPath(hash).toFile) }
FileUtils.getLocalPath(hash)
函数经过对hash
值(sha256hex)进行计算和拼接,获取实际文件的本地存储路径再交给Akka HTTP提供的getFromFile
指令,剩下的工做就交给Akka。app
咱们能够经过向Akka HTTP发起HEAD
请求来查看支持的HTTP功能,看到在反回的header里有Accept-Ranges: bytes
,意思是服务端支持使用字节为单位的范围下载(断点下载功能既基于此实现)。curl
$ curl --head http://localhost:33333/file/download/7d0559e2f7bf42f0c2becc7fbf91b20ca2e7ec373c941fca21314169de9c7ef4 HTTP/1.1 200 OK Last-Modified: Fri, 28 Dec 2018 14:12:32 GMT ETag: "132766a7f528d080" Accept-Ranges: bytes Server: akka-http/10.1.6 Date: Sat, 29 Dec 2018 02:17:41 GMT Content-Type: application/octet-stream Content-Length: 65463496
很遗憾,Akka HTTP默认不支持断点上传,这须要自行实现。可是,Akka HTTP作为一个toolkit,足够灵活且强大,实现断点上传功能so easy。ide
基于常规的代码设计方式,咱们须要Controller
、Service
,那就先从Controller
开始:
def uploadRoute: Route = path("upload") { post { withoutSizeLimit { entity(as[Multipart.FormData]) { formData => onSuccess(fileService.handleUpload(formData)) { results => import me.yangbajing.fileupload.util.JacksonSupport._ complete(results) } } } } }
对于不熟悉Akka HTTP的朋友,推荐阅读我写的电子书(打个广告):Scala Web 开发。这里须要注意的一个指令是withoutSizeLimit
,默认Akka HTTP对请求大小限制比较低,咱们能够经过withoutSizeLimit
指令取消对单个API的请求大小限制,同时又不影响整个Web服务的大小限制。另外,这里经过entity(as[Multipart.FormData])
以Unmarshaller的方式获取整个Multipart.FormData
对象并传入 FileService#handleUpload
函数进行处理。
override def handleUpload(formData: Multipart.FormData): Future[Seq[FileBO]] = { formData.parts .map { part => val (hash, contentLength, startPosition) = part.name.split('.') match { case Array(a, b, c) => (a, b.toLong, c.toLong) case Array(a, b) => (a, b.toLong, 0L) case Array(a) => (a, 0L, 0L) case _ => throw new IllegalArgumentException(s"Multipart.FormData name格式不符合要求:${part.name}") } FileInfo(part, hash, contentLength, startPosition) } .log("fileInfo", info => logger.debug(s"fileInfo: $info")) .mapAsync(Constants.FILE_PART_MAX)(processFile) .runWith(Sink.seq) }
formData.parts
是一个Akka Stream流,类型签名为Source[Multipart.FormData.BodyPart, Any]
。有关Akka Stream更详细的资料请参阅Akka Streams官方文档。这里,每一个part
都表明FormData
的一个字段(对应HTML 5的FormData
类型,同时前端须要使用Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryrP4DAyu31ilqEWmz
方式发起请求)。每一个part.name
都是用英号逗号分隔的三部分来作为请求的字段名,分别是:<hash>.<content length>.<start position>
,这样咱们就能够在不加入任何其它字段的状况下告知服务端当前上传文件的sha256hex计算出的hash值、文件大小(bytes)和上传起始偏移量。
文件上传的核心逻辑在 FileUtils#uploadFile
函数:
def uploadFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext): Future[FileBO] = { val maybeMeta = FileUtils.getFileMeta(fileInfo.hash) val beContinue = maybeMeta.isDefined && fileInfo.startPosition > 0L if (beContinue) uploadContinue(fileInfo, maybeMeta.get) else uploadNewFile(fileInfo) }
uploadFile
函数根据是否为续传来分别调用uploadContinue
或uploadNewFile
函数。首先来看看新文件上传时的代码逻辑:
def uploadNewFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext) = { val bodyPart = fileInfo.bodyPart val tmpPath = if (fileInfo.validHash) FileUtils.getLocalPath(fileInfo.hash) else Files.createTempFile(FileUtils.TMP_DIR, bodyPart.filename.getOrElse(""), "") // (1) val sha = MessageDigest.getInstance("SHA-256") bodyPart.entity.dataBytes .map { byteString => byteString.asByteBuffers.foreach(sha.update) // (2) byteString } .runWith(FileIO.toPath(tmpPath)) // (3) .map { ioResult => val hash = Utils.bytesToHex(sha.digest()) // (4) if (fileInfo.validHash) { require(fileInfo.hash == hash, s"前端上传hash与服务端计算hash值不匹配,${fileInfo.hash} != $hash") } val localPath = if (fileInfo.validHash) tmpPath else move(hash, tmpPath, ioResult.count) // (5) FileBO(hash, localPath, ioResult.count, bodyPart.filename, bodyPart.headers) } }
ByteString
计算并更新sha256值。sha.digest()
方法获取已上传文件的sha256值。def uploadContinue(fileInfo: FileInfo, meta: FileMeta)(implicit mat: Materializer, ec: ExecutionContext) = { val bodyPart = fileInfo.bodyPart val localPath = FileUtils.getLocalPath(fileInfo.hash) logger.debug(s"断点续传,startPosition:${fileInfo.startPosition},路径:$localPath") val hash = fileInfo.hash bodyPart.entity.dataBytes .runWith(FileIO.toPath(localPath, Set(APPEND), fileInfo.startPosition)) // (1) .map(ioResult => FileBO(hash, localPath, meta.size + ioResult.count, bodyPart.filename, bodyPart.headers)) }
断点上传时的逻辑其实相对简单,须要注意的是在(1)
处调用FileIO.toPath
将流定入本地时须要以APPEND
模式追加写入到已存在文件。
在已实现断点上传功能之上,秒传的实现逻辑就很是清晰了。客户端在调用file/upload
接口上传文件以前先调用/file/progress/{hash}
接口判断相同hash值文件的上传状况,再决定下一步处理。
/file/progress/{hash}
接口秒传的代码逻辑台下:
def progressRoute: Route = path("progress" / Segment) { hash => onSuccess(fileService.progressByHash(hash)) { case Some(v) => import me.yangbajing.fileupload.util.JacksonSupport._ complete(v) case None => complete(StatusCodes.NotFound) } }
文件上传进度接口定义如上。
// FileServiceImpl.scala override def progressByHash(hash: String): Future[Option[FileMeta]] = { require(Objects.nonNull(hash) && hash.nonEmpty, "hash 不能为空。") Future.successful(FileUtils.getFileMeta(hash)) } // FileUtils.getFileMeta def getFileMeta(hash: String): Option[FileMeta] = { if (hash == null || Constants.HASH_LENGTH != hash.length) { None } else { val path = getLocalPath(hash) if (Files.exists(path) && Files.isReadable(path)) Some(FileMeta(hash, Files.size(path), path)) else None } }
文件上传进度服务实现定义如上。
本文以Akka HTTP和HTML 5讲述了怎样实现一个支持大文件断点上传、下载和秒传的示例应用程序。