RxSwift之Subject

一、PublishSubject

PublishSubject将对观察者发送订阅后产生的元素,而在订阅前发送的元素将不会发送给观察者。 git

示例:swift

let subject = PublishSubject<String>()

subject.onNext("🐘")

subject.subscribe(onNext: { print("订阅到了: \($0)") })
    .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
复制代码
打印结果:
    订阅到了: 🐶
    订阅到了: 🐱
复制代码

源码分析: 数组

经过 PublishSubject 的继承关系,能够看出, PublishSubject 既是 可监听序列,也是 观察者

(1) 在第一次执行 onNext 函数 subject.onNext("🐘")闭包

会执行 PublishSubject.on 函数,并继续调用 dispatch 函数和 self._synchronized_on(event) 函数。

  • self._synchronized_on(event) 函数最终会返回 self._observers
  • self._observers 的类型是 Bag<(Event<Element>) -> Void>
  • 再查看 dispatch 函数的源码
    经分析能够知道,其做用就是对 bag 中保存的 代码块进行执行回调。

由于上一步传入的 bag 仅是一个初始化的 bag 中间没有保存任何代码块,因此不会执行任何回调。到这里,在第一次执行 subject.onNext("🐘") 已经结束。没有任何打印结果。ide

(2) 调用 subscribe函数

  • PublishSubject 源码中,实现了 subscribe
  • 最终会调用 let key = self._observers.insert(observer.on)
  • 查看 insert 函数源码。
    由于是第首次执行,因此 _dictionary 值为nil,且 _pairs.count 小于 arrayDictionaryMaxSize。因此,观察者的 on函数 observer.on 被加入到 _pairs 数组中。

(3)第二次执行 onNext 函数 subject.onNext("🐶")。开始的步骤和 (1)中一致,只是在最后执行 dispatch 函数时,由于 bag._pairs 有保存一个观察者的 on 函数代码块,因此会执行回调。源码分析

