RxSwift (二)序列核心逻辑分析

Rxswift(一)函数响应式编程思想编程

RxSwift (二)序列核心逻辑分析swift

RxSwift (三)Observable的建立,订阅,销毁设计模式

RxSwift(四)高阶函数api

RxSwift(五)(Rxswift对比swift,oc用法)markdown

Rxswift (六)销毁者Dispose源码分析闭包

RxSwift(七)Rxswift对比swift用法ide

RxSwift (十) 基础使用篇 1- 序列,订阅,销毁函数

RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOCoop

RxSwift序列核心逻辑

上一篇博客:Rxswift学习之(一)函数响应式编程思想只是简单的分析了序列的核心逻辑。本篇博客主要针对上一篇作一下更加深刻的探讨,若是有那些地方分析有误,还请留言:QQ:282889543,让咱们彼此提升,彼此成就。源码分析

总的来讲分析Rxswift的核心逻辑仍是按照三部曲:建立序列,订阅序列,销毁序列。核心思想是万物皆序列。

1. 序列的建立

Observable可观察者序列

咱们先来看下建立Observable所涉及的类的继承关系: 以下图:

针对上面的类图,简单分析下类的关系和设计思想: 首先分层实施的很完全,每一层都只解决一件事情,一层层叠起来结构很是清晰: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

其次咱们简单分解一下每一个类都作了些什么:

  • ObservableConvertibleType:顾名思义便可转换为Observable 类型协议,方法只有一个asObservable,这有什么好处呢?
  1. 用户不须要关注其具体是哪一个类型对象
  2. 让用户更多的关注其核心功能
  • ObservableType:也是个协议,继承了ObservableConvertibleType协议的asObservable,它提供抽象方法subscribe,即咱们常说的订阅,只有外部订阅了该对象,才能真正实现对该对象进行观察。
  • Observable:真正的类,能够称之为元类,对于用户来讲Observable 的功能是完整的,由于它已经具有了全部的用户所须要的功能,尽管有些方法并无获得实现还是抽象方法。 Producer: 它继承了Observable的全部方法,并实现subscribe 方法
  • AnonymousObservable:它继承了Producer的全部方法,而且增长了属性let _subscribeHandler: SubscribeHandler用来保存建立序列时传入的闭包,也就相对于拥有了调用这个序列的能力,此外它还实现run方法,这也是建立序列最核心关键的方法。在run()方法中它建立一个AnonymousObservableSink final private类的对象,而这个对象sink能够称之为管子,它相似于manager的角色,拥有序列和订阅,销毁能力。这里有两个疑惑:

问题1. AnonymousObservableSink为何要定义成final private类,不能被继承,也不能被外部访问? 问题2. 建立的Observable是如何关联到订阅的?

这两个问题咱们后面再分析。

最后,咱们总结一下设计思想:

事实上用户所使用的 Observable ,都是 Producer 的子类和AnonymousObservable平行的子类,只不过用户不须要关心其具体实现罢了 每个相似AnonymousObservable的类,还有一个与之相关的类AnonymousObservableSink,Sink即管道,全部这些组合起来才能让其真正运行起来,AnonymousObservableSink同时拥有序列,订阅功能,相似于咱们项目中常常用的manager角色。 整个设计向上经过组合协议的方式描述其特性,向下经过子类化的方式屏蔽其实现细节,相似于工厂模式,这样的类也能够叫类簇。

序列建立的流程

经过上面类继承关系,其实咱们不难理解序列的建立流程,它确实也是只有比较简单的几部,寥寥几行代码就搞定了,难点是上面抛出的几个问题:

下面咱们将经过一个简单的Rxswift的实例来分析一下序列的建立,订阅,销毁直接的流程和关系。

实例1

