ReactiveX 是一个库,用于经过使用可观察序列来编写异步的、基于事件的程序。html
它扩展了观察者模式以支持数据、事件序列,并添加了容许你以声明方式组合序列的操做符,同时抽象对低层线程、同步、线程安全等。react
本文主要做为 RxSwift 的入门文章,对 RxSwift 中的一些基础内容、经常使用实践,作些介绍。git
本文地址为:http://www.javashuo.com/article/p-puprblib-v.html,转载请注明出处。程序员
观察者模式(这里指Observable(Element> Sequence
)和正常序列(Sequence
)的等价性对于理解 Rx 是至关重要的。github
每一个 Observable
序列只是一个序列。Observable
与 Swift 的 Sequence
相比,其主要优势是能够异步接收元素。这是 RxSwift 的核心。正则表达式
Observable
(ObservableType
) 与 Sequence
等价Observable.subscribe
方法与 Sequence.makeIterator
方法等价Observable.subscribe
方法来接受序列元素,而不是在返回的 iterator 上调用 next()
方法Sequence 是一个简单、熟悉的概念,很容易可视化。swift
人是具备巨大视觉皮层的生物。当咱们能够轻松地想象一个概念时,理解它就容易多了。api
咱们能够经过尝试模拟每一个Rx操做符内的事件状态机到序列上的高级别操做来解除认知负担。数组
若是咱们不使用 Rx 而是使用模型异步系统(model asynchronous systems),这可能意味着咱们的代码会充满状态机和瞬态,这些正式咱们须要模拟的,而不是抽象。安全
List
和 Sequence
多是数学家和程序员首先学习的概念之一。
这是一个数字的序列:
--1--2--3--4--5--6--| // 正常结束
另外一个字符序列:
--a--b--a--a--a---d---X // terminates with error
一些序列是有限的,而一些序列是无限的,好比一个按钮点击的列:
---tap-tap-------tap--->
这些被叫作 marble diagram。能够在rxmarbles.com了解更多的 marble diagram。
若是咱们将序列愈发指定为正则表达式,它将以下所示:
next*(error | completed)?
这描述了如下内容:
error
或 completed
事件,这个 Sequence 就不能再产生其余元素在 Rx 中, Sequence 被描述为一个 push interface(也叫作 callbak)。
enum Event<Element> { case next(Element) // next element of a sequence case error(Swift.Error) // sequence failed with error case completed // sequence terminated successfully } class Observable<Element> { func subscribe(_ observer: Observer<Element>) -> Disposable } protocol ObserverType { func on(_ event: Event<Element>) }
当序列发送 error
或 completed
事件时,将释放计算序列元素的全部内部资源。
要当即取消序列元素的生成,并释放资源,能够在返回的订阅(subscription)上调用 dispose
。
若是一个序列在有限时间内结束,则不调用 dispose
或者不使用 disposed(by: disposeBag)
不会形成任何永久性资源泄漏。可是,这些资源会一直被使用,直到序列完成(完成产生元素,或者返回一个错误)。
若是一个序列没有自行终止,好比一系列的按钮点击,资源会被永久分配,直到 dispose
被手动调用(在 disposeBag 内调用,使用 takeUntil
操做符,或者其余方式)。
使用 dispose bag 或者 takeUtil
操做符是一个确保资源被清除的鲁棒(robust)的方式。即便序列将在有限时间内终止,咱们也推荐在生产环境中使用它们。
被观察的序列(observed sequence)有另外一种终止的方式。当咱们使用完一个序列而且想要释放分配用于计算即将到来的元素的全部资源时,咱们能够在一个订阅上调用 dispose
。
这时一个使用 interval
操做符的例子:
let scheduler = SerialDispatchQueueScheduler(qos: .default) let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .subscribe { event in print(event) } Thread.sleep(forTimeInterval: 2.0) subscription.dispose()
上边的例子打印:
0 1 2 3 4 5
注意,你一般不但愿调用 dispose
,这只是一个例子。手动调用 dispose
一般是一种糟糕的代码味道。dispose 订阅有更好的方式,好比 DisposeBag
、takeUntil
操做符、或者一些其余的机制。
那么,上边的代码是否能够在 dispose
被执行后,打印任何东西?答案是,是状况而定。
scheduler
是串行调度器(serial scheduler),好比 MainScheduler
,dispose
在相同的串行调度器上调用,那么答案就是 no。你仅仅有两个过程在并行执行。
“能够在以后打印某些内容吗?”这个问题,在这两个过程在不一样调度上执行的状况下甚至没有意义。
若是咱们的代码是这样的:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .observeOn(MainScheduler.instance) .subscribe { event in print(event) } // .... subscription.dispose() // called from main thread
在 dispose
调用返回后,不会打印任何东西。
一样,在这个例子中:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .observeOn(serialScheduler) .subscribe { event in print(event) } // ... subscription.dispose() // executing on same `serialScheduler`
在 dispose
调用返回后,也不会打印任何东西。
Dispose bags are used to return ARC like behavior to RX.
当一个 DisposeBag
被释放时,它会在每个可被 dispose 的对象(disposables)上调用 dispose
。
它没有 dispose
方法,所以不容许故意显式地调用 dispose
。若是须要当即清理,咱们能够建立一个新的 DisposeBag
。
self.disposeBag = DisposeBag()
这将清除旧的引用,并引发资源清理。
若是仍然须要手动清理,可使用 CompositeDisposable
。它具备所需的行为,但一旦调用了 dispose
方法,它将当即处理任何新添加可被dispose的对象(disposable)。
另外一种在 dealloc 时自动处理(dispose)订阅的方式是使用 takeUtil
操做符。
sequence .takeUntil(self.rx.deallocated) .subscribe { print($0) }
Observable
guarantees还有一些额外的保证,全部的序列产生者(sequence producers、Observable
s),必须遵照.
它们在哪个线程上产生元素可有可无,但若是它们生成一个元素并发送给观察者observer.on(.next(nextElement))
,那么在 observer.on
方法执行完成前,它们不能发送下一个元素。
若是 .next
事件尚未完成,那么生产者也不能发送终止 .completed
或 .error
。
简而言之,考虑如下示例:
someObservable .subscribe { (e: Event<Element>) in print("Event processing started") // processing print("Event processing ended") }
它始终打印:
Event processing started Event processing ended Event processing started Event processing ended Event processing started Event processing ended
它永远没法打印:
Event processing started Event processing started Event processing ended Event processing ended
Observable
(aka observable sequence)关于观察者有一个重要的事情须要理解。
建立 observable 时,它不会仅仅由于它已建立而执行任何工做。
确实,Observable
能够经过多种方式产生元素。其中一些会致使反作用,一些会影响现有的运行过程,例如点击鼠标事件等。
可是,若是只调用一个返回 Observable
的方法,那么没有序列生成,也没有反作用。Observable
仅仅定义序列的生成方法以及用于元素生成的参数。序列生成始于 subscribe
方法被调用。
例如,假设你有一个相似原型的方法:
func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me") // no requests are performed, no work is being done, no URL requests were fired let cancel = searchForMe // sequence generation starts now, URL requests are fired .subscribe(onNext: { results in print(results) })
有许多方法能够生成你本身的 Observable
序列,最简单方法或许是使用 create
函数。
RxSwift 提供了一个方法能够建立一个序列,这个序列订阅时返回一个元素。这个方法是 just
。咱们亲自实现一下:
func myJust<E>(_ element: E) -> Observable<E> { return Observable.create { observer in observer.on(.next(element)) observer.on(.completed) return Disposables.create() } } myJust(0) .subscribe(onNext: { n in print(n) })
这会打印:
0
不错,create
函数是什么?
它只是一个便利方法,使你可使用 Swift 的闭包轻松实现 subscribe
方法。像 subscribe
方法同样,它带有一个参数 observer
,并返回 disposable。
以这种方式实现的序列其实是同步的(synchronous)。它将生成元素,并在 subscribe
调用返回 disposable 表示订阅前终止。所以,它返回的 disposable 并不重要,生成元素的过程不会被中断。
当生成同步序列,一般用于返回的 disposable 是 NopDisposable
的单例。
如今,咱们来建立一个从数组中返回元素的 observable。
func myFrom<E>(_ sequence: [E]) -> Observable<E> { return Observable.create { observer in for element in sequence { observer.on(.next(element)) } observer.on(.completed) return Disposables.create() } } let stringCounter = myFrom(["first", "second"]) print("Started ----") // first time stringCounter .subscribe(onNext: { n in print(n) }) print("----") // again stringCounter .subscribe(onNext: { n in print(n) }) print("Ended ----")
上边的例子会打印:
Started ---- first second ---- first second Ended ----
Observable
that perfroms workOK,如今更有趣了。咱们来建立前边示例中使用的 interval
操做符。
这至关于 dispatch queue schedulers 的实际实现
func myInterval(_ interval: TimeInterval) -> Observable<Int> { return Observable.create { observer in print("Subscribed") let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global()) timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval) let cancel = Disposables.create { print("Disposed") timer.cancel() } var next = 0 timer.setEventHandler { if cancel.isDisposed { return } observer.on(.next(next)) next += 1 } timer.resume() return cancel } }
let counter = myInterval(0.1) print("Started ----") let subscription = counter .subscribe(onNext: { n in print(n) }) Thread.sleep(forTimeInterval: 0.5) subscription.dispose() print("Ended ----")
上边的示例会打印:
Started ---- Subscribed 0 1 2 3 4 Disposed Ended ----
若是这样写:
let counter = myInterval(0.1) print("Started ----") let subscription1 = counter .subscribe(onNext: { n in print("First \(n)") }) let subscription2 = counter .subscribe(onNext: { n in print("Second \(n)") }) Thread.sleep(forTimeInterval: 0.5) subscription1.dispose() Thread.sleep(forTimeInterval: 0.5) subscription2.dispose() print("Ended ----")
那么打印以下:
Started ---- Subscribed Subscribed First 0 Second 0 First 1 Second 1 First 2 Second 2 First 3 Second 3 First 4 Second 4 Disposed Second 5 Second 6 Second 7 Second 8 Second 9 Disposed Ended ----
订阅后的每一个订阅者(subscriber)同行会生成本身独立的元素序列。默认状况下,操做符是无状态的。无状态的操做符远多于有状态的操做符。
share
operator可是,若是你但愿多个观察者从一个订阅共享事件(元素),该怎么办?
有两件事须要定义:
一般是一个这样的组合,replay(1).refCount
,也就是 share(replay: 1)
。
let counter = myInterval(0.1) .share(replay: 1) print("Started ----") let subscription1 = counter .subscribe(onNext: { n in print("First \(n)") }) let subscription2 = counter .subscribe(onNext: { n in print("Second \(n)") }) Thread.sleep(forTimeInterval: 0.5) subscription1.dispose() Thread.sleep(forTimeInterval: 0.5) subscription2.dispose() print("Ended ----")
这将打印:
Started ---- Subscribed First 0 Second 0 First 1 Second 1 First 2 Second 2 First 3 Second 3 First 4 Second 4 First 5 Second 5 Second 6 Second 7 Second 8 Second 9 Disposed Ended ----
请注意如今只有一个 Subscribed 和 Disposed 事件。
对 URL 可观察对象(observable)的行为是等效的。
下面的例子展现了如何的 HTTP 请求封装在 Rx 中,这种封装很是像 interval
操做符的模式。
extension Reactive where Base: URLSession { public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> { return Observable.create { observer in let task = self.dataTaskWithRequest(request) { (data, response, error) in guard let response = response, let data = data else { observer.on(.error(error ?? RxCocoaURLError.Unknown)) return } guard let httpResponse = response as? HTTPURLResponse else { observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response))) return } observer.on(.next(data, httpResponse)) observer.on(.completed) } task.resume() return Disposables.create { task.cancel() } } } }
RxSwift 实现了许多操做符。
全部操做符的的 marble diagram 能够在 ReactiveX.io 看到。
在 Playgrouds 里边几乎有全部操做符的演示。
若是你须要一个操做符,而且不知道如何找到它,这里有一个操做符的决策树。
有两种方式能够建立自定义的操做符。
全部的内部代码都使用高度优化的运算符版本,所以它们不是最好的教程材料。这就是为何咱们很是鼓励使用标准运算符。
幸运的是,有一种简单的方法来建立操做符。建立新的操做符实际上就是建立可观察对象,前边的章节已经描述了如何作到这一点。
来看一下为优化的 map
操做符的实现:
extension ObservableType { func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> { return Observable.create { observer in let subscription = self.subscribe { e in switch e { case .next(let value): let result = transform(value) observer.on(.next(result)) case .error(let error): observer.on(.error(error)) case .completed: observer.on(.completed) } } return subscription } } }
如今可使用自定义的 map 了:
let subscription = myInterval(0.1) .myMap { e in return "This is simply \(e)" } .subscribe(onNext: { n in print(n) })
这将打印:
Subscribed This is simply 0 This is simply 1 This is simply 2 This is simply 3 This is simply 4 This is simply 5 This is simply 6 This is simply 7 This is simply 8 ...
那么,若是用自定义运算符解决某些状况太难了呢? 你能够退出 Rx monad,在命令性世界中执行操做,而后使用 Subjects
再次将结果隧道传输到Rx。
下边的例子是不该该被常常实践的,是糟糕的代码味道,可是你能够这么作。
let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth() magicBeings .subscribe(onNext: { being in // exit the Rx monad self.doSomeStateMagic(being) }) .disposed(by: disposeBag) // // Mess // let kitten = globalParty( // calculate something in messy world being, UIApplication.delegate.dataSomething.attendees ) kittens.on(.next(kitten)) // send result back to rx // // Another mess // let kittens = BehaviorRelay(value: firstKitten) // again back in Rxmonad kittens.asObservable() .map { kitten in return kitten.purr() } // ....
每一次你这样写的时候,其余人可能在其余地方写这样的代码:
kittens .subscribe(onNext: { kitten in // do something with kitten }) .disposed(by: disposeBag)
因此,不要尝试这么作。
有两种错误机制。
错误处理很是直接,若是一个序列以错误而终止,则全部依赖的序列都将以错误而终止。这是一般的短路逻辑。
你可使用 catch
操做符从可观察对象的失败中恢复,有各类各样的可让你详细指定恢复。
还有 retry
操做符,能够在序列出错的状况下重试。
KVO 是一个 Objective-C 的机制。这意味着他没有考虑类型安全,该项目试图解决这个问题的一部分。
有两种内置的方式支持 KVO:
// KVO extension Reactive where Base: NSObject { public func observe<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {} } #if !DISABLE_SWIZZLING // KVO extension Reactive where Base: NSObject { public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions) -> Observable<E?> {} } #endif
看一下观察 UIView
的 frame 的例子,注意 UIKit 并不听从 KVO,可是这样能够
view .rx.observe(CGRect.self, "frame") .subscribe(onNext: { frame in ... })
或
view .rx.observeWeakly(CGRect.self, "frame") .subscribe(onNext: { frame in ... })
rx.observe
rx.observe
有更好的性能,由于它只是对 KVO 机制的包装,可是它使用场景有限。
它可用于观察从全部权图表中的self或祖先开始的 path(retainSelf = false)
它可用于观察从全部权图中的后代开始的 path(retainSelf = true)
path 必须只包含 strong 属性,不然你可能会由于在 dealloc 以前没有取消注册KVO观察者而致使系统崩溃。
例如:
self.rx.observe(CGRect.self, "view.frame", retainSelf: false)
rx.observeWeakly
rx.observeWeakly
比 rx.observe
慢一些,由于它必须在若引用的状况下处理对象释放。
它不只适用于 rx.observe
适用的全部场景,还适用于:
KVO 是 Objective-C 的机制,因此它重度以来 NSValue
。
RxCocoa 内置支持 KVO 观察 CGRect
、CGSize
、CGPoint
结构体。
当观察其余结构体时,须要手动从 NSValue
中提早值。
这里有展现如何经过实现 KVORepresentable
协议,为其余的结构体扩展 KVO 观察和 *rx.observe**方法。
在绑定到 UIKit 控件时,Observable 须要在 UI 层中知足某些要求。
Observable
须要在 MainScheduler
发送值,这只是普通的 UIKit/Cocoa 要求。
你的 API 最好在 MainScheduler
上返回结果。若是你试图从后台线程绑定一些东西到 UI,在 Debug build 中,RxCocoa 一般会抛出异常来通知你。
能够经过添加 observeOn(MainScheduler.instance)
来修复该问题。
你没法将失败绑定到 UIKit 控件,由于这是为定义的行为。
若是你不知道 Observable
是否能够失败,你能够经过使用 catchErrorJustReturn(valueThatIsReturnedWhenErrorHappens)
来确保它不会失败,可是错误发生后,基础序列仍将完成。
若是所需行为是基础序列继续产生元素,则须要某些版本的 retry
操做符。
你一般但愿在 UI 层中共享订阅,你不但愿单独的 HTTP 调用将相同的数据绑定到多个 UI 元素。
假设你有这样的代码:
let searchResults = searchText .throttle(0.3, $.mainScheduler) .distinctUntilChanged .flatMapLatest { query in API.getSearchResults(query) .retry(3) .startWith([]) // clears results on new search term .catchErrorJustReturn([]) } .share(replay: 1) // <- notice the `share` operator
你一般想要的是在计算后共享搜索结果,这就是 share
的含义。
在 UI 层中,在转换链的末尾添加 share
一般是一个很好的经验法则。由于你想要共享计算结果,而不是把 searcResults 绑定到多个 UI 元素时,触发多个 HTTP 链接。
另外,请参阅 Driver
,它旨在透明地包装这些 share
调用,确保在主 UI 县城上观察元素,而且不会将错误绑定到 UI。
原文为RxSwift/Getting Started,本文在原文基础上依自身须要略有修改。