RxSwift 入门

ReactiveX 是一个库,用于经过使用可观察序列来编写异步的、基于事件的程序。html

它扩展了观察者模式以支持数据、事件序列,并添加了容许你以声明方式组合序列的操做符,同时抽象对低层线程、同步、线程安全等。react

本文主要做为 RxSwift 的入门文章,对 RxSwift 中的一些基础内容、经常使用实践,作些介绍。git

本文地址为:http://www.javashuo.com/article/p-puprblib-v.html,转载请注明出处。程序员

Observables aka Sequences

Basics

观察者模式(这里指Observable(Element> Sequence)和正常序列(Sequence)的等价性对于理解 Rx 是至关重要的。github

每一个 Observable 序列只是一个序列。Observable 与 Swift 的 Sequence 相比,其主要优势是能够异步接收元素。这是 RxSwift 的核心。正则表达式

  • Observable(ObservableType) 与 Sequence 等价
  • Observable.subscribe 方法与 Sequence.makeIterator方法等价
  • Observer(callback)须要被传递到 Observable.subscribe 方法来接受序列元素,而不是在返回的 iterator 上调用 next() 方法

Sequence 是一个简单、熟悉的概念,很容易可视化。swift

人是具备巨大视觉皮层的生物。当咱们能够轻松地想象一个概念时,理解它就容易多了。api

咱们能够经过尝试模拟每一个Rx操做符内的事件状态机到序列上的高级别操做来解除认知负担。数组

若是咱们不使用 Rx 而是使用模型异步系统(model asynchronous systems),这可能意味着咱们的代码会充满状态机和瞬态,这些正式咱们须要模拟的,而不是抽象。安全

ListSequence 多是数学家和程序员首先学习的概念之一。

这是一个数字的序列:

--1--2--3--4--5--6--|   // 正常结束

另外一个字符序列:

--a--b--a--a--a---d---X     // terminates with error

一些序列是有限的,而一些序列是无限的,好比一个按钮点击的列:

---tap-tap-------tap--->

这些被叫作 marble diagram。能够在rxmarbles.com了解更多的 marble diagram。

若是咱们将序列愈发指定为正则表达式,它将以下所示:

next*(error | completed)?

这描述了如下内容:

  • Sequence 能够有 0 个 或者多个元素
  • 一旦收到 errorcompleted 事件,这个 Sequence 就不能再产生其余元素

在 Rx 中, Sequence 被描述为一个 push interface(也叫作 callbak)。

enum Event<Element>  {
    case next(Element)      // next element of a sequence
    case error(Swift.Error) // sequence failed with error
    case completed          // sequence terminated successfully
}

class Observable<Element> {
    func subscribe(_ observer: Observer<Element>) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

当序列发送 errorcompleted 事件时,将释放计算序列元素的全部内部资源。

要当即取消序列元素的生成,并释放资源,能够在返回的订阅(subscription)上调用 dispose

若是一个序列在有限时间内结束,则不调用 dispose 或者不使用 disposed(by: disposeBag) 不会形成任何永久性资源泄漏。可是,这些资源会一直被使用,直到序列完成(完成产生元素,或者返回一个错误)。

若是一个序列没有自行终止,好比一系列的按钮点击,资源会被永久分配,直到 dispose 被手动调用(在 disposeBag 内调用,使用 takeUntil 操做符,或者其余方式)。

使用 dispose bag 或者 takeUtil 操做符是一个确保资源被清除的鲁棒(robust)的方式。即便序列将在有限时间内终止,咱们也推荐在生产环境中使用它们。

Disposing

被观察的序列(observed sequence)有另外一种终止的方式。当咱们使用完一个序列而且想要释放分配用于计算即将到来的元素的全部资源时,咱们能够在一个订阅上调用 dispose

这时一个使用 interval 操做符的例子:

let scheduler = SerialDispatchQueueScheduler(qos: .default)
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }

Thread.sleep(forTimeInterval: 2.0)

subscription.dispose()

上边的例子打印:

0
1
2
3
4
5

注意,你一般不但愿调用 dispose,这只是一个例子。手动调用 dispose 一般是一种糟糕的代码味道。dispose 订阅有更好的方式,好比 DisposeBagtakeUntil操做符、或者一些其余的机制。

那么,上边的代码是否能够在 dispose 被执行后,打印任何东西?答案是,是状况而定。

  • 若是上边的 scheduler 是串行调度器(serial scheduler),好比 MainSchedulerdispose 在相同的串行调度器上调用,那么答案就是 no。
  • 不然,答案是 yes。

你仅仅有两个过程在并行执行。

  • 一个在产生元素
  • 另外一个 dispose 订阅

“能够在以后打印某些内容吗?”这个问题,在这两个过程在不一样调度上执行的状况下甚至没有意义。

若是咱们的代码是这样的:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(MainScheduler.instance)
            .subscribe { event in
                print(event)
            }

// ....

subscription.dispose() // called from main thread

dispose 调用返回后,不会打印任何东西。

一样,在这个例子中:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(serialScheduler)
            .subscribe { event in
                print(event)
            }

// ...

subscription.dispose() // executing on same `serialScheduler`

dispose 调用返回后,也不会打印任何东西。

Dispose Bag

Dispose bags are used to return ARC like behavior to RX.

当一个 DisposeBag 被释放时,它会在每个可被 dispose 的对象(disposables)上调用 dispose

它没有 dispose 方法,所以不容许故意显式地调用 dispose。若是须要当即清理,咱们能够建立一个新的 DisposeBag

self.disposeBag = DisposeBag()

这将清除旧的引用,并引发资源清理。

若是仍然须要手动清理,可使用 CompositeDisposable。它具备所需的行为,但一旦调用了 dispose 方法,它将当即处理任何新添加可被dispose的对象(disposable)。

Take until

另外一种在 dealloc 时自动处理(dispose)订阅的方式是使用 takeUtil 操做符。

sequence
    .takeUntil(self.rx.deallocated)
    .subscribe {
        print($0)
    }

Implicit Observable guarantees

还有一些额外的保证,全部的序列产生者(sequence producers、Observable s),必须遵照.

它们在哪个线程上产生元素可有可无,但若是它们生成一个元素并发送给观察者observer.on(.next(nextElement)),那么在 observer.on 方法执行完成前,它们不能发送下一个元素。

若是 .next 事件尚未完成,那么生产者也不能发送终止 .completed.error

简而言之,考虑如下示例:

someObservable
  .subscribe { (e: Event<Element>) in
      print("Event processing started")
      // processing
      print("Event processing ended")
  }

它始终打印:

Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended

它永远没法打印:

Event processing started
Event processing started
Event processing ended
Event processing ended

Creating your own Observable (aka observable sequence)

关于观察者有一个重要的事情须要理解。

建立 observable 时,它不会仅仅由于它已建立而执行任何工做。

确实,Observable 能够经过多种方式产生元素。其中一些会致使反作用,一些会影响现有的运行过程,例如点击鼠标事件等。

可是,若是只调用一个返回 Observable 的方法,那么没有序列生成,也没有反作用。Observable仅仅定义序列的生成方法以及用于元素生成的参数。序列生成始于 subscribe 方法被调用。

例如,假设你有一个相似原型的方法:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")

// no requests are performed, no work is being done, no URL requests were fired

let cancel = searchForMe
  // sequence generation starts now, URL requests are fired
  .subscribe(onNext: { results in
      print(results)
  })

有许多方法能够生成你本身的 Observable 序列,最简单方法或许是使用 create 函数。

RxSwift 提供了一个方法能够建立一个序列,这个序列订阅时返回一个元素。这个方法是 just。咱们亲自实现一下:

func myJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

myJust(0)
    .subscribe(onNext: { n in
      print(n)
    })

这会打印:

0

不错,create 函数是什么?

它只是一个便利方法,使你可使用 Swift 的闭包轻松实现 subscribe 方法。像 subscribe 方法同样,它带有一个参数 observer,并返回 disposable。

以这种方式实现的序列其实是同步的(synchronous)。它将生成元素,并在 subscribe 调用返回 disposable 表示订阅前终止。所以,它返回的 disposable 并不重要,生成元素的过程不会被中断。

当生成同步序列,一般用于返回的 disposable 是 NopDisposable 的单例。

如今,咱们来建立一个从数组中返回元素的 observable。

func myFrom<E>(_ sequence: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in sequence {
            observer.on(.next(element))
        }

        observer.on(.completed)
        return Disposables.create()
    }
}