//1. 建立序列
   let ob = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("kyl发送了信号")
            obserber.onCompleted()
            return Disposables.create()
        }
    
        // 2:订阅信号
        let _ = ob.subscribe(onNext: { (text) in
            print("订阅到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }
复制代码

上面实例1 的这段代码能够用酷C老师的一个图来清晰的表达:

从上面的代码和关系图,咱们可能会产生这样一个疑惑:

问题3: 建立的ob序列,仅仅只是经过ob.subscribe()订阅了一下,为何咱们在ob建立时的尾随闭包(咱们这里给个名字叫闭包A)里面调用了obserber.onNext("kyl发送了信号")这个代码,咱们就能够订阅到let _ = ob.subscribe(onNext: { (text) in print("订阅到:\(text)") } 这里会打印:”订阅到:kyl发送了信号“。咱们没有看见他们之间有任何关联,怎么ob发送消息,subcribe()的onNext闭包就能够触发呢,这是为何呢?

咱们能够这里能够简单推理下:ob.subscribe()这个订阅方法确定作了一些事情,在某个地方调用了闭包A,才能实现这个功能。具体是怎么样实现的呢?下面咱们将经过分析源码来解答这个疑惑。

从上面的代码咱们能够知道,建立序列就一行代码:let ob = Observable<Any>.create { (obserber) -> 而这一行代码实际上是作了好多事情的。

首先咱们经过一个流程图来初略的了解一下序列建立流程:

建立序列的Rxswift原码很简单,从上图能够看出,直接一行代码return AnonymousObservable(subscribe)就结束了,这里咱们们并无找到咱们须要的答案,甚至咱们有点愈来愈晕感受。

  • AnonymousObservable类源码
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
复制代码

咱们先作个深呼吸,放轻松,此路不通那咱们来尝试分析其余方向,不能在一棵树上吊死。下面咱们来分析一下订阅的流程。

2.订阅

回顾上面实例1中的订阅代码:let _ = ob.subscribe(onNext: { (text) in这行代码又作了些什么事情呢?下面咱们经过源码来深刻分析一下:

  • Rxswift订阅subscribe()的源码以下:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代码不是咱们要分析的重点,...表示忽略了这次的一段源码
            /*注意,这次定义了一个AnonymousObserver()对象,以参数的形式, 构造方法里面传入了一个尾随闭包eventHandler, 在这个闭包里面,当收到event的不一样事件, 会触发并调用,咱们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包 */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //调用订阅时传入的ob.subscribe(onNext:闭包
                case .error(let error):
                    if let onError = onError {
                        onError(error)//调用订阅时传入的ob.subscribe(onError:闭包
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()//调用订阅时传入的ob.subscribe(onCompleted:闭包
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )/*这里直接返回了Disposables对象,用来释放资源, 在它的构造函数里面直接调用了self.asObservable().subscribe(observer), 而asObservable()就是咱们建立的序列ob,也就是ob.subscribe(), 并传入了,在这段代码里面建立的局部变量let observer = AnonymousObserver<E>,*/
    }
复制代码

经过上面源码咱们能够知道:subscribe()这个方法,以参数的形式传入了onNext()闭包,onError()闭包,onComplete()闭包,在函数里面建立了一个AnonymousObserver对象observer,这个对象建立的时候传入了一个闭包,当收到不一样event事件时,会分别调用咱们subscribe()传入的onNext,onError,onComplete这三个闭包。最重要一点是return Disposables.create( self.asObservable().subscribe(observer), disposable )这句代码调用了咱们真正的subscribe()函数,并以参数的形式传入了AnonymousObserver对象,self.asObservable()就是咱们create()函数建立的序列ob, 而到此处咱们能够清晰的看到,咱们订阅时传入参数闭包和咱们建立的ob创建了一个链条。

这里咱们又有一个疑问:self.asObservable()为何就是咱们create()函数返回的ob呢?

要解答这个问题,我须要回顾一下上面分析的Observable类的继承关系:Observable -> ObservableType -> ObservableConvertibleType 即Observable继承ObservableType协议,ObservableType又继承ObservableConvertibleType协议,而咱们的ObservableConvertibleType提供了抽象方法asObservable(),咱们Observable类中实现了asObservable()这个方法,它直接返回self就它本身。

下面经过源码来证明:

///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {

    ...
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    ...
}
复制代码

分析了Rxswift订阅subscribe()的源码感受很是nice, 咱们找到了咱们ob 建立时传入的闭包和咱们订阅时的闭包存在了一条链条关系,也就是只要ob发送了消息,那咱们的订阅者必定能够按照这个链条收到消息。可是咱们仍是不知道究竟是怎么调用的,怎么触发的。

并且咱们注意到self.asObservable().subscribe(observer)也就是AnonymousObservable调用了subscribe()方法,可是在AnonymousObservable类中咱们并无找到subscribe()的定义,因此咱们须要来看他的父类Producer

  • Producer的源码以下:
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            /*重点在这里了,这里调用了run()方法,一切疑惑都清晰了,咱们知道了run()调用时传入了observer,而且建立了sink管子,而这个管子具有了序列的功能,能够调用on()方法。 */
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

复制代码

果真不出咱们所料,在Producer中咱们找到了subscribe()的方法定义,到此咱们能够总结出很清晰的几条线索了

  • (1)经过前面的类继承关系能够知道是Producer实现了ObservableType协议的subscribe()方法。在这个方法里面调用了self.run(observer, cancel: disposer)
  • (2) self.run()实际上就是AnonymousObservable.run(), 这这个方法里面作了三件事情:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//1.建立了一个sink管道对象,并将observer也就create()建立
//序列时传入的闭包传给了sink
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        //2. sink调用本身的run()方法,并把AnonymousObservable做为参数传入。
        let subscription = sink.run(self)
        //返回一个元组,包含sink管道信息。
        return (sink: sink, subscription: subscription)
    }
复制代码
  • (3)AnonymousObservableSink类中run()方法中调用parent._subscribeHandler(AnyObserver(self)) 其中parent就是咱们(2)中sink.run(self)传入的self,也就是AnonymousObservable对象;而且咱们前面已经知道_subscribeHandler就是建立序列时保存的那个经过create()函数参数传入的 闭包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("kyl发送了信号") obserber.onCompleted() return Disposables.create() }。 如今已经很清晰了parent._subscribeHandler(AnyObserver(self)) 执行闭包,这行代码就会调用obserber.onNext("kyl发送了信号")这个行代码。

  • 如今咱们能够经过一个流程图来总结咱们代码执行的流程:

上面的订阅序列流程分析:咱们弄明白了从订阅序列到调用create()函数时传入的参数闭包调用的逻辑,可是这个闭包发送onNext()信号后,怎么到订阅消息的onNext()闭包咱们还不是很清晰。所以咱们须要分析AnonymousObserver

咱们先来看下AnonymousObserver

  • AnonymousObserver源码定义以下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    /*构造函数,保存了EventHandler尾随闭包*/
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    //覆写了onCore方法,调用了EventHandler闭包
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

复制代码

AnonymousObserver源码中咱们并无找到onNext()方法,那咱们只能沿着它的继承链往上查找,这里须要了解一下类的继承关系:

  • AnonymousObserver的继承关系:

经过分析类的继承关系,咱们得知:这样一个关系链:

AnonymousObserver对象的on()方法会调用onCore()方法,ObserverType里面有onNext,onError,onComplete方法。可是on()是如何调用的,什么时候调用的呢?

要解决这个疑问,咱们须要再次回到咱们建立序列的代码:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
复制代码

建立序列的create()方法传入了一个subscribe闭包,并返回了AnonymousObservable对象。其中subscribe闭包就是咱们序列建立时参数形式传入 闭包。而且AnonymousObservable初始化时将这个闭包保存起来了self._subscribeHandler = subscribeHandler AnonymousObservable 有一个run()方法,run方法里面建立了一个AnonymousObservableSink对象sink,具体源码以下:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
复制代码

分析了这么久,绕了一圈,终于发现关键就在AnonymousObservableSink管子这个对象里面了。sink这是个神奇的管子。它就保存了序列,也保存了订阅,还保存了用于销毁的disposed 也就是同时拥有了建立序列,订阅序列,销毁序列功能。

咱们来分析下AnonymousObservableSink的源码:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    //这里的Parent就是咱们上面分析的AnonymousObservable,很是重要
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

// 构造方法,传入了observer序列,和Cancelable
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

//这里实现 了ObserverType协议的on()方法
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            //调用了父类的发布,self.forwardOn()会调用本身的on()方法
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
    /*调用了_subscribeHandler闭包,这个闭包就是咱们以前建立序列时传入闭包。 parent就是传入进来的序列,这里序列的闭包里传入了self而且强转为AnyObserver 这里将self传给了闭包_subscribeHandler,这样_subscribeHandler也就具有了subcribe的能力。 */
        return parent._subscribeHandler(AnyObserver(self))
    }
}
复制代码

其中Sink类的源码以下:

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        //这里调用了传入observer.on()方法,
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}
复制代码

