RxSwift学习笔记

最近在学习RxSwift相关的内容,在这里记录一些基本的知识点,以便从此查阅。编程

Observable

在RxSwift中,最关键的一个概念是可观察序列(Observable Sequence),它至关于Swift中的序列(Sequence),可观察序列中的每一个元素都是一个事件,咱们知道Swift的序列中能够包含任意多个元素,相似的,可观察序列会不断产生新的事件直到发生错误或正常结束为止。订阅者(Observer)经过订阅(subscribe)一个可观察队列来接收序列所产生的新事件,只有在有观察者的状况下序列才能够发送事件。swift

例如,使用of操做建立一个可观察序列:api

let seq = Observable.of(1, 2, 3) 
复制代码

of是一种用来建立Observable的简便操做,在上面的代码中建立了一个类型为Observable<Int>的Observable,里面包含了三个元素:1,2,3。多线程

来看看Observable中都提供了哪些操做,可观察序列是一个实现了ObservableType协议的类型,ObservableType协议的定义很是简单:闭包

protocol ObservableType : ObservableConvertibleType {
    associatedtype E
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
复制代码

其中E是一个关联类型,表示序列中元素的类型,除此以外协议只定义了一个方法:subscribe,用于向可观察序列添加一个观察者(ObserverType类型):并发

// 接收闭包的subscribe函数是经过协议扩展提供的简便方法
seq.subscribe { (event) in
    print(event)
}
复制代码

subscribe至关于Swift序列中的遍历操做(makeIterator),如上,向seq序列添加一个观察者,在序列中有新的事件时调用该闭包,上面的代码会输出1,2,3。app

Observer

观察者是实现了ObserverType协议的对象,ObserverType协议一样十分简单:ide

public protocol ObserverType {
    associatedtype E
    func on(_ event: Event<E>)
}
复制代码

E为观察者所观察序列中的元素类型,当序列中有新的事件产生时,会调用on方法来接收新的事件。其中事件的类型Event是一个枚举,其中包含3个类型:函数

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
复制代码
  1. .next:表示序列中产生了下一个事件,关联值Element保存了该事件的值。
  2. .error:序列产生了一个错误,关联值Error保存了错误类型,在这以后序列会直接结束(再也不产生新的next事件)。
  3. .completed:序列正常结束。

Dispose

除了产生错误和天然结束之外,还能够手动结束观察,在使用subscribe订阅一个可观察序列时,会返回一个Disposable类型的对象。这里的Disposable是一个协议,只定义了一个方法:学习

protocol Disposable {
    func dispose()
}
复制代码

dispose方法用来结束这次订阅并释放可观察序列中的相关资源,一般来讲你并不须要直接调用该方法,而是经过调用其扩展方法addDisposableToDisposable添加到一个DisposeBag对象中。DisposeBag对象会自动管理全部添加到其中的Disposable对象,在DisposeBag对象销毁的时候会自动调用其中全部Disposable的dispose方法释放资源。

也可使用takeUntil来自动结束订阅:

seq.takeUntil(otherSeq)
	.subscribe({ (event) in
    	print(event)
	})
复制代码

在otherSeq序列发出任意类型的事件以后,自动结束本次订阅。

建立序列

经过Observable类型提供的方法create能够建立一个自定义的可观察序列:

let seq = Observable<Int>.create { (observer) -> Disposable in
    observer.on(.next(1))
    observer.on(.completed)
    return Disposables.create {
        // do some cleanup
    }
}
复制代码

create方法使用一个闭包来建立自定义的序列,闭包接收一个ObserverType的参数observer,并经过observer来发送相应的事件。如上面的代码,建立了一个Observable<Int>类型的可观察序列,订阅该序列的观察者会收到事件1和一个完成事件。最后create方法返回一个本身建立的Disposable对象,能够在这里进行一些相关的资源回收操做。

除了create方法以外,RxSwift中提供了不少中简便的方法用于建立序列,经常使用的有:

  • just:建立一个只包含一个值的可观察序列:

    let justSeq = Observable.just(1)
    justSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    completed
    复制代码
  • ofofjust有点相似,不一样的是of能够将一系列元素建立成事件队列,该Observable依次发送相应事件和结束事件:

    let ofSeq = Observable.of(1, 2, 3)
    ofSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed
    复制代码
  • empty:这种类型的Observable只发送结束(Completed)事件

    let emptySequence = Observable<String>.empty()
    复制代码
  • error:该队列只发送一个error事件,传递一个自定义的错误类型。