最终会回调 subscribe.onNext 闭包,打印结果。(ps: 这一步不清楚的小伙伴请阅读RxSwift核心逻辑简介post

二、BehaviorSubject

当观察者对BehaviorSubject进行订阅时,它会将源 Observable 中最新的元素发送出来。若是不存在最新的元素,就发送默认元素。而后将随后产生的元素发送出来。 ui

示例:spa

let subject = BehaviorSubject<Int>(value: 100)

subject.subscribe(onNext: { print("订阅1:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(5)

subject.subscribe(onNext: { print("订阅2:\($0)") })
    .disposed(by: disposeBag)
复制代码
打印结果:
    订阅1100
    订阅13
    订阅15
    订阅25
复制代码

源码分析: BehaviorSubjectPublishSubject 源码很类似,可是也有差别之处,咱们重点分析其中的差别,类似之处请参照上面的 PublishSubject 源码分析。

(1)初始化方法

public init(value: Element) {
    self._element = value
    
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
}
复制代码

BehaviorSubject 的初始化方法须要传入一个初始值,做为默认元素。

(2) subscribe 函数

BehaviorSubject_synchronized_subscribe 函数中比 PublishSubject 多一行代码,执行了一次 observer.on 函数,并将 self._element 做为参数传递。 self._element 中保存的是最新发送的元素,若是没有最新元素,则为 init 初始化时的默认元素。

(3)onNext 函数

BehaviorSubject 在调用 onNext 时,会将最新的元素保存在 self._element 中,在执行执行 subscribe 时,发送出去。

三、ReplaySubject

不管观察者是什么时候进行订阅的,ReplaySubject都将对观察者发送所有的元素。

示例:

let subject = ReplaySubject<Int>.create(bufferSize: 2)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

subject.subscribe(onNext: { print("订阅到:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
复制代码
打印结果:
    订阅到:2
    订阅到:3
    订阅到:4
    订阅到:5
    订阅到:6
复制代码

源码分析: (1)初始化方法 create函数

由于示例中传入的 bufferSize 为 2,因此建立一个 ReplayMany 对象。
其做用是建立一个指定大小的队列 queue 来保存须要发送的元素。

(2)onNext 函数 onNext 函数调用的是 ReplayBufferBaseon 函数。

  • 调用 addValueToBuffer 函数将发送的元素加入到 queue 中。
  • 调用 trim 函数删除 queue 中大于 bufferSize 的多余的元素。

(3)subscribe 函数

在将 observer 加入到 self._observers 以前,先调用了 self.replayBuffer(anyObserver)。会执行 ReplayManyBase.replayBuffer 函数。

override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
    for item in self._queue {
        observer.on(.next(item))
    }
}
复制代码

将保存在队列 queue 中的全部元素,发送给订阅者。

四、AsyncSubject

AsyncSubject 将在源 Observable 产生完成时间以后,发出最后一个元素(有且仅有最后一个元素)。若是源 Observable 没有发出任何元素,只有一个完成事件,则AsyncSubject也只有一个完成事件。若是源 Observable 产生了一个 error 事件而停止,那么 AsyncSubject 就不会发出任何元素,而是将 error 事件发送出来。

示例:

let subject = AsyncSubject<Int>()

subject.onNext(1)
subject.onNext(2)

subject.subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
复制代码
打印结果:
    订阅到:next(4)
    订阅到:completed
复制代码

源码分析:重点分析on 函数 最后会来到下面👇的代码,获取保存的 observer.on 闭包,再执行事件回调。

  • next 事件中,将最新的元素保存在 self._lastElement 中后,并无返回全部的 observer.on 和 发送 next 事件,反而是初始化了一个空的 observer.on 集合和返回 completed 事件。因此不会有观察者响应事件收到信号。
  • error 事件中,会清空全部的 self._observers,并返回全部的 observer.onerror 事件,因此,全部的观察者都只会收到 error 信号。
  • completed 事件中。 会判断是否有发送过来的最新元素:若是有,就将最新元素发送出去,并执行 next 事件。
    而且在执行 next 事件以后,会执行 completed 事件。若是没有最新元素,则仅对全部 observer.on 发送 completed 事件。

五、Variable(已弃用)

示例:

let subject = Variable.init(1)

subject.value = 10
subject.value = 100

subject.asObservable().subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.value = 1000
复制代码
打印结果:
    ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
    订阅到:next(100)
    订阅到:next(1000)
    订阅到:completed
复制代码

源码分析:

从源码能够看出, Variable 虽然没有继承自 ObserverType 或者 Observable。可是其有一个 _subject: BehaviorSubject<Element> 属性。因此, Variable 的行为和 BehaviorSubject 是一致的。但由于不是继承自 ObserverType,因此没有 on 函数,不能直接调用 on 函数发送信号。

  • 在初始化时,使用初始化值,初始化 BehaviorSubject,并保存在 self._subject 中。
  • value 作了一层封装,在 valueset 函数中,会调用 _subjecton 函数。完成信号的发送。

官方推荐使用 BehaviorRelayBehaviorSubject 做为替换。

六、BehaviorRelay

BehaviorRelay 就是 BehaviorSubject 去掉终止事件 onErroronCompleted

示例:

let subject = BehaviorRelay(value: 1)

subject.accept(10)

subject.subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.accept(100)

subject.accept(1000)
复制代码
打印结果:
    订阅到:next(10)
    订阅到:next(100)
    订阅到:next(1000)
复制代码

源码分析:

查看源码,请注意 BehaviorRelay 上方的注释,注释中说得很是清楚, BehaviorRelay 是对 BehaviorSubject 的封装,可是和 BehaviorSubject 不同的地方在于, BehaviorRelay 不会被 errorcompleted 事件终止。

既然已经有了 BehaviorSubject,又为什么须要BehaviorRelay 来对其进行封装呢? 通常来讲,若是须要知道 BehaviorSubject 当前的发送的信号值,只能在 subscribe 中获取,可是使用 BehaviorRelay 则能够方便的使用 BehaviorRelay.value 获取到当前的信号,很是之方便。

以上就是对常见的 Subject 的一些分析,如有不足之处,请评论指正。

相关文章
相关标签/搜索