从源码分析咱们得知:

  • 咱们的sink保存了咱们的序列,当咱们调用ob.onNext()发送信号时,因为咱们的sink已经持有了ob, 这样sink会调用on()方法,在on()方法里面会调用self.forwardOn(event),而在fowardOn()里面会调用self._observer.on(event)。这样个人疑问就解决了,答案就是sink调用了on()方法。

  • 这里咱们再来总结一下总的流程:

  1. 建立序列时create()返回了一个ob, 这个ob就是序列,建立的时候传递了一个闭包A。在闭包A中调用了ob.onNext()发送了信号。
  2. 订阅序列时调用ob.subscribe()方法,这个方法会建立一个AnonymousObserver对象,并调用了self.asObservable().subscribe(observer)
  3. self.asObservable()实际就是咱们的ob, 也就是ob调用了subscribe().而AnonymousObserver中没有找到subscribe()。
  4. 咱们在AnonymousObserver的父类中找到了subscribe(),发现subscribe()调用了AnonymousObserver的run()方法。
  5. 在AnonymousObserver的run()方法中,建立了一个管子sink,并调用了sink.run(self),sink是AnonymousObservableSink的对象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))调用了建立序列时保存的闭包A (parent就是AnonymousObserver),这样就解释了订阅时,回调了A闭包的缘由。
  6. 至于怎么调用onNext()方法也是经过sink来实现的。
  7. sink已经持有了ob ,当咱们在A闭包里面调用ob.onNext()发送信号时,实际会经过sink.on()来调用。首先sink.on()会调用forwardOn().
  8. 在forwardOn()中会调用self._observer.on(event)。
  9. _observer.on()会调用_observer.onCore()
  10. _observer.onCore(event)会根据event的类型判断是调用onNext(),onError(),onComplete()中间一个,因为咱们传递的是onNext事件,因此会调用onNext() ,而这个_observer.onNext()会调用咱们订阅时传入闭包subscribe(onNext:).
  11. 为何回调的缘由是:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代码不是咱们要分析的重点,...表示忽略了这次的一段源码
            /*注意,这次定义了一个AnonymousObserver()对象,以参数的形式, 构造方法里面传入了一个尾随闭包eventHandler, 在这个闭包里面,当收到event的不一样事件, 会触发并调用,咱们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包 */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //调用订阅时传入的
复制代码

这里调用ob.subscribe()的时候,咱们建立了AnonymousObserver和咱们subscribe()传入的onNext()闭包作了一个绑定,当AnonymousObserver.onNext()调用的时候一定会回调subscribe()传入的onNext()闭包。而10中的_observer对象指的就是let observer = AnonymousObserver

  • 仍是经过这张图来解释最简洁:

3. 销毁

RxSwift给咱们的展现的设计思惟

iOS 经常使用设计模式

相关文章
相关标签/搜索