let stringCounter = myFrom(["first", "second"])

print("Started ----")

// first time
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("----")

// again
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("Ended ----")

上边的例子会打印:

Started ----
first
second
----
first
second
Ended ----

Creating an Observable that perfroms work

OK,如今更有趣了。咱们来建立前边示例中使用的 interval 操做符。

这至关于 dispatch queue schedulers 的实际实现

func myInterval(_ interval: TimeInterval) -> Observable<Int> {
    return Observable.create { observer in
        print("Subscribed")
        let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
        timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval)

        let cancel = Disposables.create {
            print("Disposed")
            timer.cancel()
        }

        var next = 0
        timer.setEventHandler {
            if cancel.isDisposed {
                return
            }
            observer.on(.next(next))
            next += 1
        }
        timer.resume()

        return cancel
    }
}
let counter = myInterval(0.1)

print("Started ----")

let subscription = counter
    .subscribe(onNext: { n in
        print(n)
    })


Thread.sleep(forTimeInterval: 0.5)

subscription.dispose()

print("Ended ----")

上边的示例会打印:

Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----

若是这样写:

let counter = myInterval(0.1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

那么打印以下:

Started ----
Subscribed
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Disposed
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

订阅后的每一个订阅者(subscriber)同行会生成本身独立的元素序列。默认状况下,操做符是无状态的。无状态的操做符远多于有状态的操做符。

Sharing subscription and share operator

可是,若是你但愿多个观察者从一个订阅共享事件(元素),该怎么办?

有两件事须要定义:

  • 如何处理在新订阅者有兴趣观察它们以前收到的过去的元素(replay lastest only, replay all, replay last n)
  • 如何决定什么时候出发共享的订阅(refCount, manual or some other algorithm)

一般是一个这样的组合,replay(1).refCount,也就是 share(replay: 1)

let counter = myInterval(0.1)
    .share(replay: 1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

这将打印:

Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

请注意如今只有一个 Subscribed 和 Disposed 事件。

对 URL 可观察对象(observable)的行为是等效的。

下面的例子展现了如何的 HTTP 请求封装在 Rx 中,这种封装很是像 interval 操做符的模式。

extension Reactive where Base: URLSession {
    public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> {
        return Observable.create { observer in
            let task = self.dataTaskWithRequest(request) { (data, response, error) in
                guard let response = response, let data = data else {
                    observer.on(.error(error ?? RxCocoaURLError.Unknown))
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
                    return
                }

                observer.on(.next(data, httpResponse))
                observer.on(.completed)
            }

            task.resume()

            return Disposables.create {
                task.cancel()
            }
        }
    }
}

Operator

RxSwift 实现了许多操做符。

全部操做符的的 marble diagram 能够在 ReactiveX.io 看到。

Playgrouds 里边几乎有全部操做符的演示。

若是你须要一个操做符,而且不知道如何找到它,这里有一个操做符的决策树

Custom operators

有两种方式能够建立自定义的操做符。

Easy way

全部的内部代码都使用高度优化的运算符版本,所以它们不是最好的教程材料。这就是为何咱们很是鼓励使用标准运算符。

幸运的是,有一种简单的方法来建立操做符。建立新的操做符实际上就是建立可观察对象,前边的章节已经描述了如何作到这一点。

来看一下为优化的 map 操做符的实现:

extension ObservableType {
    func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> {
        return Observable.create { observer in
            let subscription = self.subscribe { e in
                    switch e {
                    case .next(let value):
                        let result = transform(value)
                        observer.on(.next(result))
                    case .error(let error):
                        observer.on(.error(error))
                    case .completed:
                        observer.on(.completed)
                    }
                }

            return subscription
        }
    }
}


如今可使用自定义的 map 了:

let subscription = myInterval(0.1)
    .myMap { e in
        return "This is simply \(e)"
    }
    .subscribe(onNext: { n in
        print(n)
    })

这将打印:

Subscribed
This is simply 0
This is simply 1
This is simply 2
This is simply 3
This is simply 4
This is simply 5
This is simply 6
This is simply 7
This is simply 8
...

Life happens

那么,若是用自定义运算符解决某些状况太难了呢? 你能够退出 Rx monad,在命令性世界中执行操做,而后使用 Subjects 再次将结果隧道传输到Rx。

下边的例子是不该该被常常实践的,是糟糕的代码味道,可是你能够这么作。

let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth()

magicBeings
  .subscribe(onNext: { being in     // exit the Rx monad
      self.doSomeStateMagic(being)
  })
  .disposed(by: disposeBag)

//
//  Mess
//
let kitten = globalParty(   // calculate something in messy world
  being,
  UIApplication.delegate.dataSomething.attendees
)
kittens.on(.next(kitten))   // send result back to rx

//
// Another mess
//
let kittens = BehaviorRelay(value: firstKitten) // again back in Rxmonad
kittens.asObservable()
  .map { kitten in
    return kitten.purr()
  }
  // ....

每一次你这样写的时候,其余人可能在其余地方写这样的代码:

kittens
  .subscribe(onNext: { kitten in
    // do something with kitten
  })
  .disposed(by: disposeBag)

因此,不要尝试这么作。

Error handling

有两种错误机制。

Asynchrouous error handling mechanism in observables

错误处理很是直接,若是一个序列以错误而终止,则全部依赖的序列都将以错误而终止。这是一般的短路逻辑。

你可使用 catch 操做符从可观察对象的失败中恢复,有各类各样的可让你详细指定恢复。

还有 retry 操做符,能够在序列出错的状况下重试。

KVO

KVO 是一个 Objective-C 的机制。这意味着他没有考虑类型安全,该项目试图解决这个问题的一部分。

有两种内置的方式支持 KVO:

// KVO
extension Reactive where Base: NSObject {
    public func observe<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {}
}

#if !DISABLE_SWIZZLING
// KVO
extension Reactive where Base: NSObject {
    public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions) -> Observable<E?> {}
}
#endif

看一下观察 UIView 的 frame 的例子,注意 UIKit 并不听从 KVO,可是这样能够

view
  .rx.observe(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

view
  .rx.observeWeakly(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

rx.observe

rx.observe 有更好的性能,由于它只是对 KVO 机制的包装,可是它使用场景有限。

  • 它可用于观察从全部权图表中的self或祖先开始的 path(retainSelf = false)

  • 它可用于观察从全部权图中的后代开始的 path(retainSelf = true)

  • path 必须只包含 strong 属性,不然你可能会由于在 dealloc 以前没有取消注册KVO观察者而致使系统崩溃。

例如:

self.rx.observe(CGRect.self, "view.frame", retainSelf: false)

rx.observeWeakly

rx.observeWeaklyrx.observe 慢一些,由于它必须在若引用的状况下处理对象释放。

它不只适用于 rx.observe 适用的全部场景,还适用于:

  • 由于它不会持有被观察的对象,因此它能够用来观察全部权关系位置的任意对象
  • 它能够用来观察 weak 属性

Observing structs

KVO 是 Objective-C 的机制,因此它重度以来 NSValue

RxCocoa 内置支持 KVO 观察 CGRectCGSizeCGPoint 结构体。

当观察其余结构体时,须要手动从 NSValue 中提早值。

这里有展现如何经过实现 KVORepresentable 协议,为其余的结构体扩展 KVO 观察和 *rx.observe**方法。

UI layer tips

在绑定到 UIKit 控件时,Observable 须要在 UI 层中知足某些要求。

Threading

Observable 须要在 MainScheduler 发送值,这只是普通的 UIKit/Cocoa 要求。

你的 API 最好在 MainScheduler 上返回结果。若是你试图从后台线程绑定一些东西到 UI,在 Debug build 中,RxCocoa 一般会抛出异常来通知你。

能够经过添加 observeOn(MainScheduler.instance) 来修复该问题。

Error

你没法将失败绑定到 UIKit 控件,由于这是为定义的行为。

若是你不知道 Observable 是否能够失败,你能够经过使用 catchErrorJustReturn(valueThatIsReturnedWhenErrorHappens) 来确保它不会失败,可是错误发生后,基础序列仍将完成。

若是所需行为是基础序列继续产生元素,则须要某些版本的 retry 操做符。

Sharing subscription

你一般但愿在 UI 层中共享订阅,你不但愿单独的 HTTP 调用将相同的数据绑定到多个 UI 元素。

假设你有这样的代码:

let searchResults = searchText
    .throttle(0.3, $.mainScheduler)
    .distinctUntilChanged
    .flatMapLatest { query in
        API.getSearchResults(query)
            .retry(3)
            .startWith([]) // clears results on new search term
            .catchErrorJustReturn([])
    }
    .share(replay: 1)    // <- notice the `share` operator

你一般想要的是在计算后共享搜索结果,这就是 share 的含义。

在 UI 层中,在转换链的末尾添加 share 一般是一个很好的经验法则。由于你想要共享计算结果,而不是把 searcResults 绑定到多个 UI 元素时,触发多个 HTTP 链接。

另外,请参阅 Driver,它旨在透明地包装这些 share 调用,确保在主 UI 县城上观察元素,而且不会将错误绑定到 UI。


原文为RxSwift/Getting Started,本文在原文基础上依自身须要略有修改。

相关文章
相关标签/搜索