RxSwift (二)序列核心逻辑分析swift
RxSwift (三)Observable的建立,订阅,销毁设计模式
RxSwift(五)(Rxswift对比swift,oc用法)markdown
RxSwift (十) 基础使用篇 1- 序列,订阅,销毁函数
RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOCoop
上一篇博客:Rxswift学习之(一)函数响应式编程思想只是简单的分析了序列的核心逻辑。本篇博客主要针对上一篇作一下更加深刻的探讨,若是有那些地方分析有误,还请留言:QQ:282889543,让咱们彼此提升,彼此成就。源码分析
总的来讲分析Rxswift的核心逻辑仍是按照三部曲:建立序列,订阅序列,销毁序列。核心思想是万物皆序列。
咱们先来看下建立Observable所涉及的类的继承关系: 以下图:
针对上面的类图,简单分析下类的关系和设计思想: 首先分层实施的很完全,每一层都只解决一件事情,一层层叠起来结构很是清晰: AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
其次咱们简单分解一下每一个类都作了些什么:
Observable
类型协议,方法只有一个asObservable
,这有什么好处呢?asObservable
,它提供抽象方法subscribe,即咱们常说的订阅,只有外部订阅了该对象,才能真正实现对该对象进行观察。Observable
的全部方法,并实现subscribe 方法Producer
的全部方法,而且增长了属性let _subscribeHandler: SubscribeHandler
用来保存建立序列时传入的闭包,也就相对于拥有了调用这个序列的能力,此外它还实现run方法,这也是建立序列最核心关键的方法。在run()方法中它建立一个AnonymousObservableSink
final private类的对象,而这个对象sink能够称之为管子,它相似于manager的角色,拥有序列和订阅,销毁能力。这里有两个疑惑:问题1.
AnonymousObservableSink
为何要定义成final private类,不能被继承,也不能被外部访问? 问题2. 建立的Observable
是如何关联到订阅的?
这两个问题咱们后面再分析。
最后,咱们总结一下设计思想:
事实上用户所使用的
Observable
,都是Producer
的子类和AnonymousObservable
平行的子类,只不过用户不须要关心其具体实现罢了 每个相似AnonymousObservable
的类,还有一个与之相关的类AnonymousObservableSink
,Sink即管道,全部这些组合起来才能让其真正运行起来,AnonymousObservableSink
同时拥有序列,订阅功能,相似于咱们项目中常常用的manager角色。 整个设计向上经过组合协议的方式描述其特性,向下经过子类化的方式屏蔽其实现细节,相似于工厂模式,这样的类也能够叫类簇。
经过上面类继承关系,其实咱们不难理解序列的建立流程,它确实也是只有比较简单的几部,寥寥几行代码就搞定了,难点是上面抛出的几个问题:
下面咱们将经过一个简单的Rxswift的实例来分析一下序列的建立,订阅,销毁直接的流程和关系。
实例1:
//1. 建立序列 let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("kyl发送了信号") obserber.onCompleted() return Disposables.create() } // 2:订阅信号 let _ = ob.subscribe(onNext: { (text) in print("订阅到:\(text)") }, onError: { (error) in print("error: \(error)") }, onCompleted: { print("完成") }) { print("销毁") } 复制代码
上面实例1 的这段代码能够用酷C老师的一个图来清晰的表达:
从上面的代码和关系图,咱们可能会产生这样一个疑惑:
问题3: 建立的ob序列,仅仅只是经过
ob.subscribe()
订阅了一下,为何咱们在ob建立时的尾随闭包(咱们这里给个名字叫闭包A
)里面调用了obserber.onNext("kyl发送了信号")
这个代码,咱们就能够订阅到let _ = ob.subscribe(onNext: { (text) in print("订阅到:\(text)") }
这里会打印:”订阅到:kyl发送了信号“。咱们没有看见他们之间有任何关联,怎么ob发送消息,subcribe()的onNext闭包就能够触发呢,这是为何呢?
咱们能够这里能够简单推理下:ob.subscribe()这个订阅方法确定作了一些事情,在某个地方调用了闭包A
,才能实现这个功能。具体是怎么样实现的呢?下面咱们将经过分析源码来解答这个疑惑。
从上面的代码咱们能够知道,建立序列就一行代码:let ob = Observable<Any>.create { (obserber) ->
而这一行代码实际上是作了好多事情的。
首先咱们经过一个流程图来初略的了解一下序列建立流程:
建立序列的Rxswift原码很简单,从上图能够看出,直接一行代码return AnonymousObservable(subscribe)
就结束了,这里咱们们并无找到咱们须要的答案,甚至咱们有点愈来愈晕感受。
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } } 复制代码
咱们先作个深呼吸,放轻松,此路不通那咱们来尝试分析其余方向,不能在一棵树上吊死。下面咱们来分析一下订阅的流程。
回顾上面实例1中的订阅代码:let _ = ob.subscribe(onNext: { (text) in
这行代码又作了些什么事情呢?下面咱们经过源码来深刻分析一下:
subscribe()
的源码以下:public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... 上面代码不是咱们要分析的重点,...表示忽略了这次的一段源码 /*注意,这次定义了一个AnonymousObserver()对象,以参数的形式, 构造方法里面传入了一个尾随闭包eventHandler, 在这个闭包里面,当收到event的不一样事件, 会触发并调用,咱们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包 */ let observer = AnonymousObserver<E> { event in ... switch event { case .next(let value): onNext?(value) //调用订阅时传入的ob.subscribe(onNext:闭包 case .error(let error): if let onError = onError { onError(error)//调用订阅时传入的ob.subscribe(onError:闭包 } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?()//调用订阅时传入的ob.subscribe(onCompleted:闭包 disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable )/*这里直接返回了Disposables对象,用来释放资源, 在它的构造函数里面直接调用了self.asObservable().subscribe(observer), 而asObservable()就是咱们建立的序列ob,也就是ob.subscribe(), 并传入了,在这段代码里面建立的局部变量let observer = AnonymousObserver<E>,*/ } 复制代码
经过上面源码咱们能够知道:subscribe()这个方法,以参数的形式传入了onNext()闭包,onError()闭包,onComplete()闭包,在函数里面建立了一个AnonymousObserver对象observer,这个对象建立的时候传入了一个闭包,当收到不一样event事件时,会分别调用咱们subscribe()传入的onNext,onError,onComplete这三个闭包。最重要一点是return Disposables.create( self.asObservable().subscribe(observer), disposable )
这句代码调用了咱们真正的subscribe()函数,并以参数的形式传入了AnonymousObserver对象,self.asObservable()就是咱们create()函数建立的序列ob, 而到此处咱们能够清晰的看到,咱们订阅时传入参数闭包和咱们建立的ob创建了一个链条。
这里咱们又有一个疑问:self.asObservable()为何就是咱们create()函数返回的ob呢?
要解答这个问题,我须要回顾一下上面分析的Observable类的继承关系:Observable
-> ObservableType
-> ObservableConvertibleType
即Observable继承ObservableType协议,ObservableType又继承ObservableConvertibleType协议,而咱们的ObservableConvertibleType提供了抽象方法asObservable(),咱们Observable类中实现了asObservable()这个方法,它直接返回self就它本身。
下面经过源码来证明:
/// /// It represents a push style sequence. public class Observable<Element> : ObservableType { ... public func asObservable() -> Observable<E> { return self } ... } 复制代码
分析了Rxswift订阅subscribe()
的源码感受很是nice, 咱们找到了咱们ob 建立时传入的闭包和咱们订阅时的闭包存在了一条链条关系,也就是只要ob发送了消息,那咱们的订阅者必定能够按照这个链条收到消息。可是咱们仍是不知道究竟是怎么调用的,怎么触发的。
并且咱们注意到self.asObservable().subscribe(observer)
也就是AnonymousObservable
调用了subscribe()
方法,可是在AnonymousObservable
类中咱们并无找到subscribe()的定义,因此咱们须要来看他的父类Producer
class Producer<Element> : Observable<Element> { override init() { super.init() } override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() /*重点在这里了,这里调用了run()方法,一切疑惑都清晰了,咱们知道了run()调用时传入了observer,而且建立了sink管子,而这个管子具有了序列的功能,能够调用on()方法。 */ let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { rxAbstractMethod() } } 复制代码
果真不出咱们所料,在Producer
中咱们找到了subscribe()的方法定义,到此咱们能够总结出很清晰的几条线索了:
Producer
实现了ObservableType
协议的subscribe()方法。在这个方法里面调用了self.run(observer, cancel: disposer)
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { //1.建立了一个sink管道对象,并将observer也就create()建立 //序列时传入的闭包传给了sink let sink = AnonymousObservableSink(observer: observer, cancel: cancel) //2. sink调用本身的run()方法,并把AnonymousObservable做为参数传入。 let subscription = sink.run(self) //返回一个元组,包含sink管道信息。 return (sink: sink, subscription: subscription) } 复制代码
(3)AnonymousObservableSink类中run()方法中调用parent._subscribeHandler(AnyObserver(self))
其中parent就是咱们(2)中sink.run(self)
传入的self,也就是AnonymousObservable对象;而且咱们前面已经知道_subscribeHandler就是建立序列时保存的那个经过create()函数参数传入的 闭包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("kyl发送了信号") obserber.onCompleted() return Disposables.create() }
。 如今已经很清晰了parent._subscribeHandler(AnyObserver(self))
执行闭包,这行代码就会调用obserber.onNext("kyl发送了信号")
这个行代码。
如今咱们能够经过一个流程图来总结咱们代码执行的流程:
上面的订阅序列流程分析:咱们弄明白了从订阅序列到调用create()函数时传入的参数闭包调用的逻辑,可是这个闭包发送onNext()信号后,怎么到订阅消息的onNext()闭包咱们还不是很清晰。所以咱们须要分析AnonymousObserver
咱们先来看下AnonymousObserver
类
AnonymousObserver
源码定义以下:final class AnonymousObserver<ElementType> : ObserverBase<ElementType> { typealias Element = ElementType typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler /*构造函数,保存了EventHandler尾随闭包*/ init(_ eventHandler: @escaping EventHandler) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._eventHandler = eventHandler } //覆写了onCore方法,调用了EventHandler闭包 override func onCore(_ event: Event<Element>) { return self._eventHandler(event) } #if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif } 复制代码
从AnonymousObserver
源码中咱们并无找到onNext()方法,那咱们只能沿着它的继承链往上查找,这里须要了解一下类的继承关系:
经过分析类的继承关系,咱们得知:这样一个关系链:
AnonymousObserver对象的on()方法会调用onCore()方法,ObserverType里面有onNext,onError,onComplete方法。可是on()是如何调用的,什么时候调用的呢?
要解决这个疑问,咱们须要再次回到咱们建立序列的代码:
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) } 复制代码
建立序列的create()方法传入了一个subscribe闭包,并返回了AnonymousObservable对象。其中subscribe闭包就是咱们序列建立时参数形式传入 闭包。而且AnonymousObservable初始化时将这个闭包保存起来了self._subscribeHandler = subscribeHandler
AnonymousObservable 有一个run()方法,run方法里面建立了一个AnonymousObservableSink
对象sink,具体源码以下:
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } } 复制代码
分析了这么久,绕了一圈,终于发现关键就在AnonymousObservableSink
管子这个对象里面了。sink这是个神奇的管子。它就保存了序列,也保存了订阅,还保存了用于销毁的disposed 也就是同时拥有了建立序列,订阅序列,销毁序列功能。
咱们来分析下AnonymousObservableSink
的源码:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType { typealias E = O.E //这里的Parent就是咱们上面分析的AnonymousObservable,很是重要 typealias Parent = AnonymousObservable<E> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif // 构造方法,传入了observer序列,和Cancelable override init(observer: O, cancel: Cancelable) { super.init(observer: observer, cancel: cancel) } //这里实现 了ObserverType协议的on()方法 func on(_ event: Event<E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: if load(self._isStopped) == 1 { return } //调用了父类的发布,self.forwardOn()会调用本身的on()方法 self.forwardOn(event) case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.forwardOn(event) self.dispose() } } } func run(_ parent: Parent) -> Disposable { /*调用了_subscribeHandler闭包,这个闭包就是咱们以前建立序列时传入闭包。 parent就是传入进来的序列,这里序列的闭包里传入了self而且强转为AnyObserver 这里将self传给了闭包_subscribeHandler,这样_subscribeHandler也就具有了subcribe的能力。 */ return parent._subscribeHandler(AnyObserver(self)) } } 复制代码
其中Sink类的源码以下:
class Sink<O : ObserverType> : Disposable { fileprivate let _observer: O fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: O, cancel: Cancelable) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._observer = observer self._cancel = cancel } final func forwardOn(_ event: Event<O.E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) { return } //这里调用了传入observer.on()方法, self._observer.on(event) } final func forwarder() -> SinkForward<O> { return SinkForward(forward: self) } final var disposed: Bool { return isFlagSet(self._disposed, 1) } func dispose() { fetchOr(self._disposed, 1) self._cancel.dispose() } deinit { #if TRACE_RESOURCES _ = Resources.decrementTotal() #endif } } 复制代码
从源码分析咱们得知:
咱们的sink保存了咱们的序列,当咱们调用ob.onNext()发送信号时,因为咱们的sink已经持有了ob, 这样sink会调用on()方法,在on()方法里面会调用self.forwardOn(event),而在fowardOn()里面会调用self._observer.on(event)。这样个人疑问就解决了,答案就是sink调用了on()方法。
这里咱们再来总结一下总的流程:
create()
返回了一个ob
, 这个ob就是序列,建立的时候传递了一个闭包A
。在闭包A中调用了ob.onNext()
发送了信号。ob.subscribe()
方法,这个方法会建立一个AnonymousObserver
对象,并调用了self.asObservable().subscribe(observer)
。self.asObservable()
实际就是咱们的ob
, 也就是ob调用了subscribe().而AnonymousObserver中没有找到subscribe()。AnonymousObserver
的父类中找到了subscribe(),发现subscribe()调用了AnonymousObserver的run()
方法。sink.run(self)
,sink是AnonymousObservableSink的对象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))
调用了建立序列时保存的闭包A (parent就是AnonymousObserver),这样就解释了订阅时,回调了A闭包的缘由。public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... 上面代码不是咱们要分析的重点,...表示忽略了这次的一段源码 /*注意,这次定义了一个AnonymousObserver()对象,以参数的形式, 构造方法里面传入了一个尾随闭包eventHandler, 在这个闭包里面,当收到event的不一样事件, 会触发并调用,咱们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包 */ let observer = AnonymousObserver<E> { event in ... switch event { case .next(let value): onNext?(value) //调用订阅时传入的 复制代码
这里调用ob.subscribe()的时候,咱们建立了AnonymousObserver和咱们subscribe()传入的onNext()闭包作了一个绑定,当AnonymousObserver.onNext()
调用的时候一定会回调subscribe()传入的onNext()闭包。而10中的_observer对象指的就是let observer = AnonymousObserver