RxSwift (二)序列核心逻辑分析swift
RxSwift (三)Observable的建立,订阅,销毁api
RxSwift(五)(Rxswift对比swift,oc用法)markdown
RxSwift (十) 基础使用篇 1- 序列,订阅,销毁app
RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOCide
RxSwift和RxCocoa还有一个额外的工具来辅助处理ARC和内存管理:即DisposeBag。这是Observer对象的一个虚拟”包”,当它们的父对象被释放时,这个虚拟包会被丢弃。 当带有DisposeBag属性的对象调用deinit()时,虚拟包将被清空,且每个一次性(disposable)Observer会自动取消订阅它所观察的内容。这容许ARC像一般同样回收内存。 若是没有DisposeBag,会有两种结果:或者Observer会产生一个retain cycle,被无限期的绑定到被观察对象上;或者意外地被释放,致使程序崩溃。 因此要成为一个ARC的良民,记得设置Observable对象时,将它们添加到DisposeBag中。这样,它们才能被很好地清理掉。函数
当一个Observable(被观察者)被观察订阅后,就会产生一个Disposable实例,经过这个实例,咱们就能进行资源的释放了。 对于RxSwift中资源的释放,也就是解除绑定、释放空间,有两种方法,分别是显式释放以及隐式释放:
let dispose = textField.rx_text .bindTo(label.rx_sayHelloObserver) dispose.dispose() 复制代码
DisposeBag
来进行,它相似于Objective-C ARC中的自动释放池机制,当咱们建立了某个实例后,会被添加到所在线程的自动释放池中,而自动释放池会在一个RunLoop周期后进行池子的释放与重建;DisposeBag对于RxSwift就像自动释放池同样,咱们把资源添加到DisposeBag中,让资源随着DisposeBag一块儿释放。以下实例:
let disposeBag = DisposeBag() func binding() { textField.rx_text .bindTo(label.rx_sayHelloObserver) .addDisposableTo(self.disposeBag) } 复制代码
上面代码中方法addDisposableTo会对DisposeBag进行弱引用,因此这个DisposeBag要被实例引用着,通常可做为实例的成员变量,当实例被销毁了,成员DisposeBag会跟着销毁,从而使得RxSwift在此实例上绑定的资源获得释放。
从上面的讲解咱们大体明白了DisposeBag就像咱们咱们OC内存管理里的自动释放池。他充当了一个垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就会在合适的时候帮咱们释放资源,那么它是怎么作到的呢?
public final class DisposeBag: DisposeBase { private var _lock = SpinLock() // state fileprivate var _disposables = [Disposable]() fileprivate var _isDisposed = false /// Constructs new empty dispose bag. public override init() { super.init() } /// Adds `disposable` to be disposed when dispose bag is being deinited. /// /// - parameter disposable: Disposable to add. public func insert(_ disposable: Disposable) { self._insert(disposable)?.dispose() } private func _insert(_ disposable: Disposable) -> Disposable? { //这里为了为了防止多线程下出现抢占资源问题,须要加锁控制同步访问 self._lock.lock(); defer { self._lock.unlock() } if self._isDisposed {//判断若是调用过了_dispose()说明已经被释放过了,不须要再释放,保证对称性,则直接返回 return disposable } //保存到数组中 self._disposables.append(disposable) return nil } /// This is internal on purpose, take a look at `CompositeDisposable` instead. private func dispose() { // 1.获取到全部保存的销毁者 let oldDisposables = self._dispose() // 2.遍历每一个销毁者,掉用每个销毁者的dispose()释放资源 for disposable in oldDisposables { disposable.dispose() } } private func _dispose() -> [Disposable] { self._lock.lock(); defer { self._lock.unlock() } // 获取到全部保存的销毁者 let disposables = self._disposables self._disposables.removeAll(keepingCapacity: false) self._isDisposed = true //这个变量用来记录是否垃圾袋数组被清空过 return disposables } deinit { //当DisposeBag自身对象被销毁时,调用本身的dispose(),遍历销毁数组中全部保存的销毁者, self.dispose() } } 复制代码
dispose()
方法是,DisposeBag
调用insert()
方法将咱们的须要销毁的序列保存起来存放在_disposables
数组中。_disposables
数组,依次调用他们本身的dispose()方法。func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 { this.lock() let oldValue = this.value this.value |= mask this.unlock() return oldValue } 复制代码
源码很简单,可是做用不小。代码中this
是传入的AtomicInt值,其内部仅有一个value值。 fetchOr 先将 this.value copy一份,做为结果返回。而将 this.value 和 mask 作或 (|) 运算。而且将或运算的结果赋值给 this.value。
this.value | mask | oldValue | 或 运算后this.value | 返回值 |
---|---|---|---|---|
0 | 1 | 0 | 1 | 0 |
1 | 1 | 1 | 1 | 1 |
0 | 2 | 0 | 2 | 0 |
1 | 2 | 1 | 3 | 1 |
就是作了一次或运算,实际的10进制结果不变,只是改变了里面的二进制位,能够用来作标志位,只是C语言里面常常用的方法,即一个Int类型处理自己的值可使用外,还能够经过按位与,或,来改变它的标志位,达到传递值的目的,这样每一个位均可以取代一个bool类型,常常用做枚举。
运算符 | 二进制 | 十进制 | 说明 |
---|---|---|---|
0000 0001 | 1 | ||
0000 0010 | 2 | ||
或运算 | 0000 0011 | 3 |
要知道这个答案,咱们只能经过源码来一步步分析:
实例1:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") return Disposables.create { print("销毁释放了")} // dispose.dispose() } // 序列订阅 let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") } print("执行完毕") //dispose.dispose() } 复制代码
dispose.dispose()
这行代码去掉注释,而后从新运行,输出结果以下:
dispose.dispose()
就能够了呢?实例2:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") observer.onCompleted() return Disposables.create { print("销毁释放了")} // dispose.dispose() } // 序列订阅 let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") } print("执行完毕") //dispose.dispose() } 复制代码
上面的实例2 的代码相对与实例1 就多了一行代码:observer.onCompleted()
: 咱们再来看一下输出结果:
observer.onCompleted()
代码后,就也打印了销毁回调,销毁释放了,这是什么逻辑呢? why?
再分析Dispose源码前,咱们必须先深刻理解序列的建立,订阅流程这个是基础,只有理解了这个,才能真正理解Dispose的原理。 这个其实在以前的博客已经分析过了,详情能够参考我以前的博客:序列核心逻辑
为了便于更好的理解,我在这里还再一次理一下具体的流程:
let ob = Observable<Any>.create { (observer) -> Disposable in 这里面是一个闭包咱们称为闭包A }
时,实际会来到Create.swift文件的第20行:public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> { return AnonymousObservable(subscribe) } 复制代码
AnonymousObservable(subscribe)
对象,并将咱们的闭包A传入到了AnonymousObservable的构造函数里面,AnonymousObservable将这个闭包A保存到了let _subscribeHandler: SubscribeHandler
这个变量中存起来了。_subscribeHandler
这个变量保存了序列ob建立时 传入的闭包A (其中闭包A要求传入AnyObserver
类型做为参数)final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable //这个变量保存了序列ob建立时 传入的闭包A (其中闭包A要求传入AnyObserver类型做为参数) let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler //这个变量保存了序列ob建立时 传入的闭包A } ...下面代码先不看省略掉 } 复制代码
let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)" }
这行代码进行序列ob的订阅操做,这行代码,咱们跟进源码能够查看到:在ObservableType+Extensions.swift文件的第39行:public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... 此处代码先不分析省略 let observer = AnonymousObserver<Element> { 这个里面是一个尾随闭包的内容这里先不分析 } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } 复制代码
(4)从上面subscribe()
的源码能够看到,在函数中建立了一个AnonymousObserver的对象,而后直接就return Disposables.create()结束了。
(5)这里咱们并无发现订阅和咱们的闭包A有任何关系,那关键就在self.asObservable().subscribe(observer)
这行代码里面了,我来分析一下这行代码到底作了些什么。
(6)咱们要理解(5)中的这行代码,就须要先理解一下类的集成关系: AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
详情以下图:
(7)经过继承关系,咱们能够顺着继承链往上找父类,咱们能够找到是在Observable
类中定义了这个asObservable()
方法:
public class Observable<Element> : ObservableType { ...此处省略不关注的代码 public func asObservable() -> Observable<Element> { return self } ...此处省略不关注的代码 } 复制代码
self.asObservable().subscribe(observer)
这行代码的self就是咱们建立的序列ob, 因此self.asObservable()
返回的就是ob咱们最开始建立的可观察序列。self.asObservable().subscribe(observer)
中的observer就是咱们在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
方法实现中建立的局部变量:let observer = AnonymousObserver<Element> { 这个里面是一个尾随闭包的内容这里先不分析 }
咱们将这个局部变量传入了Observable的subscribe()
方法。subscribe()
方法作了些什么了。let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
的时候,实际调用了 ObservableType
协议的subscribe()方法,在这个方法里面咱们建立了一个AnonymousObserver
对象,并经过self.asObservable().subscribe(observer)
传入了ob.susbscribe(observer) (注意:这里的ob就是咱们create()建立的AnonymousObservable对象,而observer就是subscribe时建立临时局部AnonymousObserver对象,这些上面已经分析过了)。override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() //下面这行代码是重点,调用了本身的run()方法,并传入了两个参数: //参数1:observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象 //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。 let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } 复制代码
Producer
实现的subscribe()
接口里面,调用了本身的run()
方法,并在run()
方法里面传入了observer:就是咱们self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象。那接下来我看一下run()作了一些什么事情:func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { rxAbstractMethod() } 复制代码
rxAbstractMethod()
,而这个rxAbstractMethod()只是一个抽象方法。那咱们的子类AnonymousObservable中确定覆写了run()方法。接下来咱们再看一下AnonymousObservable
的run()
的源码:override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { //建立了一个管子AnonymousObservableSink,并传给了管子两个参数: //参数1:observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象 //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。 let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } 复制代码
run()
源码中咱们能够看到:在AnonymousObservable
的run()
方法中。首先,建立了一个AnonymousObservableSink
对象sink
,并将observer
(也就是咱们self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象)传入;其次,调用了sink.run(self)
方法返回了subscription,而后直接饭后一个元组,也就是run()方法返回了一个元组:(sink: sink, subscription: subscription)
。 可是咱们的重点是在sink管子上面。AnonymousObservableSink
是一个相似于manager角色,它保存了序列,订阅者,销毁者三个信息,还具有调度能力。咱们序列和订阅者就是通这个管子来作桥梁,实现通信。AnonymousObservableSink
管子作了一些什么呢?咱们来看一下AnonymousObservableSink
的源码:final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType { typealias Element = Observer.Element //这里给了一个别名:Parent就是AnonymousObservable序列 typealias Parent = AnonymousObservable<Element> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif override init(observer: Observer, cancel: Cancelable) { //传入了observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象 super.init(observer: observer, cancel: cancel) } func on(_ event: Event<Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: if load(self._isStopped) == 1 {//若是已经执行过.error, .completed,就不会继续执行self.forwardOn(event)代码,意思就是只有对象生命周期内执行过.complete,.error事件,就不会再执行forwardOn,除非从新激活改变条件值。 return } self.forwardOn(event) case .error, .completed: //fetchOr()这个方法上面已经讲解过,做用就是控制确保只会执行一次 if fetchOr(self._isStopped, 1) == 0 {//若是从没有执行过就执行一次,不然不执行。以确保下面代码在对象生命周期内,不管on()调用多少次,都只会执行一次。 self.forwardOn(event) self.dispose() } } } //这是一个很重要的方法, func run(_ parent: Parent) -> Disposable { //这里传入parent就是AnonymousObservable序列,也就是咱们最开始create()序列ob,_subscribeHandler就是咱们建立序列时传入的闭包A(闭包A就相对一个函数,要求传入一个参数,这个参数就是AnyObserver(self)) return parent._subscribeHandler(AnyObserver(self)) } } 复制代码
self.asObservable().subscribe(observer)
传入的AnonymousObserver
对象。self.forwardOn(event)
方法。每次若是onNext事件都会调用一次forwardOn()
。可是.error, .completed事件最多只会调用一次forwardOn()
。AnonymousObserver
对象经过咱们AnonymousObservableSink对象sink,也就是AnyObserver(self)
中的self
包装成一个AnyObserver结构体以后,做为参数传入闭包A,这样就将咱们的序列和订阅者创建了联系。AnonymousObserver
实际上不是,传入闭包A的时一个AnyObserver结构体AnyObserver(self)
做为参数传给了闭包A,当咱们在闭包A里面调用这行代码时:observer.onNext("kongyulu")
时,因为通过ob.subscribe()订阅以后,AnyObserver(self)
就是咱们的observer.了,而此时的observer是一个结构体,它拥有了咱们的管子AnonymousObservableSink
对象的on()方法。observer.onNext("kongyulu")
序列消息时,实际上会经过咱们的管子AnonymousObservableSink.on()
来调度,最终调度咱们订阅时的闭包:onNext()闭包B:let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
。AnonymousObservableSink.on()
时如何从observer.onNext("kongyulu")
调度到咱们闭包B?**AnyObserver(self)
作了什么:先看一下AnyObsevrer的源码public struct AnyObserver<Element> : ObserverType { /// Anonymous event handler type. public typealias EventHandler = (Event<Element>) -> Void //这里定义了别名EventHandler就是一个传入事件的闭包 private let observer: EventHandler /// Construct an instance whose `on(event)` calls `eventHandler(event)` /// /// - parameter eventHandler: Event handler that observes sequences events. public init(eventHandler: @escaping EventHandler) { //self.observer保存了AnonymousObservableSink对象 self.observer = eventHandler } /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. //初始化时要求传入一个ObserverType,而这个是(17)点分析中AnyObserver(self)代码中的self,实际上就是AnonymousObservableSink对象, public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element { //这段代码直接保存了AnonymousObservableSink.on()方法 //self.observer实际就是一个on()方法 self.observer = observer.on } /// Send `event` to this observer. /// /// - parameter event: Event instance. public func on(_ event: Event<Element>) { //这里调用on方法实际就是调用AnonymousObservableSink.on(event)方法 return self.observer(event) } /// Erases type of observer and returns canonical observer. /// /// - returns: type erased observer. public func asObserver() -> AnyObserver<Element> { return self } } 复制代码
(19)经过上面AnyObserver源码分析,咱们得知AnyObserver
初始化时保存了咱们管子AnonymousObservableSink
的on()
方法,而且本身有一个on方法,在他本身的on方法里面再去调用AnonymousObservableSink.on()方法。这样就只是包装了一层不让外界知道咱们的AnonymousObservableSink
类,为啥这样设计呢?这样设计有几点好处:
AnonymousObservableSink
类,他们不关心咱们AnonymousObservableSink
类时如何实现的,使用者只须要用这个接口on()就好了,至于on()是如何实现的,经过谁实现并不须要关心。AnyObserver
并无拥有咱们AnonymousObservableSink
对象,它只是拥有了AnonymousObservableSink
的on()接口,只须要AnonymousObservableSink
实现这个on()接口该作的事情就能够了。至于AnonymousObservableSink
内部怎么改(只要on()接口不改)的都不会影响到AnyObserver
。(20)如今咱们的重点就在on()方法上面:
observer.onNext("kongyulu")
这行代码时,实际就会调用:AnyObserver.onNext()
方法。(因为咱们AnyObserver继承了ObserverType协议,也就拥有了ObserverType
的onNext()
方法,此处若是不清楚能够往上回看类继承关系)(21)AnyObserver.onNext()
调用的时候会调用本身的on()
方法:
ObserverType的接口定义
extension ObserverType { public func onNext(_ element: Element) { self.on(.next(element))//这里会调回到AnyObserver的on()方法,AnyObserver继承ObserverType,重写了on()接口 } public func onCompleted() { self.on(.completed) } public func onError(_ error: Swift.Error) { self.on(.error(error)) } } 复制代码
AnyObserver.on()
方法会调用 AnonymousObservableSink.on()
方法。AnonymousObservableSink.on(event)
会调用 AnonymousObservableSink.forwardOn(event)
forwardOn()
方法,咱们找到它的父类Sink里面实现了forwardOn()
源码以下:class Sink<Observer: ObserverType> : Disposable { fileprivate let _observer: Observer fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: Observer, cancel: Cancelable) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif //初始化保存了self._observer实际就是:咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象 self._observer = observer self._cancel = cancel } final func forwardOn(_ event: Event<Observer.Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) { return } // 这里实际调用了`AnonymousObserver.on()`方法。 self._observer.on(event) } ... 这次代码省略,不须要关注 } 复制代码
Sink.forwardOn()
实际调用了AnonymousObserver.on()
,说白了就是:咱们最开始实例1的observer.onNext("kongyulu")
这行代码执行时,ob.onNext() 先调用AnyObserver.on()
,AnyObserver.on()
又会调用AnonymousObservableSink.on()
,AnonymousObservableSink.on()
又会调用AnonymousObservableSink.forwardOn()
,接着AnonymousObservableSink.forwardOn()
又会调用AnonymousObservableSink父类的Sink.forwardOn()
,最后由Sink.forwardOn()
调用了AnonymousObserver.on()
。AnonymousObserver.on()
方法定义:final class AnonymousObserver<Element>: ObserverBase<Element> { //次处给尾随闭包取了一个别名 typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler init(_ eventHandler: @escaping EventHandler) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif //这里保存了一个传入的尾随闭包:这个尾随闭包就是咱们ob.subscribe()时建立let observer = AnonymousObserver<Element> { event in这里是个尾随闭包B} 这里传入_eventHandler保存的就是尾随闭包B self._eventHandler = eventHandler } override func onCore(_ event: Event<Element>) { //这里回调了咱们的尾随闭包B return self._eventHandler(event) } #if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif } 复制代码
class ObserverBase<Element> : Disposable, ObserverType { private let _isStopped = AtomicInt(0) func on(_ event: Event<Element>) { switch event { case .next: if load(self._isStopped) == 0 { self.onCore(event)//这里实际调用的是子类的onCore() } case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } func onCore(_ event: Event<Element>) { rxAbstractMethod() } func dispose() { fetchOr(self._isStopped, 1) } } 复制代码
ObserverBase.on()
最终调用了AnonymousObserver.onCore()
,而在AnonymousObserver.onCore()
里回调了_eventHandler(event)闭包B,而闭包B就是咱们最初ob.subscribe()序列订阅时建立AnonymousObserver的尾随闭包,这样这个尾随闭包最终调用了咱们订阅的onNext()方法。这样就解释了:实例1中,执行observer.onNext("kongyulu")
这行代码就会回调let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
从而打印了 “订阅到了:kongyulu”具体AnonymousObserver {B}的尾随闭包B的代码以下:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ...这次无关代码先省略 //特别注意:AnonymousObserver<Element> { B}括号里面的尾随闭包咱们称为B,最终会经过AnonymousObserver.onCore()函数调用闭包B let observer = AnonymousObserver<Element> { event in ...这次无关代码先省略 switch event { case .next(let value): onNext?(value) //这里调用onNext(value)实际就是 case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } 复制代码
闭包A
)建立时将闭包A保存在AnonymousObservable
里变量_subscribeHandler
。闭包B
)订阅序列时,会首先建立一个AnonymousObserver
对象,而且会带一个尾随闭包C
。而后经过self.asObservable().subscribe(AnonymousObserver)
通过一系列转化将AnyObserver
传递给了闭包A
。self.asObservable().subscribe(AnonymousObserver)
实际就是ob.subscribe(AnonymousObserver)
ob.subscribe(AnonymousObserver)
实际就是Producer.subscribe(AnonymousObserver)
Producer.subscribe(AnonymousObserver)
会调用self.run(AnonymousObserver)
self.run(AnonymousObserver)
会建立一个AnonymousObservableSink
管子对象sink,而后调用sink.run(AnonymousObservable)
调用了管子的run()方法,并将ob传入了管子sink.
而咱们管子的sink.run(AnonymousObservable)
方法里面调用了parent._subscribeHandler(AnyObserver(self))
实际就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))
也就是调用了闭包A
而咱们闭包A
须要传入一个参数就是AnyObserver(AnonymousObservableSink)
,实际上AnyObserver
只是一个结构体,它保存了AnonymousObservableSink.on()
方法。
当咱们在闭包A
里面调用observer.onNext("kongyulu")
实际上就是AnyObserver.onNext("kongyulu")
,而AnyObserver.onNext("kongyulu")
会调用AnyObserver.on()
AnyObserver.on()
接着又调用AnonymousObservableSink.on(event)
这里event里面
AnonymousObservableSink类中AnonymousObservableSink.on(event)
接着又会去调用它本身的forwardOn(event)
也就是AnonymousObservableSink.forwardOn(event)
AnonymousObservableSink.forwardOn(event)
其实是调用它父类Sink.forwardOn(event)
而在Sink父类初始化的时候已经保存了AnonymousObserver
对象_observer。
Sink.forwardOn(event)
会调用的是AnonymousObserver.on(event)
AnonymousObserver.on(event)
实际会调用本身父类的ObserverBase.on(event)
ObserverBase.on(event)
实际又会调用子类的AnonymousObserver.onCore(event)
AnonymousObserver.onCore(event)
会调用self._eventHandler(event)
而这里_eventHandler就是保存AnonymousObserver建立时传入的尾随闭包C
这样就回调了闭包C
闭包C
中又根据event的事件不一样,回调了闭包B
,例如如event=.onNext事件,就会回调闭包B
onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") }
里面的这段代码:onNext: { (anything) in print("订阅到了:\(anything)")
从而就会打印:“订阅到了:kongyulu”
(28)
(29)
最后这里经过一个流程图来表达整个建立,订阅过程
上面讲解了序列的建立,订阅流程,在分析建立序列,订阅序列的源码时,咱们已经隐隐约约的看到了咱们开篇分析的dispose(),貌似在整个源码中各处都有着dispose的代码,那么序列究竟是怎么销毁的呢?
为了解决这个疑问,咱们下面将经过分析源码,来探索一下序列的销毁流程。
这里先看一张序列生命周期时序图:
方式一: 经过发送事件,让序列生命周期自动结束来释放序列资源。 一个序列只要发出了 error 或者 completed 事件,它的生命周期将结束,那么全部内部资源都会被释放,不须要咱们手动释放。(这个结论在本篇博客讨论实例1和实例2的时候已经验证了,只要发送了completed和error事件,就会调用onComplete并打印“销毁了”信息)
方式二: 经过主动调用dispose()来释放。例如你须要提早释放序列资源或取消订阅的话,那么你能够对返回的可被清除的资源(Disposable) 调用 dispose 方法。
方式三: 经过垃圾袋DisposeBag来回收资源,达到自动释放,这是官方推荐的方式。官方推荐使用清除包(DisposeBag)来管理订阅的生命周期,通常是把资源加入到一个全局的DisposeBag里面,它跟随着页面的生命周期,当页面销毁时DisposeBag也会随之销毁,同时DisposeBag里面的资源也会被一一释放。(这个结论在上面的DisposeBag分析中也证明了)
咱们先来回顾一下本篇博客开始分析的实例1的代码 实例1:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") return Disposables.create { print("销毁释放了")} // dispose.dispose() } // 序列订阅 let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") } print("执行完毕") //dispose.dispose() } 复制代码
dispose.dispose()
这行代码去掉注释,而后从新运行,输出结果以下:
Observable<Any>.create()
方法有一个尾随闭包,须要返回一个实现了Disposable
协议的实例。而就是经过return Disposables.create { print("销毁释放了")}
这行代码返回的。由此咱们确认Disposables.create { print("销毁释放了")}
很是重要,咱们先来发分析一下Disposables.create
源码。public struct Disposables { private init() {} } 复制代码
看这个结构体连初始化方法都是私有的,说明它不能被继承,因而咱们推测Disposables.create()
必定经过扩展的方式实现的。因此咱们在项目中搜索extension Disposables {
关键字,能够找到以下:
extension Disposables { /// Constructs a new disposable with the given action used for disposal. /// /// - parameter dispose: Disposal action which will be run upon calling `dispose`. public static func create(with dispose: @escaping () -> Void) -> Cancelable { return AnonymousDisposable(disposeAction: dispose)//这里dispose就是咱们传入的尾随闭包 } } 复制代码
经过上面源码,咱们看到直接一行return AnonymousDisposable(disposeAction: dispose)
就结束了,而dispose
就是咱们实例1中 Disposables.create { print("销毁释放了")} // dispose.dispose() }
这行代码里面的尾随闭包: { print("销毁释放了")}
这里咱们给他一个别名成为:闭包D
AnonymousDisposable
类实现一探究竟:fileprivate final class AnonymousDisposable : DisposeBase, Cancelable { public typealias DisposeAction = () -> Void private let _isDisposed = AtomicInt(0) private var _disposeAction: DisposeAction? /// - returns: Was resource disposed. public var isDisposed: Bool { return isFlagSet(self._isDisposed, 1) } fileprivate init(_ disposeAction: @escaping DisposeAction) { self._disposeAction = disposeAction super.init() } // Non-deprecated version of the constructor, used by `Disposables.create(with:)` fileprivate init(disposeAction: @escaping DisposeAction) { self._disposeAction = disposeAction super.init() } /// Calls the disposal action if and only if the current instance hasn't been disposed yet. /// /// After invoking disposal action, disposal action will be dereferenced. fileprivate func dispose() { if fetchOr(self._isDisposed, 1) == 0 { if let action = self._disposeAction { self._disposeAction = nil action() } } } } 复制代码
{ print("销毁释放了")}
dispose()
方法,经过fetchOr(self._isDisposed, 1) == 0
这行代码控制dispose()
里面的内容只会被执行一次。(不管dispose()
方法被执行多少次,if let action = self._disposeAction { self._disposeAction = nil action() }
这段代码最多会被执行一次)dispose()
方法中先把self._disposeAction
赋值给临时变量action
,而后置空self._disposeAction
,再执行action()
。这样操做的缘由是若是_disposeAction
闭包是一个耗时操做,也可以保证_disposeAction
可以当即释放。在AnonymousDisposable
里面咱们只看到了一些常规的保存等操做,结合咱们最开始分析序列的建立流程经验(AnonymousDisposable
就相似于AnonymousObservable
),咱们能够推断核心代码实现确定在订阅这一块。
接下来,咱们进入到observable.subscribe()
方法来探究一些subscribe()
的源码实现。
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { //1.这里定义disposable局部变量 let disposable: Disposable //2.建立了Disposables对象 if let disposed = onDisposed { disposable = Disposables.create(with: disposed) } else { disposable = Disposables.create() } //3.建立了一个AnonymousObserver对象,有一个重要的尾随闭包 let observer = AnonymousObserver<Element> { event in switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() //这里当收到error事件就会回收释放资源 case .completed: onCompleted?() disposable.dispose() //这里当收到completed事件就会回收释放资源 } } return Disposables.create( self.asObservable().subscribe(observer), disposable//这里将咱们建立的局部变量传给了self.asObservable().subscribe,也就是咱们的Producer.subscribe ) } 复制代码
分析上面subscribe()
源码,结合开始的分析,咱们能够得出如下结论:
subscribe()
建立了一个Disposable
对象,并保存了销毁回调闭包,当执行销毁时,会把消息回调出去。disposable.dispose()
释放资源。return Disposables.create( self.asObservable().subscribe(observer), disposable )
,这里返回的Disposable
对象就是咱们外面手动调用dispose.dispose()
方法的dispose
对象,或者说是加入到全局的DisposeBag
的销毁者。return Disposables.create( self.asObservable().subscribe(observer), disposable )
时关键点,咱们接下进入:Disposables.create()
源码:public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable { return BinaryDisposable(disposable1, disposable2)//返回一个二元销毁者对象。 } 复制代码
上面代码咱们看到create()直接返回了一个BinaryDisposable
二元销毁者类对象,并将disposable1
,disposable2
传入给了BinaryDisposable
。
disposable1
就是self.asObservable().subscribe(observer)
也就是Producer..subscribe(observer)
返回的disposerdisposable2
就是咱们subscribe()中建立局部变量let disposable: Disposable
BinaryDisposable
类究竟是什么:private final class BinaryDisposable : DisposeBase, Cancelable { private let _isDisposed = AtomicInt(0) // state private var _disposable1: Disposable? private var _disposable2: Disposable? /// - returns: Was resource disposed. var isDisposed: Bool { return isFlagSet(self._isDisposed, 1) } init(_ disposable1: Disposable, _ disposable2: Disposable) { self._disposable1 = disposable1 self._disposable2 = disposable2 super.init() } func dispose() { if fetchOr(self._isDisposed, 1) == 0 { self._disposable1?.dispose() self._disposable2?.dispose() self._disposable1 = nil self._disposable2 = nil } } } 复制代码