    let errorSeq = Observable<TestError>.error(TestError.Error1)
    复制代码

Share

一般在咱们在订阅一个可观察序列的时候,每一次的订阅行为都是独立的,也就是说:

let seq = Observable.of(1, 2)
// 1
seq.subscribe { (event) in
    print("sub 1: \(event)")
}
// 2
seq.subscribe { (event) in
    print("sub 2: \(event)")
}
---- example output ----
sub 1: next(1)
sub 1: next(2)
sub 1: completed 
sub 2: next(1)
sub 2: next(2)
sub 2: completed 
复制代码

咱们连续订阅同一序列两次,每次都会接收到相同的事件,第二次订阅时并无由于第一次订阅的行为致使元素"耗尽"。有些时候咱们但愿让全部的观察者都共享同一份事件,这个时候可使用share

  • shareshareObservableType协议的一个扩展方法,它返回一个可观察序列,该序列的全部观察者都会共享同一份订阅,上面的代码加上share以后:

    let seq = Observable.of(1, 2).share()
    // 1
    seq.subscribe { (event) in
        print("sub 1: \(event)")
    }
    // 2
    seq.subscribe { (event) in
        print("sub 2: \(event)")
    }
    ---- example output ----
    sub 1: next(1)
    sub 1: next(2)
    sub 1: completed 
    sub 2: completed 
    复制代码

    能够看到,在第一次订阅时序列已经将全部的事件发送,后面再进行第二次订阅的时候只收到了一个完成事件。

  • shareReplayshareReplay的用法与share相似,它的方法签名以下:

    func shareReplay(_ bufferSize: Int) -> Observable<Element>
    复制代码

    不一样的地方在于,shareReplay接收一个整型参数bufferSize,指定缓冲区大小,订阅该序列的观察者会当即收到最近bufferSize条事件。

序列的变换和组合

在Swift的序列Sequence中,可使用map、flatMap和reduce等常见的函数式方法对其中的元素进行变换,RxSwift中的可观察序列一样也支持这些方法。

变换

  • map:这是map方法的签名:

    func map<Result>(_ transform: @escaping (E) throws -> Result) -> Observable<Result>
    复制代码

    在一个自定义的闭包中对序列的每个元素进行变换,返回一个包含转换后结果的可观察序列,与Swift中Sequence的map相似。

    let mappedSeq: Observable<String> = seq.map { (element) -> String in
    	return "value: \(element)"
    }
    复制代码
  • flatMap:先来看看flatMap的签名:

    func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
            -> Observable<O.E> 
    复制代码

    关于flatMap的做用一样能够类比SequenceSequence中的flatMap闭包遍历每个元素进行处理后返回一个新的序列,最后会将这些序列"展平",获得一个包含全部序列元素的新序列:

    let array = [1, 2]
    let res = array.flatMap { (n) -> [String] in
        return ["\(n)a", "\(n)b"]
    }
    // res: ["1a", "1b", "2a", "2b"]
    复制代码

    RxSwift中的flatMap用法与之相似,flatMap中的闭包会遍历可观察序列中的全部元素,并返回一个新的可观察序列,最后flatMap会返回一个包含全部元素的可观察序列:

    let seq = Observable.of(1, 2)
        .flatMap { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    // 获得的seq类型为Observable<String>
    ---- example output ----
    next(1a)
    next(1b)
    next(2a)
    next(2b)
    completed
    复制代码

    在闭包中建立了若干个可观察序列(1),这些序列中发送的next事件都会被传递到seq序列中,其中任何一个序列发生错误(发送了error事件)时,seq序列都会直接结束,再也不继续接收事件;可是只有全部序列都完成(发送了completed事件)后,seq序列才会正常结束。

  • flatMapLatest:做用与flatMap相似,可是对于闭包中生成的可观察序列,它并不会保留全部的序列的订阅,在遍历结束后,只保留最后建立的序列的订阅,以前建立的Observables都会取消订阅(相应序列的dispose方法也会被调用):

    // 与上一个例子相同的代码,仅将flatMap改为flatMapLatest
    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    ---- example output ----
    next(1a)
    next(2a)
    next(2b)
    completed
    复制代码

    由于订阅关系的改变,如今只有当最后建立的那个Observable正常结束时,seq才会收到completed事件。

    在这种状况下,flatMapLatest会获得与flatMap相同的输出:

    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable<String>.create({ (observer) -> Disposable in
                observer.onNext("\(n)a")
                observer.onNext("\(n)b")
    
                return Disposables.create { }
            })
        }
        .subscribe { (event) in
            print(event)
        }
    复制代码

    这是由于在上面的这个例子中所建立的Observable是同步建立元素的,没法被打断。

    相似的方法还有flatMapFirst,使用方法能够类比flatMapLatest

