Rxswift (六)销毁者Dispose源码分析

Rxswift(一)函数响应式编程思想编程

RxSwift (二)序列核心逻辑分析swift

RxSwift (三)Observable的建立,订阅,销毁api

RxSwift(四)高阶函数数组

RxSwift(五)(Rxswift对比swift,oc用法)markdown

Rxswift (六)销毁者Dispose源码分析多线程

RxSwift(七)Rxswift对比swift用法闭包

RxSwift (十) 基础使用篇 1- 序列,订阅,销毁app

RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOCide

Rxswift销毁者Dispose简介

  1. 先经过一张思惟导图初步了解一下销毁者Dispose它拥有什么,作了一些什么事情:
    image
  2. 本编文章主要是围绕上面这张图来展开,重点分析Dispose()是怎么销毁序列的。
  3. 从上图咱们能够看出销毁者后的第一个根节点是dispose和disposeBag.那他们分别是什么呢?答案将在下面讲解。

Rxswift销毁者类和重要函数介绍

1. DisposeBag

1.1 DisposeBag是什么

RxSwift和RxCocoa还有一个额外的工具来辅助处理ARC和内存管理:即DisposeBag。这是Observer对象的一个虚拟”包”,当它们的父对象被释放时,这个虚拟包会被丢弃。 当带有DisposeBag属性的对象调用deinit()时,虚拟包将被清空,且每个一次性(disposable)Observer会自动取消订阅它所观察的内容。这容许ARC像一般同样回收内存。 若是没有DisposeBag,会有两种结果:或者Observer会产生一个retain cycle,被无限期的绑定到被观察对象上;或者意外地被释放,致使程序崩溃。 因此要成为一个ARC的良民,记得设置Observable对象时,将它们添加到DisposeBag中。这样,它们才能被很好地清理掉。函数

当一个Observable(被观察者)被观察订阅后,就会产生一个Disposable实例,经过这个实例,咱们就能进行资源的释放了。 对于RxSwift中资源的释放,也就是解除绑定、释放空间,有两种方法,分别是显式释放以及隐式释放:

  • 显式释放 可让咱们在代码中直接调用释放方法进行资源的释放 以下面的实例:
let dispose = textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
dispose.dispose()
复制代码
  • 隐式释放 隐式释放则经过DisposeBag来进行,它相似于Objective-C ARC中的自动释放池机制,当咱们建立了某个实例后,会被添加到所在线程的自动释放池中,而自动释放池会在一个RunLoop周期后进行池子的释放与重建;DisposeBag对于RxSwift就像自动释放池同样,咱们把资源添加到DisposeBag中,让资源随着DisposeBag一块儿释放。

以下实例:

let disposeBag = DisposeBag()
func binding() {
       textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
           .addDisposableTo(self.disposeBag)
}
复制代码

上面代码中方法addDisposableTo会对DisposeBag进行弱引用,因此这个DisposeBag要被实例引用着,通常可做为实例的成员变量,当实例被销毁了,成员DisposeBag会跟着销毁,从而使得RxSwift在此实例上绑定的资源获得释放。

从上面的讲解咱们大体明白了DisposeBag就像咱们咱们OC内存管理里的自动释放池。他充当了一个垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就会在合适的时候帮咱们释放资源,那么它是怎么作到的呢?

1.2 DisposeBag的实现源码分析

1.2.1. 先看一下类图:

image

1.2.2. 具体分析源码流程

  1. 当咱们调用disposed()方法的时候,会调用Dispose类的insert()方法,将销毁者dispose加入的_disposables数组中。 具体源码以下:
