在上一篇RxSwift核心逻辑简介中已经简单分析了RxSwift的核心逻辑。swift
可是其中有一个点,解析得不是很清楚。那就是可观察序列和订阅者之间究竟是如何联系在一块儿的?那么这一篇就详细分析一下连接二者的管道——AnonymousObservableSink。api
以前已经知道了,第二步订阅信号之后,代码会来到这里。 闭包
run
函数中,会初始化
AnonymousObservableSink
,同时传递参数
observer和
cancel,
cancel是垃圾处理器,处理内存回收,不是本篇的重点,先忽略。咱们重点关注
observer。
AnonymousObservableSink
<—> observer先看看observer的来源。其是执行subscribe
函数内部初始化的匿名观察者AnonymousObserver
的实例对象。 ide
这个实例对象在AnonymousObservableSink初始化时做为初始化参数传入。咱们再来看AnonymousObservableSink
类的源码。函数
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
复制代码
类AnonymousObservableSink
在初始化时调用的是父类的初始化方法,咱们再找到其父类的源码。post
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
final func forwarder() -> SinkForward<O> {
return SinkForward(forward: self)
}
final var disposed: Bool {
return isFlagSet(self._disposed, 1)
}
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
复制代码
经过父类的初始化方法能够看出,observer对象最终被保存在Sink
类的_observer
变量中。fetch
所以子类AnonymousObservableSink
就拥有了观察者observer。this
AnonymousObservableSink
<—> 可观察序列AnonymousObservableSink
被初始化后,会执行sink.run(self)
函数,这里的self即为第一步建立可观察序列中,建立的可观察序列。spa
经过run
函数,AnonymousObservableSink
与可观察序列之间就创建起了联系。3d
observer.onNext()
中的observer究竟是什么?咱们知道,在第三步==发送信号==会执行observer.onNext()
函数。这个observer会是咱们建立的可观察序列吗?咱们来分析一下源码。
在AnonymousObservableSink
对象执行run
函数时
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
复制代码
会调用parent的_subscribeHandler
。在上一篇中已经分析了_subscribeHandler
,就是第一步建立可观察序列时传入的闭包。而给闭包返回的参数是一个AnyObserver(self)
。因此,observer.onNext()
中的observer
不是咱们建立的可观察序列,而是AnyObserver
。
查看源码。
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
复制代码
AnyObserver
在初始化时,执行的是self.observer = observer.on
。这是什么意思呢?
前面已经分析了。AnyObserver
init时传入AnyObserver
的是AnonymousObservableSink
的实例对象。那么observer.on
实际上是AnonymousObservableSink.on
,而经过上面的AnonymousObservableSink
源码能够知道,on
实际上是一个函数。因此,AnyObserver
中的observer
是一个函数。
既然observer.onNext()
中的observer
不是可观察序列,而是AnyObserver
对象,那么第三步发送信号究竟是如何执行的呢?
observer.onNext()
分析点击onNext()
查看源码。
ObserverType
的
self.on(.next(element))
函数。可是
ObserverType
是一个协议,只有函数声明,没有实现,因此,最后调用的是
AnyObserver.on(.next(element))
函数。
从上面AnyObserver
的源码能够看出。on
函数,最终调用的实际上是self.observer(event)
,而在上一节的分析中,咱们已经知道。self.observer
其实保存的是AnonymousObservableSink
中的on
函数。
因此最后会回到AnonymousObservableSink
中调用self.forwardOn(event)
。而AnonymousObservableSink
中并无forwardOn
函数。那么老规矩,当前类中没有的方法,那么就去找它的父类Sink
。
在父类Sink
中找到forwardOn()
函数,函数中执行了self._observer.on(event)
。咱们以前已经分析过了,self._observer
其实就是订阅信号时建立的匿名观察者AnonymousObserver
。
因此,最后会调用AnonymousObserver.on
。可是AnonymousObserver
没有on()
函数。因此查找其父类ObserverBase
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped, 1)
}
}
复制代码
分析发现,最后仍然会回到子类AnonymousObserver
而且调用其onCore()
函数。
而onCore()
函数中执行的是self._eventHandler(event)
。在上一篇中已经分析了self._eventHandler
就是订阅信号时传入的闭包。
因此,通过一系列的数据传递,observer.onNext()
,最终会回调到订阅信号时传入的闭包中。以此完成数据的传递和流转。