  • reduce和scanreduce的做用与Sequence中定义的同样,它接收一个初始值和一个闭包,在Observable中的每一个值上调用该闭包,并将每一步的结果做为下一次调用的输入:

    Observable.of(1, 2, 3).reduce(0) { (first, num) -> Float in
            return Float(first + num)
        }
        .subscribe { (event) in
            print(event)
        }
    // 输出:next(6.0), completed
    复制代码

    在上面的代码中,提供了一个初始值0,在闭包中计算和,并将结果序列的元素类型改为Float,序列的观察者最后接收到全部元素的和。

    scan的做用相似于reduce,它跟reduce之间惟一的区别在于,scan会发送每一次调用闭包后的结果:

    Observable.of(1, 2, 3).scan(0) { (first, num) -> Float in
            return Float(first + num)
        }
        .subscribe { (event) in
            print(event)
        }
    // 输出:next(1.0), next(3.0), next(6.0), completed
    复制代码

组合

  • startWith:在序列的开头加入一个指定的元素

    Observable.of(2, 3).startWith(1).subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed
    复制代码

    订阅该序列以后,会当即收到startWith指定的事件,即便此时序列并无开始发送事件。

  • merge:当你有多个类型相同的Observable,可使用merge方法将它们合并起来,同时订阅全部Observable中的事件:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq = Observable.of(seq1, seq2).merge()
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    completed
    复制代码

    只有当Observable中的元素也是Observable类型的时候才可使用merge方法,当其中一个序列发生错误的时候,seq都会被终止,一样的只有全部序列都完成以后,seq才会收到完成事件。

  • zipzip方法也能够将多个Observable合并在一块儿,与merge不一样的是,zip提供了一个闭包用来对多个Observable中的元素进行组合变化,最后得到一个新的序列:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq: Observable<String> = Observable.zip(seq1, seq2) { (num1, num2) -> String in
        return "\(num1 + num2)"
    }
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(3)
    completed
    复制代码

    zip方法按照参数个数的不一样有多个版本,最多支持合并8个可观察序列,须要注意的一点是,闭包所接收的参数是各个序列中对应位置的元素。也就是说,若是seq1发送了一个事件,而seq2发送了多个事件,闭包也只会被执行一次,seq中只有一个元素。

    组合的Observable中任意一个发生错误,最后的seq都会直接出错终止,当全部的Observable都发出completed事件后,seq才会正常结束。

  • combineLatestcombineLatest一样用于将多个序列组合成一个,使用方法与zip同样,可是它的调用机制跟zip不一样,每当其中一个序列有新元素时,combineLatest都会从其余全部序列中取出最后一个元素,传入闭包中生成新的元素添加到结果序列中。

Subject

Subject对象至关于一种中间的代理和桥梁的做用,它既是观察者又是可观察序列,在向一个Subject对象添加观察者以后,能够经过该Subject向其发送事件。Subject对象并不会主动发送completed事件,而且在发送了error或completed事件以后,Subject中的序列会直接终结,没法再发送新的消息。Subject一样也分为几种类型:

  • PublishSubjectPublishSubject的订阅者只会收到在其订阅(subscribe)以后发送的事件

    let subject = PublishSubject<Int>()
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(2)
    复制代码

    能够看到,观察者只收到了事件2,在订阅以前发送的事件1并无接收到。

  • ReplaySubjectReplaySubject在初始化时指定一个大小为n的缓冲区,里面会保存最近发送的n条事件,在订阅以后,观察者会当即收到缓冲区中的事件:

    let subject = ReplaySubject<Int>.create(bufferSize: 2)
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(1)
    next(2)
    复制代码
  • BehaviorSubjectBehaviorSubject在初始化时须要提供一个默认值,在订阅时观察者会马上收到序列上一次发送的事件,若是没有发送过事件则会收到默认值:

    let subject = BehaviorSubject(value: 1)
    subject.subscribe { (event) in
        print(event)
    }
    
    ---- example output ----
    next(1)
    复制代码
  • VariableVariable是对BehaviorSubject的一个封装,行为上与BehaviorSubject相似。Variable没有on之类的方法来发送事件,取而代之的是一个value属性,向value赋值能够向观察者发送next事件,而且访问value能够获取最后一次发送的数据:

    let variable = Variable(1)
    variable.asObservable().subscribe { (event) in
        print(event)
    }
    variable.value = 2
    
    ---- example output ----
    next(1)
    next(2)
    completed
    复制代码