public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock()
    
    // state
    fileprivate var _disposables = [Disposable]()
    fileprivate var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }

    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        self._insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        //这里为了为了防止多线程下出现抢占资源问题,须要加锁控制同步访问
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {//判断若是调用过了_dispose()说明已经被释放过了,不须要再释放,保证对称性,则直接返回
            return disposable
        }
        //保存到数组中
        self._disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        // 1.获取到全部保存的销毁者
        let oldDisposables = self._dispose()

        // 2.遍历每一个销毁者,掉用每个销毁者的dispose()释放资源
        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        self._lock.lock(); defer { self._lock.unlock() }

        // 获取到全部保存的销毁者
        let disposables = self._disposables
        
        self._disposables.removeAll(keepingCapacity: false)
        self._isDisposed = true //这个变量用来记录是否垃圾袋数组被清空过
        
        return disposables
    }
    
    deinit {
        //当DisposeBag自身对象被销毁时,调用本身的dispose(),遍历销毁数组中全部保存的销毁者,
        self.dispose()
    }
}
复制代码
  1. 上面的源码流程经过一个图来标识
    image
  2. 总结一下上面的DisposeBag处理流程:
  • 当咱们调用序列的dispose()方法是,DisposeBag调用insert()方法将咱们的须要销毁的序列保存起来存放在_disposables数组中。
  • 当咱们的DisposeBag销毁时,如定义的局部变量出了做用域后,就会被销毁,首先会调用咱们的deinit()方法 如上图4,在deinit()里面会执行本身的dispose()方法,而后变量以前保存的全部须要释放的_disposables数组,依次调用他们本身的dispose()方法。

2. fetchOr()函数

  1. fetchOr 函数的做用相似于标记,先来看一下fetchOr()函数的源码:
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}
复制代码

源码很简单,可是做用不小。代码中this 是传入的AtomicInt值,其内部仅有一个value值。 fetchOr 先将 this.value copy一份,做为结果返回。而将 this.value 和 mask 作或 (|) 运算。而且将或运算的结果赋值给 this.value。

  1. 咱们经过一个表来理解这个函数的执行结果:
this.value mask oldValue 或 运算后this.value 返回值
0 1 0 1 0
1 1 1 1 1
0 2 0 2 0
1 2 1 3 1

就是作了一次或运算,实际的10进制结果不变,只是改变了里面的二进制位,能够用来作标志位,只是C语言里面常常用的方法,即一个Int类型处理自己的值可使用外,还能够经过按位与,或,来改变它的标志位,达到传递值的目的,这样每一个位均可以取代一个bool类型,常常用做枚举。

运算符 二进制 十进制 说明
0000 0001 1
0000 0010 2
或运算 0000 0011 3
  1. 经过上面的分析,我得知 fetchOr ()函数的做用就是,能够确保每段代码只被执行一次,就至关于一个标志位,若是初始值为0 ,若是传入参数1,假设这段代码重复执行5次,只有第一次会从0变为1,后面四次调用都是为1,不会发送变化。

Dispose核心逻辑

Dispose 实例代码分析

  • 学过Rxswift的童鞋都知道dispose()调用后,会向咱们oc里面的引用计数器同样,释放咱们的资源。释放的时候咱们还能够监听到被销毁的事件回调。那么有没有思考过Dispose是如何作到的呢?

要知道这个答案,咱们只能经过源码来一步步分析:

  • 首先,咱们来看一段实例代码:

实例1:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }
复制代码
  1. 上面的代码执行结果以下:
    image
  2. 经过上面的结果咱们知道,这个建立的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为何呢?这个问题咱们后面再经过分析源码Rx源码就知道了。
  3. 如今咱们把上面代码的那行注释放开dispose.dispose() 这行代码去掉注释,而后从新运行,输出结果以下:
    image
  4. 经过上面的代码咱们看到了,建立的序列销毁了,销毁回调也执行了。那为何加上了dispose.dispose() 就能够了呢?
  5. 此外咱们再来修改一下咱们的代码:

实例2:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onCompleted()
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }
复制代码

上面的实例2 的代码相对与实例1 就多了一行代码:observer.onCompleted() : 咱们再来看一下输出结果:

image
这里咱们能够看到咱们多加了一行 observer.onCompleted()代码后,就也打印了销毁回调,销毁释放了,这是什么逻辑呢? why?

  • 下面就让咱们带着三个问题去探索一下Rxswift底层是如何实现的

Dispose 流程源码解析

