SwiftNIO EventLoopFuture 和 EventLoopPromise

在上篇 Future & Promise 实现分析(Swift) 咱们简单的阐述了下 future 和 Promise 的一些概念,并简单的用了个样例来体会这种处理异步的模式。本文接着上文遗留的一个话题 SwiftNIO 中如何实现 Future 和 Promise?git

Future 是一个只读的值的容器,它的值在将来某个时刻会被计算出来(产生这个值的行为是异步操做)。 Promise 是一个可写的容器,能够设置 Future 的值。github

带着上面这句话,咱们来看看 SwiftNIO 中如何实现的。swift

概述

并发代码和同步代码之间最主要的区别在于并不是全部的动做都可以当即完成。例如,在向一个 Channel 写入数据时,EventLoop 有可能不会当即将数据冲刷到网络上。为此,SwiftNIO 提供了 EventLoopPromiseEventLoopFuture,用于管理异步操做。api

EventLoopFuture 其实是一个容器,用于存放函数在将来某个时刻的返回值。每一个 EventLoopFuture<T> 对象都有一个对应的 EventLoopPromise<T>,用于存放实际的结果。只要 EventLoopPromise 执行成功,EventLoopFuture 也就完成了。数组

经过轮询的方式检查 EventLoopFuture 是否完成是一种很是低效的方式,因此 EventLoopFuture 被设计成能够接收回调函数。也就是说,在有结果的时候回调函数会被执行。promise

EventLoopFuture 负责处理调度工做,确保回调函数是在最初建立 EventLoopPromise 的那个 EventLoop 上执行,因此就没有必要再针对回调函数作任何同步操做。安全

EventLoopFuture

咱们看下面例子来感觉一下 EventLoopFuture & EventLoopPromise 的使用方式:微信

func someAsyncOperation(args) -> EventLoopFuture<ResultType> {
    let promise = eventLoop.makePromise(of: ResultType.self)
    someAsyncOperationWithACallback(args) { result -> Void in
        // when finished...
        promise.succeed(result)
        // if error...
        promise.fail(error)
    }
    return promise.futureResult
}
复制代码

someAsyncOperation 里面首先是建立一个 promise, 而后在 someAsyncOperationWithACallback 异步操做的会回调中根据各类场景执行 promise 的 successed 或者 fail 方法。someAsyncOperation 返回一个 EventLoopFuture<ResultType>, 而这个值从 promise.futureResult 中获取。EventLoopFuture 一个只读的值的容器,它的值在将来某个时刻会被计算出来。而它的值经过 EvnetLoopPromise 进行设置。 彻底符合咱们本文开篇提到的那句话。网络

若是咱们在调用一个方法,它返回 EventLoopFuture<Value>, 咱们该如何操做这个值?并发

map & flatMap

能够经过调用 flatMap() 或者 map() 能够获取到 EventLoopFuture 中真实的值。

let networkData = getNetworkData(args)

// When network data is received, convert it.
let processedResult: EventLoopFuture<Processed> = networkData.map { (n: NetworkResponse) -> Processed in
    ... parse network data ....
    return processedResult
}
复制代码

map 的 callback 是将 Value 类型的结果转换为 NewValue 类型。它是个 Functor

可是在 flatMap 中 flatMap

let d1 = networkRequest(args).future()
let d2 = d1.flatMap { t -> EventLoopFuture<NewValue> in
    . . . something with t . . .
    return netWorkRequest(args)
}
d2.whenSuccess { u in
    NSLog("Result of second request: \(u)")
}
复制代码

flatMap 的 callback 是将 Value 类型的结果转化为 EventLoopFuture<NewValue>, 它是一个 Monads

为了更好的理解二者的差别,咱们一块儿看看他们的实现代码:

swiftnio-map

swiftnio-flatmap2

源码浅析

public final class EventLoopFuture<Value> {
    @usableFromInline
    internal var _value: Result<Value, Error>? {
        didSet {
            self._isFulfilled.store(true)
        }
    }

    @usableFromInline
    internal let _isFulfilled: UnsafeEmbeddedAtomic<Bool>

    /// The `EventLoop` which is tied to the `EventLoopFuture` and is used to notify all registered callbacks.
    public let eventLoop: EventLoop

    /// Whether this `EventLoopFuture` has been fulfilled. This is a thread-safe
    /// computed-property.
    internal var isFulfilled: Bool {
        return self._isFulfilled.load()
    }