    与其余Subject类型不一样的是,Variable在释放的时候会发送completed事件,而且Variable对象永远不会发送error事件。

Scheduler

Scheduler是RxSwift中进行多线程编程的一种方式,一个Observable在执行的时候会指定一个Scheduler,这个Scheduler决定了在哪一个线程对序列进行操做以及事件回调。默认状况下,在订阅Observable以后,观察者会在与调用subscribe方法时相同的线程收到通知,而且也会在该线程进行销毁(dispose)。

与GCD相似,Scheduler分为串行(serial)和并行(concurrent)两种类型,RxSwift中定义了几种Schedular:

  • CurrentThreadScheduler:这是默认的Scheduler,表明了当前的线程,serial类型。
  • MainScheduler:表示主线程,serial类型
  • SerialDispatchQueueScheduler:提供了一些快捷的方法来建立串行Scheduler,内部封装了DispatchQueue
  • ConcurrentDispatchQueueScheduler:提供了快捷的方法来建立并行Scheduler,一样封装了DispatchQueue

subscribeOn和observeOn

subscribeOnobserveOn是其中两个最重要的方法,它们能够改变Observable所在的Scheduler:

// main thread
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
let seq = Observable.of(1, 2)
seq.subscribeOn(scheduler)
    .map {
        return $0 * 2 // 子线程
    }
    .subscribe { (event) in
        print(event) // 子线程
    }
复制代码

在上面的代码中建立了一个并发的Scheduler,并在序列seq上调用subscribeOn指定了该Scheduler,能够看到,咱们在主线程中订阅该序列,可是map方法以及事件的回调都是在建立的子线程中执行。

subscribeOnobserveOn均可以指定序列的Scheduler,它们之间的区别在于:

  • subscribeOn设定了整个序列开始的时候所在的Scheduler,序列在建立以及以后的操做都会在这个Scheduler上进行,subscribeOn在整个链式调用中只能调用一次,以后再次调用subscribeOn没有任何效果。
  • observeOn指定一个Scheduler,在这以后的操做都会被派发到这个Scheduler上执行,observeOn能够在链式操做的中间改变Scheduler
createObservable().
	.doSomething()
	.subscribeOn(scheduler1) // (1)
	.doSomethingElse()
	.observeOn(scheduler2) // (2)
	.doAnother()
	...
复制代码

如上代码,在(1)处执行了subscribeOn以后,以前的操做createObservable()和doSomething()都会在scheduler1中执行,随后的doSomethingElse()一样也在scheduler1中执行,随后用observeOn指定了另一个scheduler2,以后的doAnother()会在scheduler2上执行。

为原有代码添加Rx扩展

RxSwift中提供了一种扩展机制,能够很方便的为原有的代码添加上Rx扩展。首先来看一个结构体Reactive

public struct Reactive<Base> {
    /// base是扩展的对象实例
    public let base: Base
	
    public init(_ base: Base) {
        self.base = base
    }
}
复制代码

Reactive是一个泛型结构体,只定义了一个属性base,而且在初始化结构体的时候传入该属性的值。

此外还定义了一个协议ReactiveCompatible

public protocol ReactiveCompatible {
    associatedtype CompatibleType

    static var rx: Reactive<CompatibleType>.Type { get set }
    var rx: Reactive<CompatibleType> { get set }
}
复制代码

该协议中分别为类对象和实例对象定义一个名字相同的属性:rx,类型为上面定义的Reactive,随后经过协议扩展为其提供了get的默认的实现:

extension ReactiveCompatible {
    public static var rx: Reactive<Self>.Type {
        get {
            return Reactive<Self>.self
        }
        set {
            // this enables using Reactive to "mutate" base type
        }
    }

    public var rx: Reactive<Self> {
        get {
            return Reactive(self)
        }
        set {
            // this enables using Reactive to "mutate" base object
        }
    }
}
复制代码

关联类型CompatibleType被自动推导为实现该协议的类自己,使用self初始化一个Reactive对象。

最后经过协议扩展为全部的NSObject类型实现了ReactiveCompatible协议:

extension NSObject: ReactiveCompatible { }
复制代码

这样一来,代码中全部继承自NSObject的类型实例中都会有一个类型为Reactive的属性rx,当咱们要为本身的类型添加Rx扩展时,只须要经过扩展向Reactive中添加方法就能够了,例如向UIButton类型添加扩展:

extension Reactive where Base: UIButton { // 为Reactive<UIButton>添加扩展
    public var tap: ControlEvent<Void> {
        return controlEvent(.touchUpInside) // 经过base能够访问该实例自己
    }
}
复制代码

因为Reactive是一个泛型类型,咱们能够经过where语句指定泛型的类型,这样一来,咱们就能够在UIButton实例的rx中访问tap属性了:

let button = UIButton(...)
button.rx.tap
复制代码

相似RxCocoa这样的RxSwift扩展库都是经过这种方式进行Rx扩展的。

相关文章
相关标签/搜索