Alamofire源码学习目录合集swift
Request基类不能被直接使用, 而是使用他的四个子类:api
- DataRequest
- 使用URLSessionDataTask请求数据, 数据保存在内存中, 使用Data对象储存
- DataStreamRequest
- 使用URLSessionDataTask请求数据, 使用OutputStream传出数据
- DownloadRequest
- 使用URLSessionDownloadTask下载数据, 文件下载在磁盘上
- UploadRequest
- 使用URLSessionUploadTask上传请求, 上传内容能够用表单, 文件, InputStream
这四个子类只是定义了一些初始化以及一些必要的方法, 在ResponseSerialization分别对他们作了扩展, 用来解析处理数据使用数组
DataRequest子类是最基本的一个子类, 请求到的数据都使用Data对象写在内存里, 适合普通请求, 小图片请求, 小文件请求缓存
public class DataRequest: Request {
/// 用来建立URLRequest对象的协议对象
public let convertible: URLRequestConvertible
/// 开放给外部的计算属性
public var data: Data? { mutableData }
/// 私有的线程安全的Data对象, 保存请求的数据
@Protected
private var mutableData: Data? = nil
/// 初始化方法比父类多了1个参数: 用来初始化URLRequest对象的URLRequestConvertible
init(id: UUID = UUID(), convertible: URLRequestConvertible, underlyingQueue: DispatchQueue, serializationQueue: DispatchQueue, eventMonitor: EventMonitor?, interceptor: RequestInterceptor?, delegate: RequestDelegate) {
self.convertible = convertible
super.init(id: id,
underlyingQueue: underlyingQueue,
serializationQueue: serializationQueue,
eventMonitor: eventMonitor,
interceptor: interceptor,
delegate: delegate)
}
/// 重置请求时要把已下载的data清空
override func reset() {
super.reset()
mutableData = nil
}
/// Called when `Data` is received by this instance.
///
/// - Note: Also calls `updateDownloadProgress`.
///
/// - Parameter data: The `Data` received.
/// 当task收到data时, 由SessionDelegate调用, 保存数据
func didReceive(data: Data) {
if self.data == nil {
//初始
mutableData = data
} else {
//追加
$mutableData.write { $0?.append(data) }
}
//更新下载进度
updateDownloadProgress()
}
/// 必须实现的父类方法, 返回DataRequest对应的Task
override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
// 由于URLRequest在swift里是struct, 因此先用赋值来复制一下
let copiedRequest = request
return session.dataTask(with: copiedRequest)
}
/// 更新下载进度(已下载字节/待下载字节)
func updateDownloadProgress() {
let totalBytesReceived = Int64(data?.count ?? 0)
let totalBytesExpected = task?.response?.expectedContentLength ?? NSURLSessionTransferSizeUnknown
downloadProgress.totalUnitCount = totalBytesExpected
downloadProgress.completedUnitCount = totalBytesReceived
// 在进度回调队列调用进度回调
downloadProgressHandler?.queue.async { self.downloadProgressHandler?.handler(self.downloadProgress) }
}
/// Validates the request, using the specified closure.
///
/// - Note: If validation fails, subsequent calls to response handlers will have an associated error.
///
/// - Parameter validation: `Validation` closure used to validate the response.
///
/// - Returns: The instance.
/// 添加有效性判断回调, 用来判断当前URLRequest,URLSessionTask,下载的Data是否有效
@discardableResult
public func validate(_ validation: @escaping Validation) -> Self {
//建立无参处理闭包
let validator: () -> Void = { [unowned self] in
//不能有错误, 必须有响应
guard self.error == nil, let response = self.response else { return }
//调用判断回调
let result = validation(self.request, response, self.data)
//有错误就设置给error
if case let .failure(error) = result {
self.error = error.asAFError(or: .responseValidationFailed(reason: .customValidationFailed(error: error)))
}
//通知监听器判断有效性完成
self.eventMonitor?.request(self,
didValidateRequest: self.request,
response: response,
data: self.data,
withResult: result)
}
//把闭包加入到数组
$validators.write { $0.append(validator) }
return self
}
}
复制代码
DataStreamRequest相似于DataRequest, 不过没有使用一个Data保存所有数据, 而是将受到的数据封装成DataStreamRequest.Stream类型(包括Data,完成,错误),并持有N个用来处理数据流的闭包,在受到请求数据时,对data封装而后使用闭包交给调用方处理。安全
/// `Request` subclass which streams HTTP response `Data` through a `Handler` closure.
public final class DataStreamRequest: Request {
/// 定义调用方用来处理Stream的回调, 该回调能够抛出错误, 能够控制抛出错误时是否取消请求
public typealias Handler<Success, Failure: Error> = (Stream<Success, Failure>) throws -> Void
/// 封装了数据流对象+取消token,用来给上层处理数据使用
public struct Stream<Success, Failure: Error> {
/// 最新的流事件(数据或者错误)
public let event: Event<Success, Failure>
/// 用来取消数据流的token, 其实只是封装了DataStreamRequest对象, 有一个cancel方法
public let token: CancellationToken
/// 取消数据流(取消请求)
public func cancel() {
token.cancel()
}
}
/// 封装了数据流, 包括: 数据, 错误, 完成三种
public enum Event<Success, Failure: Error> {
/// 数据或者错误
case stream(Result<Success, Failure>)
/// 完成信息(完成状态)(多是请求完成, 请求取消, 请求出错)
case complete(Completion)
}
/// 当数据流完成时, 携带在Event里的数据
public struct Completion {
/// 最后发出的请求
public let request: URLRequest?
/// 最后收到的响应
public let response: HTTPURLResponse?
/// 最后收到的请求指标
public let metrics: URLSessionTaskMetrics?
/// 数据流出错的错误
public let error: AFError?
}
/// 用来取消数据流时, 取消请求的token
public struct CancellationToken {
weak var request: DataStreamRequest?
init(_ request: DataStreamRequest) {
self.request = request
}
/// 取消数据流时直接取消请求
public func cancel() {
request?.cancel()
}
}
/// 用来建立
public let convertible: URLRequestConvertible
/// 若是数据流解析出错是否直接取消请求, 默认为false
public let automaticallyCancelOnStreamError: Bool
/// 包装须要线程安全操做的几个数据
struct StreamMutableState {
/// 当把DataStreamRequest做为InputStream时, 会建立该对象并把输出流转绑定给InputStream
var outputStream: OutputStream?
/// 内部用来处理Data的数据流回调数组, 主要工做为: 封装Stream对象交给上层回调处理, 而后记录正在处理的数据流个数, 所有处理完以后须要执行完成回调
var streams: [(_ data: Data) -> Void] = []
/// 当前正在执行的流的个数
var numberOfExecutingStreams = 0
/// 当数据流还没完成时暂存完成回调的数组
var enqueuedCompletionEvents: [() -> Void] = []
}
@Protected
var streamMutableState = StreamMutableState()
/// 比父类多了两个参数: 1.URLRequestConvertible, 2. 是否在Stream处理失败时取消请求
init(id: UUID = UUID(), convertible: URLRequestConvertible, automaticallyCancelOnStreamError: Bool, underlyingQueue: DispatchQueue, serializationQueue: DispatchQueue, eventMonitor: EventMonitor?, interceptor: RequestInterceptor?, delegate: RequestDelegate) {
self.convertible = convertible
self.automaticallyCancelOnStreamError = automaticallyCancelOnStreamError
super.init(id: id,
underlyingQueue: underlyingQueue,
serializationQueue: serializationQueue,
eventMonitor: eventMonitor,
interceptor: interceptor,
delegate: delegate)
}
/// task也是URLSessionDataTask
override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
let copiedRequest = request
return session.dataTask(with: copiedRequest)
}
/// 完成时要把OutputStream关闭
override func finish(error: AFError? = nil) {
$streamMutableState.write { state in
state.outputStream?.close()
}
super.finish(error: error)
}
/// 收到Data的处理
func didReceive(data: Data) {
$streamMutableState.write { state in
if let stream = state.outputStream {
// 若是有做为InputStream, 就会建立对应的OutputStream, 输出数据
underlyingQueue.async {
var bytes = Array(data)
stream.write(&bytes, maxLength: bytes.count)
}
}
// 当前正在处理的回调个数+新的要处理的回调个数
state.numberOfExecutingStreams += state.streams.count
// 复制一份
let localState = state
// 处理数据
underlyingQueue.async { localState.streams.forEach { $0(data) } }
}
}
/// 添加有效性判断回调
@discardableResult
public func validate(_ validation: @escaping Validation) -> Self {
let validator: () -> Void = { [unowned self] in
guard self.error == nil, let response = self.response else { return }
let result = validation(self.request, response)
if case let .failure(error) = result {
self.error = error.asAFError(or: .responseValidationFailed(reason: .customValidationFailed(error: error)))
}
self.eventMonitor?.request(self,
didValidateRequest: self.request,
response: response,
withResult: result)
}
$validators.write { $0.append(validator) }
return self
}
/// Produces an `InputStream` that receives the `Data` received by the instance.
///
/// - Note: The `InputStream` produced by this method must have `open()` called before being able to read `Data`.
/// Additionally, this method will automatically call `resume()` on the instance, regardless of whether or
/// not the creating session has `startRequestsImmediately` set to `true`.
///
/// - Parameter bufferSize: Size, in bytes, of the buffer between the `OutputStream` and `InputStream`.
///
/// - Returns: The `InputStream` bound to the internal `OutboundStream`.
/// 变换成InputStream供外部操做, 调用该方法后会直接发送请求, 返回给上层的InputStream在读取数据以前必须先open(), 结束时候必须close()
public func asInputStream(bufferSize: Int = 1024) -> InputStream? {
// 建立完IOStream以后就会马上发送请求, 无视是否当即发送请求的控制参数
defer { resume() }
var inputStream: InputStream?
$streamMutableState.write { state in
// 建立一对IOStream, OutputStream往InputStream写, buffer默认1k
Foundation.Stream.getBoundStreams(withBufferSize: bufferSize,
inputStream: &inputStream,
outputStream: &state.outputStream)
// 打开OutputStream, 准备读取数据
state.outputStream?.open()
}
return inputStream
}
/// 执行一个能够抛出错误的回调, 捕捉异常, 取消请求并抛出错误, (在ResponseSerialization中调用)
func capturingError(from closure: () throws -> Void) {
do {
try closure()
} catch {
self.error = error.asAFError(or: .responseSerializationFailed(reason: .customSerializationFailed(error: error)))
cancel()
}
}
/// 添加数据流完成回调与队列
func appendStreamCompletion<Success, Failure>(on queue: DispatchQueue, stream: @escaping Handler<Success, Failure>) {
// 先添加一个解析回调
appendResponseSerializer {
// 回调内容为在内部队列执行添加一个解析完成的回调
self.underlyingQueue.async {
self.responseSerializerDidComplete {
// 解析完成的回调内容为操做$streamMutableState
self.$streamMutableState.write { state in
guard state.numberOfExecutingStreams == 0 else {
//若是还有流在处理数据, 就把完成回调追加到数组里
state.enqueuedCompletionEvents.append {
self.enqueueCompletion(on: queue, stream: stream)
}
return
}
// 不然直接执行回调
self.enqueueCompletion(on: queue, stream: stream)
}
}
}
}
//这样操做能够保证完成回调在其余全部的解析器处理完成后添加
}
//发送完成事件
func enqueueCompletion<Success, Failure>(on queue: DispatchQueue, stream: @escaping Handler<Success, Failure>) {
queue.async {
do {
//建立完成事件
let completion = Completion(request: self.request,
response: self.response,
metrics: self.metrics,
error: self.error)
//走你
try stream(.init(event: .complete(completion), token: .init(self)))
} catch {
// 忽略错误, 完成错误没法被处理, 数据已经完整了
}
}
}
}
复制代码
而后扩展了DataStreamRequest.Stream结构体来快速取得内部数据:markdown
/// 原理都同样
extension DataStreamRequest.Stream {
public var result: Result<Success, Failure>? {
guard case let .stream(result) = event else { return nil }
return result
}
public var value: Success? {
guard case let .success(value) = result else { return nil }
return value
}
public var error: Failure? {
guard case let .failure(error) = result else { return nil }
return error
}
public var completion: DataStreamRequest.Completion? {
guard case let .complete(completion) = event else { return nil }
return completion
}
}
复制代码
DownloadRequest用来处理下载任务, 将文件以文件形式保存在磁盘上时使用这个。session
处理本地文件保存策略, 以及本地文件保存路径闭包
/// 先定义了一个符合OptionSet协议的结构体来决定本地文件的保存策略:
public struct Options: OptionSet {
/// 是否建立中间目录
public static let createIntermediateDirectories = Options(rawValue: 1 << 0)
/// 下载文件前是否先移除旧文件
public static let removePreviousFile = Options(rawValue: 1 << 1)
public let rawValue: Int
public init(rawValue: Int) {
self.rawValue = rawValue
}
}
// MARK: 下载目标路径
/// 下载任务会先把文件下载到临时的缓存路径, 而后将文件拷贝到下载目标路径, 使用闭包来决定能够把下载路径的决定推迟到下载完成时, 能够根据临时文件目录与下载响应头来决定下载目标路径以及文件保存策略
/// 注意: 若是是下载本地文件(url为file://开头的), 回调不会有response
public typealias Destination = (_ temporaryURL: URL,
_ response: HTTPURLResponse) -> (destinationURL: URL, options: Options)
/// 建立默认建议的下载路径回调(document根目录)
public class func suggestedDownloadDestination(for directory: FileManager.SearchPathDirectory = .documentDirectory, in domain: FileManager.SearchPathDomainMask = .userDomainMask, options: Options = []) -> Destination {
{ temporaryURL, response in
let directoryURLs = FileManager.default.urls(for: directory, in: domain)
let url = directoryURLs.first?.appendingPathComponent(response.suggestedFilename!) ?? temporaryURL
return (url, options)
}
}
/// 默认的下载路径回调(处理见下面方法)
static let defaultDestination: Destination = { url, _ in
(defaultDestinationURL(url), [])
}
/// 返回默认的文件储存路径(只是把默认的文件名重命名为加上Alamofire_前缀, 文件仍是在缓存目录, 会被系统删除, 若是要保存文件, 须要转移到其余目录)
static let defaultDestinationURL: (URL) -> URL = { url in
let filename = "Alamofire_\(url.lastPathComponent)"
let destination = url.deletingLastPathComponent().appendingPathComponent(filename)
return destination
}
复制代码
/// 定义了下载请求的下载源
public enum Downloadable {
/// 从URLRequest下载
case request(URLRequestConvertible)
/// 断点续传
case resumeData(Data)
}
复制代码
/// 私有包裹一下
private struct DownloadRequestMutableState {
/// 支持断点续传的任务被取消时, 须要被处理的已下载数据
var resumeData: Data?
/// 下载完成后的文件保存路径
var fileURL: URL?
}
/// 私有线程安全的属性
@Protected
private var mutableDownloadState = DownloadRequestMutableState()
/// 开放给外部获取
public var resumeData: Data? { mutableDownloadState.resumeData }
public var fileURL: URL? { mutableDownloadState.fileURL }
复制代码
/// 下载源
public let downloadable: Downloadable
/// 下载路径回调
let destination: Destination
/// 初始化
init(id: UUID = UUID(), downloadable: Downloadable, underlyingQueue: DispatchQueue, serializationQueue: DispatchQueue, eventMonitor: EventMonitor?, interceptor: RequestInterceptor?, delegate: RequestDelegate, destination: @escaping Destination) {
self.downloadable = downloadable
self.destination = destination
super.init(id: id,
underlyingQueue: underlyingQueue,
serializationQueue: serializationQueue,
eventMonitor: eventMonitor,
interceptor: interceptor,
delegate: delegate)
}
//重试时清除数据
override func reset() {
super.reset()
$mutableDownloadState.write {
$0.resumeData = nil
$0.fileURL = nil
}
}
复制代码
/// URLSession下载完成/失败时, 回调过来更新状态
func didFinishDownloading(using task: URLSessionTask, with result: Result<URL, AFError>) {
eventMonitor?.request(self, didFinishDownloadingUsing: task, with: result)
switch result {
case let .success(url): mutableDownloadState.fileURL = url
case let .failure(error): self.error = error
}
}
/// URLSession下载时, 回调更新进度
func updateDownloadProgress(bytesWritten: Int64, totalBytesExpectedToWrite: Int64) {
downloadProgress.totalUnitCount = totalBytesExpectedToWrite
downloadProgress.completedUnitCount += bytesWritten
downloadProgressHandler?.queue.async { self.downloadProgressHandler?.handler(self.downloadProgress) }
}
///建立URLSessionTask
/// 新文件下载
override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
session.downloadTask(with: request)
}
/// 断点续传
public func task(forResumeData data: Data, using session: URLSession) -> URLSessionTask {
session.downloadTask(withResumeData: data)
}
/// 1. 直接取消请求, 不设置resumeData属性给监听器, 没有回调处理已下载数据
@discardableResult
override public func cancel() -> Self {
cancel(producingResumeData: false)
}
/// 2. 取消下载, 并判断是否须要填充resumeData属性给监听器, 没有回调处理已下载数据
@discardableResult
public func cancel(producingResumeData shouldProduceResumeData: Bool) -> Self {
cancel(optionallyProducingResumeData: shouldProduceResumeData ? { _ in } : nil)
}
/// 3. 取消下载, 用一个回调来处理已下载数据, 会先填充resumeData属性, 通知监听器后调用回调
@discardableResult
public func cancel(byProducingResumeData completionHandler: @escaping (_ data: Data?) -> Void) -> Self {
cancel(optionallyProducingResumeData: completionHandler)
}
/// 上面1, 2, 3的整合
private func cancel(optionallyProducingResumeData completionHandler: ((_ resumeData: Data?) -> Void)?) -> Self {
// 线程安全
$mutableState.write { mutableState in
// 先判断可否取消
guard mutableState.state.canTransitionTo(.cancelled) else { return }
// 更新状态
mutableState.state = .cancelled
// 先告知本身取消被调用了, 父类会告知监听器
underlyingQueue.async { self.didCancel() }
guard let task = mutableState.tasks.last as? URLSessionDownloadTask, task.state != .completed else {
// 若是下载完成的话, 走finish逻辑
underlyingQueue.async { self.finish() }
return
}
if let completionHandler = completionHandler {
// 取消时有处理已下载数据的回调
// 先恢复一下确保请求指标被获取到了
task.resume()
task.cancel { resumeData in
// 填入resumeData属性
self.mutableDownloadState.resumeData = resumeData
// 告知本身取消成功, 父类会告知监听器
self.underlyingQueue.async { self.didCancelTask(task) }
// 执行已下载数据处理回调
completionHandler(resumeData)
}
} else {
// 没有处理下载数据的回调
task.resume()
//直接取消
task.cancel(byProducingResumeData: { _ in })
//告知
self.underlyingQueue.async { self.didCancelTask(task) }
}
}
return self
}
/// 响应的有效性判断
@discardableResult
public func validate(_ validation: @escaping Validation) -> Self {
let validator: () -> Void = { [unowned self] in
guard self.error == nil, let response = self.response else { return }
let result = validation(self.request, response, self.fileURL)
if case let .failure(error) = result {
self.error = error.asAFError(or: .responseValidationFailed(reason: .customValidationFailed(error: error)))
}
self.eventMonitor?.request(self,
didValidateRequest: self.request,
response: response,
fileURL: self.fileURL,
withResult: result)
}
$validators.write { $0.append(validator) }
return self
}
复制代码
用来建立上传请求, 能够从内存(Data), 磁盘(file), 流(InputStream)上传数据, 注意上传请求是DataRequest的子类app
public enum Uploadable {
/// 从内存
case data(Data)
/// 从文件上传, 并能够设置上传完成后是否删除源文件
case file(URL, shouldRemove: Bool)
/// 从流上传
case stream(InputStream)
}
复制代码
/// 用来建立上传源的协议
public let upload: UploadableConvertible
/// 文件管理器, 用来清除任务, 包括写在硬盘上的多表单上传数据
public let fileManager: FileManager
/// 上传源, 开始请求时才建立. 可选是由于, 从上传源协议对象建立的时候, 是能够失败的
public var uploadable: Uploadable?
/// 初始化
init(id: UUID = UUID(), convertible: UploadConvertible, underlyingQueue: DispatchQueue, serializationQueue: DispatchQueue, eventMonitor: EventMonitor?, interceptor: RequestInterceptor?, fileManager: FileManager, delegate: RequestDelegate) {
upload = convertible
self.fileManager = fileManager
super.init(id: id,
convertible: convertible,
underlyingQueue: underlyingQueue,
serializationQueue: serializationQueue,
eventMonitor: eventMonitor,
interceptor: interceptor,
delegate: delegate)
}
/// 告知监听器 建立上传源成功(Session中调用)
func didCreateUploadable(_ uploadable: Uploadable) {
self.uploadable = uploadable
eventMonitor?.request(self, didCreateUploadable: uploadable)
}
/// 告知监听器 建立上传源失败(Session中调用), 而后走重试或者完成逻辑
func didFailToCreateUploadable(with error: AFError) {
self.error = error
eventMonitor?.request(self, didFailToCreateUploadableWithError: error)
retryOrFinish(error: error)
}
/// 用URLRequest跟URLSession建立URLSessionUploadTask(Session中调用)
override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
guard let uploadable = uploadable else {
fatalError("Attempting to create a URLSessionUploadTask when Uploadable value doesn't exist.")
}
switch uploadable {
case let .data(data): return session.uploadTask(with: request, from: data)
case let .file(url, _): return session.uploadTask(with: request, fromFile: url)
case .stream: return session.uploadTask(withStreamedRequest: request)
}
}
/// 重试时清空数据, 重试时, 上传源必须从新建立
override func reset() {
// Uploadable must be recreated on every retry.
uploadable = nil
super.reset()
}
/// 转换为InputStream供外部链接, 必须使用Uploadable.stream, 不然会抛出异常
func inputStream() -> InputStream {
guard let uploadable = uploadable else {
fatalError("Attempting to access the input stream but the uploadable doesn't exist.")
}
guard case let .stream(stream) = uploadable else {
fatalError("Attempted to access the stream of an UploadRequest that wasn't created with one.")
}
eventMonitor?.request(self, didProvideInputStream: stream)
return stream
}
/// 请求完成时, 清理任务(删除源文件)
override public func cleanup() {
defer { super.cleanup() }
guard
let uploadable = self.uploadable,
case let .file(url, shouldRemove) = uploadable,
shouldRemove
else { return }
try? fileManager.removeItem(at: url)
}
复制代码
/// 定义UploadableConvertible协议用来建立上传源的协议
public protocol UploadableConvertible {
func createUploadable() throws -> UploadRequest.Uploadable
}
/// 扩展了**UploadRequest.Uploadable**实现UploadableConvertible协议, 直接返回本身(外部就不用本身写生成上传源的类, 能够直接用
extension UploadRequest.Uploadable: UploadableConvertible {
public func createUploadable() throws -> UploadRequest.Uploadable {
self
}
}
/// 组合这俩协议, 初始化用(本身初始化要用UploadableConvertible, 父类初始化要用URLRequestConvertible)
public protocol UploadConvertible: UploadableConvertible & URLRequestConvertible {}
复制代码
Alamofire的使用很是简单:less
AF.request("请求地址").responseJSON(completionHandler: {
resp in
//处理响应
})
复制代码
可是其内部处理很是复杂, 从开始处理请求地址,到建立URLRequest,再到建立URLSessionTask,每一步都有不少很细的处理。其中Session负责调度Request, Request跟Response是请求的核心。配合上各类有效性判断,认证、缓存、重试、错误抛出等,光看EventMonitor接口,足足有43个回调方法,贯穿了请求初始到响应解析完成中的全部状态处理,在看完Response对响应的解析后,我会尝试画一下时序图看能不能理清逻辑,学一下代码的设计与封装逻辑,对本身撸代码时的思惟有很大的提高。
以上所有内容均为我的理解,若有错误,欢迎评论指出,将第一时间修改~~很是感谢~~