    @usableFromInline
    internal var _callbacks: CallbackList = CallbackList()
}
复制代码

EventLoopFuture 的 fulfilled 状态经过 _isFulfilled (线程安全的)进行管理。在初始化的时候也会进行初始化

@inlinable
internal init(_eventLoop eventLoop: EventLoop, value: Result<Value, Error>?, file: StaticString, line: UInt) {
    self.eventLoop = eventLoop
    self._value = value
    self._isFulfilled = UnsafeEmbeddedAtomic(value: value != nil)

    debugOnly {
        if let me = eventLoop as? SelectableEventLoop {
            me.promiseCreationStoreAdd(future: self, file: file, line: line)
        }
    }
}
复制代码

在给 _value 设置值的时候,会设置 _isFulfilledtrue

@usableFromInline
internal var _value: Result<Value, Error>? {
    didSet {
        self._isFulfilled.store(true)
    }
    
}
复制代码

EventLoopPromise

EventLoopPromise 设计是比较简单的,咱们直接看它的源码:

public struct EventLoopPromise<Value> {
    public let futureResult: EventLoopFuture<Value>
    @inlinable
    internal init(eventLoop: EventLoop, file: StaticString, line: UInt) {
        self.futureResult = EventLoopFuture<Value>(_eventLoop: eventLoop, file: file, line: line)
    }

    @inlinable
    public func succeed(_ value: Value) {
        self._resolve(value: .success(value))
    }

    @inlinable
    public func fail(_ error: Error) {
        self._resolve(value: .failure(error))
    }
    
    @inlinable
    public func completeWith(_ future: EventLoopFuture<Value>) {
        future.cascade(to: self)
    }

    @inlinable
    public func completeWith(_ result: Result<Value, Error>) {
        self._resolve(value: result)
    }

    @inlinable
    internal func _resolve(value: Result<Value, Error>) {
        if self.futureResult.eventLoop.inEventLoop {
            self._setValue(value: value)._run()
        } else {
            self.futureResult.eventLoop.execute {
                self._setValue(value: value)._run()
            }
        }
    }

    @inlinable
    internal func _setValue(value: Result<Value, Error>) -> CallbackList {
        return self.futureResult._setValue(value: value)
    }
}
复制代码

经过 Eventloop 初始化其内部的 futureResult(EventLoopFuture), 这个时候的 futureResultfilfulledfalse 的。 而后提供了一些操做 futureResult 的方法,而这些方法都间接调用了 _resolve —— 确保回调都在建立 promise 这个 EventLoop 上进行执行。

EventLoop 是 SwfitNIO 最基本的 IO 原语,它等待事件的发生,在发生事件时触发某种回调操做。

在下起文章咱们来解读下这个 EventLoop,本文就不进行过多阐述。

Vapor 的 Future & Promise 真面目

Vapor 的官方文档中,有个 Async 的模块, 它有 Future 和 Promise。

咱们可使用它提供操做 Future 的方法:

  • map
  • flatMap
  • transform
  • always
  • wait
  • do / catch

对 Future 进行转换, 或者阻塞等待 Future 的值的肯定。

使用 Promise:

/// 建立一个 promise 
let promiseString = req.eventLoop.newPromise(String.self)
print(promiseString) // Promise<String>
print(promiseString.futureResult) // Future<String>

/// Completes the associated future
promiseString.succeed(result: "Hello")

/// Fails the associated future
promiseString.fail(error: ...)
复制代码

Future 和 Promise 在 Vapor 中的详细用法请到 Async

上面的用法是否是很是的熟悉?咱们一块儿看看它们的如山真面目。

Async+NIO.swift 中:

import Dispatch
import NIO

/// Convenience shorthand for `EventLoopFuture`.
public typealias Future = EventLoopFuture

/// Convenience shorthand for `EventLoopPromise`.
public typealias Promise = EventLoopPromise

extension EventLoop {
    /// Creates a new promise for the specified type.
    public func newPromise<T>(_ type: T.Type, file: StaticString = #file, line: UInt = #line) -> Promise<T> {
        return newPromise(file: file, line: line)
    }
}
复制代码

Vapor 中的 Future 就是 SwiftNIO 的 EventLoopFuturePromise 就是 SwiftNIO 中的 EventLoopPromise

map vs flatMap