再分析Dispose源码前,咱们必须先深刻理解序列的建立,订阅流程这个是基础,只有理解了这个,才能真正理解Dispose的原理。 这个其实在以前的博客已经分析过了,详情能够参考我以前的博客:序列核心逻辑

为了便于更好的理解,我在这里还再一次理一下具体的流程:

1. 序列建立,订阅流程

  • (1) 当咱们执行代码let ob = Observable<Any>.create { (observer) -> Disposable in 这里面是一个闭包咱们称为闭包A } 时,实际会来到Create.swift文件的第20行:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
复制代码
  • (2) create()函数返回一个AnonymousObservable(subscribe)对象,并将咱们的闭包A传入到了AnonymousObservable的构造函数里面,AnonymousObservable将这个闭包A保存到了let _subscribeHandler: SubscribeHandler这个变量中存起来了。_subscribeHandler这个变量保存了序列ob建立时 传入的闭包A (其中闭包A要求传入AnyObserver类型做为参数)
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    //这个变量保存了序列ob建立时 传入的闭包A (其中闭包A要求传入AnyObserver类型做为参数)
    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler //这个变量保存了序列ob建立时 传入的闭包A
    }
    ...下面代码先不看省略掉
}
复制代码
  • (3)咱们调用let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)" } 这行代码进行序列ob的订阅操做,这行代码,咱们跟进源码能够查看到:在ObservableType+Extensions.swift文件的第39行:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            ... 此处代码先不分析省略
            
            let observer = AnonymousObserver<Element> { 
             这个里面是一个尾随闭包的内容这里先不分析
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
复制代码
  • (4)从上面subscribe()的源码能够看到,在函数中建立了一个AnonymousObserver的对象,而后直接就return Disposables.create()结束了。

  • (5)这里咱们并无发现订阅和咱们的闭包A有任何关系,那关键就在self.asObservable().subscribe(observer)这行代码里面了,我来分析一下这行代码到底作了些什么。

  • (6)咱们要理解(5)中的这行代码,就须要先理解一下类的集成关系: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType 详情以下图:

    image

  • (7)经过继承关系,咱们能够顺着继承链往上找父类,咱们能够找到是在Observable类中定义了这个asObservable()方法:

public class Observable<Element> : ObservableType {
    ...此处省略不关注的代码
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
    ...此处省略不关注的代码
}

复制代码
  • (8)经过源码分析,我得知 asObservable()就是返回self ,而(3)的代码调用是的self.asObservable().subscribe(observer) 这行代码的self就是咱们建立的序列ob, 因此self.asObservable()返回的就是ob咱们最开始建立的可观察序列。self.asObservable().subscribe(observer) 中的observer就是咱们在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) 方法实现中建立的局部变量:let observer = AnonymousObserver<Element> { 这个里面是一个尾随闭包的内容这里先不分析 } 咱们将这个局部变量传入了Observable的subscribe()方法。
  • (9)接着咱们就要分享Observable的subscribe()方法作了些什么了。
  • (10)当咱们调用实例2中的这行代码:let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") } 的时候,实际调用了 ObservableType协议的subscribe()方法,在这个方法里面咱们建立了一个AnonymousObserver对象,并经过self.asObservable().subscribe(observer) 传入了ob.susbscribe(observer) (注意:这里的ob就是咱们create()建立的AnonymousObservable对象,而observer就是subscribe时建立临时局部AnonymousObserver对象,这些上面已经分析过了)。
  • (11)然而经过上面的类图,咱们能够看到在ob(AnonymousObservable)类中并无一个subscribe()的方法,那么咱们只能先找它的父类Producer.
  • (12)经过前面的类图分析,能够知道Producer继承Observerable可观察序列,遵循了ObservableType协议(这个协议定义一个subscribe()接口),因此咱们Producer中一定会实现这个接口。我来看一下源码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            //下面这行代码是重点,调用了本身的run()方法,并传入了两个参数:
            //参数1:observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
            //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
复制代码
  • (13)经过上面源码分析,咱们得知Producer实现的subscribe()接口里面,调用了本身的run()方法,并在run()方法里面传入了observer:就是咱们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象。那接下来我看一下run()作了一些什么事情:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
