消息服务框架使用案例之--大文件上传(断点续传)功能html
在咱们的一个产品应用中,客户须要上传大量的文件到服务器,其中不乏很大的视频文件。虽然可使用FTP这样成熟稳定的工具,但客户表示不会使用FTP工具,而且咱们产品也以为客户从咱们软件在切换到FTP用户体验很差,若是作成后台脚本调用FTP上传那么进度信息很难呈现到咱们软件上。最终,决定咱们本身作文件上传功能。服务器
大文件上传受限于服务器每次处理数据的能力,不能一次传输完成,因此分块上传是必然的了,因为上传时间可能较长,中途可能由于网络或者人为缘由终止上传,因此还须要断点上传功能。网络
分块上传其实是在客户端分块读取文件,而后在服务器分块写入文件,每次读写记录下读写的起始位置,也就是文件的偏移量,和要读写的数据长度。在上传过程当中,每完成一个文件数据块的写入,就向客户端返回一次信息,客户端据此进行下一文件数据块的读取。
断点续传功能也比较好实现,就是上传过程当中将文件在服务器写为临时文件,等所有写完了(文件上传完),将此临时文件重命名为正式文件便可,若是中途上传中断过,下次上传的时候根据当前临时文件大小,做为在客户端读取文件的偏移量,今后位置继续读取文件数据块,上传到服务器今后偏移量继续写入文件便可。框架
假设咱们将每个文件数据块看作一份“消息”,那么文件上传本质上就是客户端和服务器两端频繁的消息交互而已。消息服务框架(MSF)是一个集成了服务容器和消息访问的框架,正好能够用来作文件上传应用。具体作法就是在服务端,编写一个“文件上传服务”,在客户端,编写一个调用上传服务的回调方法便可。异步
新建一个MSF服务类:tcp
public class FilesService : ServiceBase { }
而后添加一个处理上传文件的方法:分布式
/// <summary> /// 批量上传文件(经过回调客户端的方式,支持断点续传) /// </summary> /// <param name="list">文件列表</param> /// <returns></returns> public UploadResult UploadFiles(List<UploadFileInfos> list) { int uploadCount = 0; foreach (var uploadInfo in list) { string pathfile = string.Empty; try { pathfile = this.MapServerPath(uploadInfo.FilePath); if (!Directory.Exists(Path.GetDirectoryName(pathfile))) { Directory.CreateDirectory(Path.GetDirectoryName(pathfile)); } if (File.Exists(pathfile)) { FileInfo fi = new FileInfo(pathfile); if (fi.Length == uploadInfo.Size && fi.LastWriteTime == uploadInfo.FileModifyTime) { Console.WriteLine("文件 {0} {1}", pathfile, "已上传,跳过"); continue;//文件已上传,跳过 } else { fi.Delete(); } } //"断点"上传的文件 long offset = 0; //上传的分部文件名称增长一个文件长度数字,避免下次客户端上传的时候,修改了内容。 //若是文件上传了一部分,的确修改了内容,那么原来上传的部分文件就丢弃了。 string partFile = pathfile + uploadInfo.Size + ".part"; if (File.Exists(partFile)) { FileInfo fi = new FileInfo(partFile); offset = fi.Length; } while (offset < uploadInfo.Size) { uploadInfo.Offset = offset; uploadInfo.Length = MaxReadSize; if (uploadInfo.Offset + uploadInfo.Length > uploadInfo.Size) uploadInfo.Length = (int)(uploadInfo.Size - uploadInfo.Offset); //回调客户端,通知上传文件块 var data = GetUploadFileData(uploadInfo); if (data.Length == 0) { //若是有长度为零的文件表示客户读取文件失败,终止上传操做 throw new Exception("读取客户端文件失败(Length=0),终止上传操做"); } if (data.Length != uploadInfo.Length) throw new Exception("网络异常:上传的文件流数据块大小与预期的不一致"); //等待上次写完 resetEvent.WaitOne(); //异步写文件 System.Threading.Thread t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(obj => { WriteFileInfo wfi = (WriteFileInfo)obj; CurWriteFile(wfi.FileName, wfi.WriteData, wfi.Offset); })); t.Start(new WriteFileInfo() { FileName = partFile, WriteData = data, Offset = offset }); offset += uploadInfo.Length; } resetEvent.WaitOne(); //重命名到正常文件名 File.Move(partFile, pathfile); System.IO.File.SetLastWriteTime(pathfile, uploadInfo.FileModifyTime); uploadCount++; resetEvent.Set(); } catch (Exception ex) { resetEvent.Set(); return new UploadResult() { Success = false, FilesCount = 0, Message = ex.Message }; }//end try } //end for return new UploadResult() { Success = true, FilesCount = list.Count }; }
在这个方法中,有一个重要方法,
//回调客户端,通知上传文件块
var data = GetUploadFileData(uploadInfo);ide
它调用了MSF框架服务上下文的回调函数CallBackFunction,来读取客户端文件数据的,代码以下:函数
private byte[] GetUploadFileData(UploadFileInfos fileinfo) { return base.CurrentContext.CallBackFunction<UploadFileInfos, byte[]>(fileinfo); }
另外,服务端写文件的方法CurWriteFile 实现以下:工具
/// <summary> /// 将服务器端获取到的字节流写入文件 /// </summary> /// <param name="pReadByte">流</param> /// <param name="fileName">文件名</param> /// <param name="offset">要写入文件的位置</param> public void CurWriteFile(string fileName, byte[] pReadByte, long offset) { FileStream pFileStream = null; try { pFileStream = new FileStream(fileName, FileMode.OpenOrCreate); pFileStream.Seek(offset, SeekOrigin.Begin); pFileStream.Write(pReadByte, 0, pReadByte.Length); } catch(Exception ex) { throw new Exception("写文件块失败,写入位置:"+offset+",文件名:"+fileName+",错误缘由:"+ex.Message); } finally { if (pFileStream != null) pFileStream.Close(); resetEvent.Set(); } }
如今看文件上传客户端代码,如何提供服务端须要的文件读取回调函数:
ServiceRequest request = new ServiceRequest(); request.ServiceName = "FilesService"; request.MethodName = "UploadFiles"; request.Parameters = new object[] { infos }; Proxy srvProxy = new Proxy(); srvProxy.ServiceBaseUri = string.Format("net.tcp://{0}", serverHost); srvProxy.ErrorMessage += srvProxy_ErrorMessage; Task<UploadResult> result= srvProxy.RequestServiceAsync<UploadResult, UploadFileInfos, byte[]>(request, uploadingInfo => { //action委托方法显示进度给客户端 action(new UploadStateArg() { State = uploadingInfo.Offset + uploadingInfo.Length >= uploadingInfo.Size ? UploadState.Success: UploadState.Uploading, ProgressFile = uploadingInfo.FilePath, ProcessValue = Convert.ToInt32(uploadingInfo.Offset * 100 / uploadingInfo.Size), TotalProcessValue = Convert.ToInt32((uploadingInfo.UploadIndex +1) * 100 / index) }); Console.WriteLine(">>>Debug:Path:{0},FilePath:{1}",folder, uploadingInfo.FilePath); var fullName = Path.IsPathRooted(folder)? folder + uploadingInfo.FilePath : uploadingInfo.FilePath; Console.WriteLine(">>>服务器读取客户端文件:{0},偏移量:{1} 长度:{2}", fullName, uploadingInfo.Offset, uploadingInfo.Length); return ReadFileData(fullName, uploadingInfo.Offset, uploadingInfo.Length); } );
在上面的方法中, srvProxy.RequestServiceAsync泛型方法须要3个参数,第一个参数是服务的结果类型,第二个参数是提供给服务端回调方法(前面的base.CurrentContext.CallBackFunction方法)的参数,第三个参数是服务回调方法的结果。srvProxy.RequestServiceAsync 的回调方法的参数 uploadingInfo 是服务器推送过来的消息,里面包含了须要读取的文件信息,包括文件名,偏移量,读取长度等信息。
其中,客户端读取文件的方法 ReadFileData 实现以下:
/// <summary> /// 读取文件返回字节流 /// </summary> /// <param name="fileName">文件路径</param> /// <param name="offset">要读取的文件流的位置</param> /// <param name="length">要读取的文件块大小</param> /// <returns></returns> private byte[] ReadFileData(string fileName, long offset, int length) { FileStream pFileStream = null; byte[] pReadByte = new byte[0]; try { pFileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read); BinaryReader r = new BinaryReader(pFileStream); r.BaseStream.Seek(offset, SeekOrigin.Begin); pReadByte = r.ReadBytes(length); return pReadByte; } catch { return pReadByte; } finally { if (pFileStream != null) pFileStream.Close(); } }
这样,在一次文件上传的“请求-响应”过程当中,MSF的服务端进行了屡次回调客户端的操做,客户端根据服务端推送过来的参数信息来精确的读取服务端须要的文件数据。一个支持断点续传的大文件上传服务,使用MSF框架就作好了。
本文使用到的其它相关服务端对象的代码定义以下:
/// <summary> /// 上传状态枚举 /// </summary> public enum UploadState { /// <summary> /// 上传成功 /// </summary> Success, /// <summary> /// 上传中 /// </summary> Uploading, /// <summary> /// 错误 /// </summary> Error } /// <summary> /// 上传状态参数 /// </summary> public class UploadStateArg { /// <summary> /// 上传状态 /// </summary> public UploadState State { get; set; } /// <summary> /// 上传的文件名 /// </summary> public string ProgressFile { get; set; } /// <summary> /// 处理的消息,若是出错,这里是错误消息 /// </summary> public string Message { get; set; } /// <summary> /// 处理进度(百分比) /// </summary> public int ProcessValue { get; set; } /// <summary> /// 整体处理进度(百分比) /// </summary> public int TotalProcessValue { get; set; } }
若是你不清楚如何使用MSF来实现本文的功能,请先阅读下面的文章:
建议你读完相关的其它两篇文章:
“一切都是消息”--MSF(消息服务框架)之【请求-响应】模式
“一切都是消息”--MSF(消息服务框架)之【发布-订阅】模式
读完后,建议你再读读MSF的理论总结:
有关消息服务框架(MSF)更多的讨论,请加咱们QQ群讨论,群号:18215717 ,加群口令:消息服务框架