map 和 flatMap 在 Vapor 实践中是经常使用到的方法,二者很容易理解的一个区别:

  • map 中的 callback 是 Value -> NewValue
  • flatMap 中的 callback 是 Value -> EventLoopFuture<NewValue>
let futureString: Future<String> = ...

/// Assume we have created an HTTP client
let client: Client = ... 

/// Flat-map the future string to a future response
let futureResponse = futureString.flatMap(to: Response.self) { string in
    return client.get(string) // Future<Response>
}
复制代码

futureResponseFuture<Response> 类型, 若是 flatMap 替换为 map:

let futureResponse = futureString.map(to: Response.self) { string in
    return client.get(string) // Future<Response>
}
复制代码

那么这时候 futureResponseFuture<Future<Response>> 类型,显然这不是咱们想要的。

提早预告 Vapor 中 Worker, 它是 SwiftNIO 中的 EventLoopGrop

public typealias Worker = EventLoopGroup
复制代码

想了解更多 Vapor 对 SwiftNIO 的包装,能够点击查看源码 vapor/core

API 学习

既然 Vapor 中 Future 和 Promise 是同同样东西,那么学习 SwiftNIO 中 EventLoopFuture 和 EventLoopPromise 提供的 API 是很是有必要的

EventLoopFuture API

flatMap(file:line:_:)

@inlinable
public func flatMap<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue>
复制代码

能够将 EventLoopFuture<Value> 的转换为 EventLoopFuture<NewValue> ,callback 参数须要返回 EventLoopFuture<NewValue>

flatMapThrowing(file:line:_:)

@inlinable
public func flatMapThrowing<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture<NewValue>
复制代码

EventLoopFuture<Value> fulfilled 状态的时候,会运行 callback。callback 能够抛出异常,当 callback 抛出异常,该方法返回的 EventLoopFuture 的值为 error。

flatMapErrorThrowing(file:line:_:)

@inlinable
public func flatMapErrorThrowing(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture<Value>
复制代码

EventLoopFuture<Value> 是个 error 状态,callback 会运行,callback 接受一个 error, 返回一个新的 Value 类型的值。

map(file:line:_:)

@inlinable
public func map<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture<NewValue>
复制代码

EventLoopFuture<Value> fulfilled

flatMapError(file:line:_:)

@inlinable
public func flatMapError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture<Value>) -> EventLoopFuture<Value>
复制代码

flatMapResult(file:line:_:)

@inlinable
public func flatMapResult<NewValue, SomeError: Error>(file: StaticString = #file, line: UInt = #line, _ body: @escaping (Value) -> Result<NewValue, SomeError>) -> EventLoopFuture<NewValue>
复制代码

recover(file:line:_:)

@inlinable
public func recover(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture<Value>
复制代码

whenSuccess(_:)

@inlinable
public func whenSuccess(_ callback: @escaping (Value) -> Void)
复制代码

whenFailure(_:)

@inlinable
public func whenFailure(_ callback: @escaping (Error) -> Void)
复制代码

whenComplete(_:)

@inlinable
public func whenComplete(_ callback: @escaping (Result<Value, Error>) -> Void)
复制代码

and(_:file:line:)

@inlinable
public func and<OtherValue>(_ other: EventLoopFuture<OtherValue>, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)>
复制代码

and(value:file:line:)

@inlinable
public func and<OtherValue>(value: OtherValue, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)>
复制代码

cascade(to:)

@inlinable
public func cascade(to promise: EventLoopPromise<Value>?)
复制代码

cascadeSuccess(to:)

@inlinable
public func cascadeSuccess(to promise: EventLoopPromise<Value>?)
复制代码

cascadeFailure(to:)

@inlinable
public func cascadeFailure<NewValue>(to promise: EventLoopPromise<NewValue>?)
复制代码

wait(file:line:)

public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value
复制代码

经过阻塞当前线程直到 resolve。

fold(_:with:)

@inlinable
public func fold<OtherValue>(_ futures: [EventLoopFuture<OtherValue>], with combiningFunction: @escaping (Value, OtherValue) -> EventLoopFuture<Value>) -> EventLoopFuture<Value>
复制代码

所有的 futures 完成触发返回一个新的 EventLoopFuture。 一旦遇到 future 失败将立马失败。

reduce(::on:_:)

public static func reduce<InputValue>(_ initialResult: Value, _ futures: [EventLoopFuture<InputValue>], on eventLoop: EventLoop, _ nextPartialResult: @escaping (Value, InputValue) -> Value) -> EventLoopFuture<Value>
复制代码

reduce(into::on::)

public static func reduce<InputValue>(into initialResult: Value, _ futures: [EventLoopFuture<InputValue>], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping (inout Value, InputValue) -> Void) -> EventLoopFuture<Value>
复制代码

全部的 EventLoopFutures 成功后,返回一个新的 EventLoopFuture。新的 EveentLoopFuture 的值是每一个Future 的值组成的数组。

andAllSucceed(_:on:)

public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void>
复制代码

全部的 EventLoopFutures 成功后,返回一个新的 EventLoopFuture。忽略每一个 Future 的值。

whenAllSucceed(_:on:)

public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]>
复制代码

全部的 EventLoopFutures 完成后,返回一个新的 EventLoopFuture。

这个返回的 EventLoopFuture 老是成功的,无论这些 futures 是否成功仍是失败。忽略每一个 Future 的值。

andAllComplete(_:on:)

public static func andAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void>
复制代码

在全部的 EventLoopFutures 完成后,返回一个新的 EventLoopFuture, 这个 EventLoopFuture 的值是这些 futures 的结果数组。

这个返回的 EventLoopFuture 老是成功的,无论这些 futures 是否成功仍是失败。

若是但愿将它们展平单个 EventLoopFuture,且遇到第一个 future 失败也将失败,建议使用 reduce 方法。

whenAllComplete(_:on:)

public static func whenAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Result<Value, Error>]>
复制代码