复制代码
  • (14)从上面Producer中的run()方法,咱们能够知道在这个方法并无作任何事情,就一行rxAbstractMethod(),而这个rxAbstractMethod()只是一个抽象方法。那咱们的子类AnonymousObservable中确定覆写了run()方法。接下来咱们再看一下AnonymousObservablerun()的源码:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        //建立了一个管子AnonymousObservableSink,并传给了管子两个参数:
         //参数1:observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
            //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
复制代码
  • (15)在上面的run()源码中咱们能够看到:在AnonymousObservablerun()方法中。首先,建立了一个AnonymousObservableSink对象sink,并将observer(也就是咱们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象)传入;其次,调用了sink.run(self) 方法返回了subscription,而后直接饭后一个元组,也就是run()方法返回了一个元组:(sink: sink, subscription: subscription)。 可是咱们的重点是在sink管子上面。AnonymousObservableSink是一个相似于manager角色,它保存了序列,订阅者,销毁者三个信息,还具有调度能力。咱们序列和订阅者就是通这个管子来作桥梁,实现通信。
  • (16)接下来咱们分析AnonymousObservableSink管子作了一些什么呢?咱们来看一下AnonymousObservableSink的源码:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    //这里给了一个别名:Parent就是AnonymousObservable序列
    typealias Parent = AnonymousObservable<Element>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
    //传入了observer:就是咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
        super.init(observer: observer, cancel: cancel)
    }

func on(_ event: Event<Element>) {
    #if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
        if load(self._isStopped) == 1 {//若是已经执行过.error, .completed,就不会继续执行self.forwardOn(event)代码,意思就是只有对象生命周期内执行过.complete,.error事件,就不会再执行forwardOn,除非从新激活改变条件值。
            return
        }
        self.forwardOn(event)
    case .error, .completed:
    //fetchOr()这个方法上面已经讲解过,做用就是控制确保只会执行一次
        if fetchOr(self._isStopped, 1) == 0 {//若是从没有执行过就执行一次,不然不执行。以确保下面代码在对象生命周期内,不管on()调用多少次,都只会执行一次。
            self.forwardOn(event)
            self.dispose()
        }
    }
}

//这是一个很重要的方法,
    func run(_ parent: Parent) -> Disposable {
    //这里传入parent就是AnonymousObservable序列,也就是咱们最开始create()序列ob,_subscribeHandler就是咱们建立序列时传入的闭包A(闭包A就相对一个函数,要求传入一个参数,这个参数就是AnyObserver(self))
        return parent._subscribeHandler(AnyObserver(self))
    }
}
复制代码
  • (17)经过上面AnonymousObservableSink的源码,咱们得知有一下几点结论:
    • AnonymousObservableSink.init初始化时传入了observer:就是咱们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象。
    • AnonymousObservableSink有一个on()方法,这个方法根据传入的参数event作了不一样的处理,但都会至少调用一次self.forwardOn(event)方法。每次若是onNext事件都会调用一次forwardOn()。可是.error, .completed事件最多只会调用一次forwardOn()
    • AnonymousObservableSink的run()方法是核心方法,是它会回调咱们最开始create()建立时传递的闭包A,并将咱们调用ob.subscribe()订阅时,函数内部建立的AnonymousObserver对象经过咱们AnonymousObservableSink对象sink,也就是AnyObserver(self)中的self包装成一个AnyObserver结构体以后,做为参数传入闭包A,这样就将咱们的序列和订阅者创建了联系。
    • **特别注意:**不少人认为传入咱们闭包A 的就是AnonymousObserver 实际上不是,传入闭包A的时一个AnyObserver结构体
    • 经过AnonymousObservableSink的run()方法咱们成功把咱们最开始的ob.subscibe()订阅时建立的闭包经过AnyObserver(self)做为参数传给了闭包A,当咱们在闭包A里面调用这行代码时:observer.onNext("kongyulu")时,因为通过ob.subscribe()订阅以后,AnyObserver(self)就是咱们的observer.了,而此时的observer是一个结构体,它拥有了咱们的管子AnonymousObservableSink对象的on()方法。
    • 在实例1中:当咱们发送observer.onNext("kongyulu")序列消息时,实际上会经过咱们的管子AnonymousObservableSink.on()来调度,最终调度咱们订阅时的闭包:onNext()闭包Blet dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
    • 那么如今最大的疑问就是**:AnonymousObservableSink.on()时如何从observer.onNext("kongyulu")调度到咱们闭包B?**
  • (18)要分析上面这个问题,咱们须要先来分析一下结构体AnyObserver(self)作了什么:先看一下AnyObsevrer的源码
