PublishSubject将对观察者发送订阅后产生的元素,而在订阅前发送的元素将不会发送给观察者。 git
![]()
示例:swift
let subject = PublishSubject<String>()
subject.onNext("🐘")
subject.subscribe(onNext: { print("订阅到了: \($0)") })
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
复制代码
打印结果:
订阅到了: 🐶
订阅到了: 🐱
复制代码
源码分析: 数组
PublishSubject
的继承关系,能够看出,
PublishSubject
既是
可监听序列,也是
观察者。
(1) 在第一次执行 onNext
函数 subject.onNext("🐘")
时 闭包
PublishSubject.on
函数,并继续调用
dispatch
函数和
self._synchronized_on(event)
函数。
self._synchronized_on(event)
函数最终会返回 self._observers
。self._observers
的类型是 Bag<(Event<Element>) -> Void>
。
dispatch
函数的源码
bag
中保存的 代码块进行执行回调。由于上一步传入的 bag
仅是一个初始化的 bag
中间没有保存任何代码块,因此不会执行任何回调。到这里,在第一次执行 subject.onNext("🐘")
已经结束。没有任何打印结果。ide
(2) 调用 subscribe
。函数
PublishSubject
源码中,实现了 subscribe
。
let key = self._observers.insert(observer.on)
insert
函数源码。
_dictionary
值为nil,且 _pairs.count
小于 arrayDictionaryMaxSize
。因此,观察者的 on
函数 observer.on
被加入到 _pairs
数组中。(3)第二次执行 onNext
函数 subject.onNext("🐶")
。开始的步骤和 (1)中一致,只是在最后执行 dispatch
函数时,由于 bag._pairs
有保存一个观察者的 on
函数代码块,因此会执行回调。源码分析
最终会回调 subscribe.onNext
闭包,打印结果。(ps: 这一步不清楚的小伙伴请阅读RxSwift核心逻辑简介)post
当观察者对BehaviorSubject进行订阅时,它会将源
Observable
中最新的元素发送出来。若是不存在最新的元素,就发送默认元素。而后将随后产生的元素发送出来。 ui![]()
示例:spa
let subject = BehaviorSubject<Int>(value: 100)
subject.subscribe(onNext: { print("订阅1:\($0)") })
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(5)
subject.subscribe(onNext: { print("订阅2:\($0)") })
.disposed(by: disposeBag)
复制代码
打印结果:
订阅1:100
订阅1:3
订阅1:5
订阅2:5
复制代码
源码分析: BehaviorSubject
和 PublishSubject
源码很类似,可是也有差别之处,咱们重点分析其中的差别,类似之处请参照上面的 PublishSubject
源码分析。
(1)初始化方法
public init(value: Element) {
self._element = value
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
复制代码
BehaviorSubject
的初始化方法须要传入一个初始值,做为默认元素。
(2) subscribe
函数
BehaviorSubject
在
_synchronized_subscribe
函数中比
PublishSubject
多一行代码,执行了一次
observer.on
函数,并将
self._element
做为参数传递。
self._element
中保存的是最新发送的元素,若是没有最新元素,则为
init
初始化时的默认元素。
(3)onNext
函数
BehaviorSubject
在调用
onNext
时,会将最新的元素保存在
self._element
中,在执行执行
subscribe
时,发送出去。
不管观察者是什么时候进行订阅的,ReplaySubject都将对观察者发送所有的元素。
![]()
示例:
let subject = ReplaySubject<Int>.create(bufferSize: 2)
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe(onNext: { print("订阅到:\($0)") })
.disposed(by: disposeBag)
subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
复制代码
打印结果:
订阅到:2
订阅到:3
订阅到:4
订阅到:5
订阅到:6
复制代码
源码分析: (1)初始化方法 create
函数
bufferSize
为 2,因此建立一个
ReplayMany
对象。
queue
来保存须要发送的元素。
(2)onNext
函数 onNext
函数调用的是 ReplayBufferBase
的 on
函数。
addValueToBuffer
函数将发送的元素加入到 queue
中。trim
函数删除 queue
中大于 bufferSize
的多余的元素。(3)subscribe
函数
observer
加入到
self._observers
以前,先调用了
self.replayBuffer(anyObserver)
。会执行
ReplayManyBase.replayBuffer
函数。
override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
for item in self._queue {
observer.on(.next(item))
}
}
复制代码
将保存在队列 queue
中的全部元素,发送给订阅者。
AsyncSubject 将在源
Observable
产生完成时间以后,发出最后一个元素(有且仅有最后一个元素)。若是源Observable
没有发出任何元素,只有一个完成事件,则AsyncSubject也只有一个完成事件。若是源Observable
产生了一个error
事件而停止,那么 AsyncSubject 就不会发出任何元素,而是将error
事件发送出来。![]()
示例:
let subject = AsyncSubject<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
复制代码
打印结果:
订阅到:next(4)
订阅到:completed
复制代码
源码分析:重点分析on
函数 最后会来到下面👇的代码,获取保存的 observer.on
闭包,再执行事件回调。
next
事件中,将最新的元素保存在 self._lastElement
中后,并无返回全部的 observer.on
和 发送 next
事件,反而是初始化了一个空的 observer.on
集合和返回 completed
事件。因此不会有观察者响应事件收到信号。error
事件中,会清空全部的 self._observers
,并返回全部的 observer.on
和 error
事件,因此,全部的观察者都只会收到 error
信号。completed
事件中。 会判断是否有发送过来的最新元素:若是有,就将最新元素发送出去,并执行 next
事件。
next
事件以后,会执行 completed
事件。若是没有最新元素,则仅对全部 observer.on
发送 completed
事件。示例:
let subject = Variable.init(1)
subject.value = 10
subject.value = 100
subject.asObservable().subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.value = 1000
复制代码
打印结果:
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
订阅到:next(100)
订阅到:next(1000)
订阅到:completed
复制代码
源码分析:
Variable
虽然没有继承自
ObserverType
或者
Observable
。可是其有一个
_subject: BehaviorSubject<Element>
属性。因此,
Variable
的行为和
BehaviorSubject
是一致的。但由于不是继承自
ObserverType
,因此没有
on
函数,不能直接调用
on
函数发送信号。
BehaviorSubject
,并保存在 self._subject
中。value
作了一层封装,在 value
的 set
函数中,会调用 _subject
的 on
函数。完成信号的发送。官方推荐使用 BehaviorRelay 和 BehaviorSubject 做为替换。
BehaviorRelay 就是 BehaviorSubject 去掉终止事件
onError
和onCompleted
。
示例:
let subject = BehaviorRelay(value: 1)
subject.accept(10)
subject.subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.accept(100)
subject.accept(1000)
复制代码
打印结果:
订阅到:next(10)
订阅到:next(100)
订阅到:next(1000)
复制代码
源码分析:
BehaviorRelay
上方的注释,注释中说得很是清楚,
BehaviorRelay
是对
BehaviorSubject
的封装,可是和
BehaviorSubject
不同的地方在于,
BehaviorRelay
不会被
error
和
completed
事件终止。
既然已经有了 BehaviorSubject,又为什么须要BehaviorRelay 来对其进行封装呢? 通常来讲,若是须要知道 BehaviorSubject 当前的发送的信号值,只能在
subscribe
中获取,可是使用 BehaviorRelay 则能够方便的使用BehaviorRelay.value
获取到当前的信号,很是之方便。
以上就是对常见的 Subject 的一些分析,如有不足之处,请评论指正。