hop(to:)

@inlinable
public func hop(to target: EventLoop) -> EventLoopFuture<Value>
复制代码

当这个 future 完成时会触发返回一个 EventLoopFuture。 可是执行回调是在 target 这个 Eventloop 上而不是在原来的那个。

在某些场合你须要切换 Eventloop: 好比, 你须要在另外一个 eventloop 去关闭一个 channel。而后关闭完成时候须要切回到当前的 eventloop。它还有一个优化,避免两个 eventloop 是同一个 eventloop 从新分配的状况。

always(_:)

public func always(_ callback: @escaping (Result<Value, Error>) -> Void) -> EventLoopFuture<Value>
复制代码

给 EventLoopFuture 添加一个监听的回调,该回调在 EventLoopFuture 有任何值的时候运行。

EventLoopPromise API

futureResult

public let futureResult: EventLoopFuture<Value>
复制代码

经过 EventLoopPromise 获取 EventLoopFuture。 你可使用它来添加回调,这些回调会在 EventLoopPromise 完成立马调用。

succeed(_:)

@inlinable
public func succeed(_ value: Value)
复制代码

将成功的结果传递给关联的 EventLoopFuture<Value>

fail(_:)

@inlinable
public func fail(_ error: Error)
复制代码

将失败的结果传递给关联的 EventLoopFuture<Value>

completeWith(_:)

@inlinable
public func completeWith(_ future: EventLoopFuture<Value>)
复制代码

经过传入 EventLoopFuture<Value> 完成 promise。

间接调用 EventLoopFuture 的 eascade 方法。

completeWith(_:)

@inlinable
public func completeWith(_ result: Result<Value, Error>)
复制代码

经过传入 Result<Value, Error> 完成 promise。

此方法至关于调用

switch result {
case .success(let value):
    promise.succeed(value)
case .failure(let error):
    promise.fail(error)
}
复制代码

总结

SwiftNIO 中的 Promise 和 Future 的前缀都是 EventLoop,在 Netty 的源码里没有这俩名字,它有不少 Future 类。这应该和 Netty 的设计不一样。EventLoopFuture 能够经过 flatMap 生成一个新的 EventLoopFuture(间接经过 EventLoopPromise), 也能够经过初始化方式进行生成。EventLoopFuture 对象都有一个对应的 EventLoopPromise, 只要 EventLoopPromise 执行成功,EventLoopFuture 也就完成,完成后会触发回调,而这些回调函数都在最初建立 EventLoopPromise 的那个 EventLoop 上执行。 Vapor 的 Future 和 Promise 仅仅是 EventLoopFuture 和 EventLoopPromise 的一个别称,理解了它俩,也基本上了解 Vapor 不少方法须要返回一个 Future 的这样的设计, 为啥要用 map & flatMap 实现链式调用。

本文到此完结,更多阅读,能够关注 SwiftOldBird 微信公众号:

相关文章
相关标签/搜索