public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    
    public typealias EventHandler = (Event<Element>) -> Void
    //这里定义了别名EventHandler就是一个传入事件的闭包
    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    
    public init(eventHandler: @escaping EventHandler) {
    //self.observer保存了AnonymousObservableSink对象
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    //初始化时要求传入一个ObserverType,而这个是(17)点分析中AnyObserver(self)代码中的self,实际上就是AnonymousObservableSink对象,
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    //这段代码直接保存了AnonymousObservableSink.on()方法
    //self.observer实际就是一个on()方法
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
    //这里调用on方法实际就是调用AnonymousObservableSink.on(event)方法
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}
复制代码
  • (19)经过上面AnyObserver源码分析,咱们得知AnyObserver初始化时保存了咱们管子AnonymousObservableSinkon()方法,而且本身有一个on方法,在他本身的on方法里面再去调用AnonymousObservableSink.on()方法。这样就只是包装了一层不让外界知道咱们的AnonymousObservableSink类,为啥这样设计呢?这样设计有几点好处:

    • 起到彻底封装效果,外界彻底不须要知道咱们的管子AnonymousObservableSink类,他们不关心咱们AnonymousObservableSink类时如何实现的,使用者只须要用这个接口on()就好了,至于on()是如何实现的,经过谁实现并不须要关心。
    • 起到解耦的效果,AnyObserver 并无拥有咱们AnonymousObservableSink对象,它只是拥有了AnonymousObservableSink的on()接口,只须要AnonymousObservableSink实现这个on()接口该作的事情就能够了。至于AnonymousObservableSink内部怎么改(只要on()接口不改)的都不会影响到AnyObserver
  • (20)如今咱们的重点就在on()方法上面:

    • 当咱们实例1中执行:observer.onNext("kongyulu")这行代码时,实际就会调用:AnyObserver.onNext()方法。(因为咱们AnyObserver继承了ObserverType协议,也就拥有了ObserverTypeonNext()方法,此处若是不清楚能够往上回看类继承关系)
  • (21)AnyObserver.onNext()调用的时候会调用本身的on()方法:

ObserverType的接口定义

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))//这里会调回到AnyObserver的on()方法,AnyObserver继承ObserverType,重写了on()接口
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
复制代码
  • (22) AnyObserver.on() 方法会调用 AnonymousObservableSink.on()方法。
  • (23)AnonymousObservableSink.on(event)会调用 AnonymousObservableSink.forwardOn(event)
  • (24)而在AnonymousObservableSink中没有定义forwardOn()方法,咱们找到它的父类Sink里面实现了forwardOn() 源码以下:
class Sink<Observer: ObserverType> : Disposable {
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//初始化保存了self._observer实际就是:咱们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        // 这里实际调用了`AnonymousObserver.on()`方法。
        self._observer.on(event)
    }

   ... 这次代码省略,不须要关注
}

复制代码
  • (25)从上面的源码咱们能够看到:Sink.forwardOn()实际调用了AnonymousObserver.on(),说白了就是:咱们最开始实例1的observer.onNext("kongyulu")这行代码执行时,ob.onNext() 先调用AnyObserver.on()AnyObserver.on()又会调用AnonymousObservableSink.on()AnonymousObservableSink.on()又会调用AnonymousObservableSink.forwardOn(),接着AnonymousObservableSink.forwardOn()又会调用AnonymousObservableSink父类的Sink.forwardOn(),最后由Sink.forwardOn()调用了AnonymousObserver.on()
  • (26)到这里咱们思路基本清晰了,咱们再回到AnonymousObserver.on()方法定义:
  1. 首先咱们查看类定义以下,并无找到由on()方法:
final class AnonymousObserver<Element>: ObserverBase<Element> {
//次处给尾随闭包取了一个别名
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//这里保存了一个传入的尾随闭包:这个尾随闭包就是咱们ob.subscribe()时建立let observer = AnonymousObserver<Element> { event in这里是个尾随闭包B} 这里传入_eventHandler保存的就是尾随闭包B
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
    //这里回调了咱们的尾随闭包B
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
复制代码
  1. 因而咱们来找它的父类ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)//这里实际调用的是子类的onCore()
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}
复制代码
  1. 咱们经过分析父类源码得知ObserverBase.on()最终调用了AnonymousObserver.onCore(),而在AnonymousObserver.onCore()里回调了_eventHandler(event)闭包B,而闭包B就是咱们最初ob.subscribe()序列订阅时建立AnonymousObserver的尾随闭包,这样这个尾随闭包最终调用了咱们订阅的onNext()方法。这样就解释了:实例1中,执行observer.onNext("kongyulu")这行代码就会回调let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") } 从而打印了 “订阅到了:kongyulu”

具体AnonymousObserver {B}的尾随闭包B的代码以下:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
           
           ...这次无关代码先省略 
           //特别注意:AnonymousObserver<Element> { B}括号里面的尾随闭包咱们称为B,最终会经过AnonymousObserver.onCore()函数调用闭包B
            let observer = AnonymousObserver<Element> { event in
                ...这次无关代码先省略 
                switch event {
                case .next(let value):
                    onNext?(value) //这里调用onNext(value)实际就是
                    
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
复制代码
  • (27)经过(26)点分析咱们应该弄明白了整个订阅的流程了, 简单总结就是
  1. 咱们ob.create(闭包A)建立时将闭包A保存在AnonymousObservable 里变量_subscribeHandler
  2. 当咱们调用ob.subscribe(闭包B)订阅序列时,会首先建立一个AnonymousObserver对象,而且会带一个尾随闭包C。而后经过self.asObservable().subscribe(AnonymousObserver) 通过一系列转化将AnyObserver传递给了闭包A
  3. 其中2中说一系列转化能够简单解释为:
  • self.asObservable().subscribe(AnonymousObserver) 实际就是ob.subscribe(AnonymousObserver)

  • ob.subscribe(AnonymousObserver)实际就是Producer.subscribe(AnonymousObserver)

  • Producer.subscribe(AnonymousObserver)会调用self.run(AnonymousObserver)

  • self.run(AnonymousObserver) 会建立一个AnonymousObservableSink管子对象sink,而后调用sink.run(AnonymousObservable)调用了管子的run()方法,并将ob传入了管子sink.

  • 而咱们管子的sink.run(AnonymousObservable)方法里面调用了parent._subscribeHandler(AnyObserver(self))实际就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))也就是调用了闭包A

  • 而咱们闭包A须要传入一个参数就是AnyObserver(AnonymousObservableSink),实际上AnyObserver只是一个结构体,它保存了AnonymousObservableSink.on()方法。

  • 当咱们在闭包A里面调用observer.onNext("kongyulu")实际上就是AnyObserver.onNext("kongyulu"),而AnyObserver.onNext("kongyulu")会调用AnyObserver.on()

  • AnyObserver.on()接着又调用AnonymousObservableSink.on(event)这里event里面

  • AnonymousObservableSink类中AnonymousObservableSink.on(event) 接着又会去调用它本身的forwardOn(event)也就是AnonymousObservableSink.forwardOn(event)

  • AnonymousObservableSink.forwardOn(event) 其实是调用它父类Sink.forwardOn(event)而在Sink父类初始化的时候已经保存了AnonymousObserver对象_observer。

  • Sink.forwardOn(event)会调用的是AnonymousObserver.on(event)

  • AnonymousObserver.on(event)实际会调用本身父类的ObserverBase.on(event)

  • ObserverBase.on(event) 实际又会调用子类的AnonymousObserver.onCore(event)

  • AnonymousObserver.onCore(event) 会调用self._eventHandler(event)而这里_eventHandler就是保存AnonymousObserver建立时传入的尾随闭包C这样就回调了闭包C

  • 闭包C中又根据event的事件不一样,回调了闭包B,例如如event=.onNext事件,就会回调闭包B onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") } 里面的这段代码:onNext: { (anything) in print("订阅到了:\(anything)") 从而就会打印:“订阅到了:kongyulu”

  • (28)

  • (29)

最后这里经过一个流程图来表达整个建立,订阅过程

2. 序列建立,订阅图解

序列建立流程图:孔雨露(QQ:282889543)

3. 序列订阅流程

3.1 序列销毁方式

上面讲解了序列的建立,订阅流程,在分析建立序列,订阅序列的源码时,咱们已经隐隐约约的看到了咱们开篇分析的dispose(),貌似在整个源码中各处都有着dispose的代码,那么序列究竟是怎么销毁的呢?

为了解决这个疑问,咱们下面将经过分析源码,来探索一下序列的销毁流程。

这里先看一张序列生命周期时序图:

序列生命周期时序图
经过这张时序图,结合上面的序列建立,订阅的流程分析,我能够先得出序列会被销毁的3种方式:

  • 方式一经过发送事件,让序列生命周期自动结束来释放序列资源。 一个序列只要发出了 error 或者 completed 事件,它的生命周期将结束,那么全部内部资源都会被释放,不须要咱们手动释放。(这个结论在本篇博客讨论实例1实例2的时候已经验证了,只要发送了completed和error事件,就会调用onComplete并打印“销毁了”信息)

  • 方式二经过主动调用dispose()来释放。例如你须要提早释放序列资源或取消订阅的话,那么你能够对返回的可被清除的资源(Disposable) 调用 dispose 方法。

  • 方式三经过垃圾袋DisposeBag来回收资源,达到自动释放,这是官方推荐的方式。官方推荐使用清除包(DisposeBag)来管理订阅的生命周期,通常是把资源加入到一个全局的DisposeBag里面,它跟随着页面的生命周期,当页面销毁时DisposeBag也会随之销毁,同时DisposeBag里面的资源也会被一一释放。(这个结论在上面的DisposeBag分析中也证明了)

3.2 序列销毁实例分析

咱们先来回顾一下本篇博客开始分析的实例1的代码 实例1:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }
复制代码
  1. 上面的代码执行结果以下:
    image
  2. 经过上面的结果咱们知道,这个建立的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为何呢?这个问题咱们后面再经过分析源码Rx源码就知道了。
  3. 如今咱们把上面代码的那行注释放开dispose.dispose() 这行代码去掉注释,而后从新运行,输出结果以下:
    image
3.3 序列销毁源码分析
  1. 经过上面实例1的代码,首先能够看到,在建立序列Observable<Any>.create()方法有一个尾随闭包,须要返回一个实现了Disposable协议的实例。而就是经过return Disposables.create { print("销毁释放了")} 这行代码返回的。由此咱们确认Disposables.create { print("销毁释放了")}很是重要,咱们先来发分析一下Disposables.create源码。
  2. 进入到Disposables.create()源码:咱们想直接点击进去发现Disposables就是一个空结构体
public struct Disposables {
    private init() {}
}
复制代码

看这个结构体连初始化方法都是私有的,说明它不能被继承,因而咱们推测Disposables.create()必定经过扩展的方式实现的。因此咱们在项目中搜索extension Disposables {关键字,能够找到以下:

image
这样咱们找到第一个:AnonymousDisposable.swift文件进入找到第55行:

extension Disposables {

    /// Constructs a new disposable with the given action used for disposal.
    ///
    /// - parameter dispose: Disposal action which will be run upon calling `dispose`.
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)//这里dispose就是咱们传入的尾随闭包
    }

}
复制代码

经过上面源码,咱们看到直接一行return AnonymousDisposable(disposeAction: dispose)就结束了,而dispose 就是咱们实例1中 Disposables.create { print("销毁释放了")} // dispose.dispose() } 这行代码里面的尾随闭包: { print("销毁释放了")} 这里咱们给他一个别名成为:闭包D

  1. 不用思考,接下来咱们确定要进入AnonymousDisposable类实现一探究竟:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    fileprivate init(_ disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    // Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
复制代码
  1. 分析上面AnonymousDisposable类定义源码,咱们能够得出如下结论:
  • 初始化的时候把外界传过来的闭包进行保存,传入进来的闭包咱们就是咱们第2点中分析的闭包D{ print("销毁释放了")}
  • 有一个dispose()方法,经过fetchOr(self._isDisposed, 1) == 0这行代码控制dispose()里面的内容只会被执行一次。(不管dispose()方法被执行多少次,if let action = self._disposeAction { self._disposeAction = nil action() } 这段代码最多会被执行一次)
  • dispose()方法中先把self._disposeAction赋值给临时变量action,而后置空self._disposeAction,再执行action()。这样操做的缘由是若是_disposeAction闭包是一个耗时操做,也可以保证_disposeAction可以当即释放。
  1. AnonymousDisposable里面咱们只看到了一些常规的保存等操做,结合咱们最开始分析序列的建立流程经验(AnonymousDisposable就相似于AnonymousObservable),咱们能够推断核心代码实现确定在订阅这一块。

  2. 接下来,咱们进入到observable.subscribe()方法来探究一些subscribe()的源码实现。

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
    //1.这里定义disposable局部变量
    let disposable: Disposable
    //2.建立了Disposables对象
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    }
    else {
        disposable = Disposables.create()
    }
    //3.建立了一个AnonymousObserver对象,有一个重要的尾随闭包
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
            else {
                Hooks.defaultErrorHandler(callStack, error)
            }
            disposable.dispose() //这里当收到error事件就会回收释放资源
        case .completed:
            onCompleted?()
            disposable.dispose() //这里当收到completed事件就会回收释放资源
        }
    }
    
    return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable//这里将咱们建立的局部变量传给了self.asObservable().subscribe,也就是咱们的Producer.subscribe
    )
}
复制代码

分析上面subscribe()源码,结合开始的分析,咱们能够得出如下结论:

  • subscribe()建立了一个Disposable对象,并保存了销毁回调闭包,当执行销毁时,会把消息回调出去。
  • 在收到错误或者完成事件时会执行disposable.dispose()释放资源。
  • return Disposables.create( self.asObservable().subscribe(observer), disposable ),这里返回的Disposable对象就是咱们外面手动调用dispose.dispose()方法的dispose对象,或者说是加入到全局的DisposeBag的销毁者。
  1. 由6的分析,咱们清楚知道最后一行代码return Disposables.create( self.asObservable().subscribe(observer), disposable )时关键点,咱们接下进入:Disposables.create()源码:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
    return BinaryDisposable(disposable1, disposable2)//返回一个二元销毁者对象。
}
复制代码

上面代码咱们看到create()直接返回了一个BinaryDisposable二元销毁者类对象,并将disposable1disposable2传入给了BinaryDisposable

  • 这里的disposable1就是self.asObservable().subscribe(observer) 也就是Producer..subscribe(observer)返回的disposer
  • disposable2就是咱们subscribe()中建立局部变量let disposable: Disposable
  1. 接着咱们来分析BinaryDisposable类究竟是什么:
private final class BinaryDisposable : DisposeBase, Cancelable {

    private let _isDisposed = AtomicInt(0)

    // state
    private var _disposable1: Disposable?
    private var _disposable2: Disposable?

    /// - returns: Was resource disposed.
    var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    init(_ disposable1: Disposable, _ disposable2: Disposable) {
        self._disposable1 = disposable1
        self._disposable2 = disposable2
        super.init()
    }

    func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }
}
复制代码
相关文章
相关标签/搜索