RxSwift底层核心逻辑、流程分析

介绍

咱们都知道,RxSwift很强,做为一名开发者,阅读开源框架的源码,能让咱们受益颇多,学习优秀的开发者的思路是颇有必要的。html

当咱们写这么一份代码时,会疑惑为何他两个闭包之间会可以联系起来。git

Observable<String>.create { ob -> Disposable in
            ob.onNext("tets")
            return Disposables.create()
            }.subscribe(onNext: { str in
                print(str)
            }, onError: { error in
                print(error)
            }, onCompleted: {
                print("complete")
            })
复制代码

本文结尾项目地址: github.com/GitHubYhb/R…
这是一份帮助阅读RxSwift底层运行流程原理的RxSwift源码 我往其中加了一些打印的信息,从create到onNext每一步在哪里作了什么都很清楚。 github

下载项目以后 -> 运行 RxExample-iOS


那么废话很少说。直接开干。swift

Observable

Observable 原型是 Observable<Element>api

public class Observable<Element> : ObservableType {
    // ...部分省略
    
    // ObservableType 协议定义的方法。 
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    // ObservableConvertibleType 定义的方法。
    public func asObservable() -> Observable<Element> {
        return self
    }
}
复制代码

它遵循了ObservableType协议。
ObservableType协议中,要求遵循者须要有subscribe()方法。bash

public protocol ObservableType: ObservableConvertibleType {
    // ...省略一堆注释
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
    //遵循ObservableConvertibleType 的方法
    public func asObservable() -> Observable<Element> {
       // ...省略实现代码
    }
}
复制代码

同时ObservableType协议,又遵循了ObservableConvertibleType协议。 在ObservableConvertibleType协议中,建立了叫Element的关联类型。
以及要求遵循者要有 asObservable()方法闭包

public protocol ObservableConvertibleType {
    associatedtype Element
    
    // E 从新命名 Element
    @available(*, deprecated, message: "Use `Element` instead.")
    typealias E = Element

    func asObservable() -> Observable<Element>
}
复制代码

由下图咱们能够看出来这三者之间的关系框架

看懂了Observable,咱们再来看看create(),找到Create.swift文件ide

create()

extension ObservableType {
    // ... 省略一堆注释
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}
复制代码

吐槽:这句代码是真的又长又难看懂。函数

create() 是什么

首先,create()ObservableType的扩展
上文已经有提到ObservableType是一个protocol协议

因此,create()也是ObservableType下定义的一个方法,跟subscribe()同样。

解析

create()中有一个参数,名为subscribe的闭包,而且用_作了省略。
返回型是Disposable
看上去就是一长串,很不友好,为了更好的理解,请看我下面的代码

// 仿照 ObservableType 建立一个协议,并定义一个create方法
protocol TestProtocol {
    func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel
}
// UILabel遵循这个协议
extension UILabel: TestProtocol{
    func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel {
        //调用block
        let newStr = subscribeBlock("block干活")
        print("block返回 == " + newStr)
        return UILabel.init()
    }
}
复制代码

这份代码中
(String) -> String
------------ ↓ 对应 ------------
(AnyObserver<Element>) -> Disposable
而后来测试一下这段代码

let lb = UILabel.init()
lb.create { str in
    print("block传递出来 == " + str)
    return "test"
}
复制代码

看打印结果

block传递出来 == block干活
block返回 == test
复制代码

相信看到这里,你们内心已经有点明白的意思了。

  1. 在咱们调用完create()以后,闭包先返回给咱们一个AnyObserver<Element>类型的参数
  2. 咱们经过这个参数作了操做以后,就会再返回给闭包一个Disposable,给它操做。

那么这个AnyObserver<Element> 是什么呢

AnyObserver<Element>

咱们就看AnyObserver,它遵循了ObserverType协议
注意:这个协议跟上文的ObservableType不同,字母位数都不同的 - 0 -

public struct AnyObserver<Element> : ObserverType {
    //... 省略
}
复制代码

先看看ObserverType

public protocol ObserverType {
    associatedtype Element
    @available(*, deprecated, message: "Use `Element` instead.")
    typealias E = Element
    func on(_ event: Event<Element>)
}
extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
复制代码

从代码中咱们能够看到ObserverTypeObservableType都定义了一个关联类Element
不一样的地方在于定义的方法是func on(_ event: Event<Element>),而且在拓展中定义了三个方便开发者使用的方法,也就是咱们平时经常使用的onNext()、onCompleted() 、onError()

也就是说,咱们能够拐弯抹角的这么玩。

Observable<String>.create { ob -> Disposable in
    // 下面三行代码一个意思
    ob.on(Event<String>.next("any"))
    ob.on(.next("test"))
    ob.onNext("tets")
    return Disposables.create()
}
复制代码

既然已经到了Event了,不如先进去一探究竟

public enum Event<Element> {
    case next(Element)

    case error(Swift.Error)

    case completed
}
复制代码

Event是一个枚举,包含了咱们熟悉的三个枚举值,next/error/completed
还有一系列的扩展方法 好比map()这里先不一一列举了。

回到AnyObserver
看看完整的代码。

public struct AnyObserver<Element> : ObserverType {
    // 声明闭包 EventHandler
    public typealias EventHandler = (Event<Element>) -> Void
    // 私有常量 observer
    private let observer: EventHandler
    // 初始化方法 1
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    // 初始化方法 2
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    // on 方法
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    // asObserver 方法
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}
extension AnyObserver {
    typealias s = Bag<(Event<Element>) -> Void>
}

复制代码

那么create()方法声明的部分就先到这里。接下来咱们看看,他后续返回AnonymousObservable(subscribe)

AnonymousObservable()

AnonymousObservable 翻译过来就是 匿名观察序列
看下面代码能发现:

  1. 这是一个Create.swift中的私有类
  2. AnonymousObservable继承自Producer<Element>
  3. 咱们上文中就是return了AnonymousObservable()初始化方法。
final private class AnonymousObservable<Element>: Producer<Element> {
    ...先不看他都干了什么
}
复制代码

既然是继承自Producer<Element>,那么不如先看看Producer<Element>

class Producer<Element> : Observable<Element> {
    ... 也先不看
}
复制代码

最终,他仍是Observable<Element>,函数要求的返回格式。

回到AnonymousObservable()

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
复制代码

上文有说到create()中return了AnonymousObservable()初始化方法。
那么咱们把注意力放到初始化方法
这里他把咱们传进来的闭包保存了起来。
那么,咱们是否是能够猜测,他保存起来以后,是否是能够想何时用就何时用呢。

因此当前,咱们return了AnonymousObservable()初始化方法以后,create()已经执行完毕。

那么如今咱们能够这么理解,下面这段代码

Observable<String>
    .create { (ob) -> Disposable in
        ob.on(Event<String>.next("any"))
        return Disposables.create()
    }.subscribe(onNext: { str in
    }, onError: { (error) in
    }, onCompleted: {
    })
复制代码

等同于

AnonymousObservable(subscribe)
    .subscribe(onNext: { str in
    }, onError: { (error) in
    }, onCompleted: {
    })
复制代码

下一步就是subscribe()

subscribe()

在说以前,我得先跟你们说一下,接下来咱们会遇到的subscribe()有两个,如图所示。

咱们这里就先把他们标记一下。

而后开始吧。

我作了一些打印,帮助我能更好的理解整个订阅过程。
这张图上主要作了两点。

  1. 建立了AnonymousObserver,这个跟AnonymousObservable不同,AnonymousObserver是观察者,而AnonymousObservable是被观察对象。虽然建立出来,可是并无进行调用。
  2. AnonymousObservable同样,把这个闭包做为一个临时变量。经过self.asObservable().subscribe(observer)传递出去。

那么被传递到哪里去了呢。咱们看到self,这是咱们当前的调用对象AnonymousObservable,他这里调用的subscribe(observer)是咱们上面作过标记的2

AnonymousObservablesubscribe()继承自Producer

这里主要是调用了run方法,又把传进来的observer传到别的地方,而后在当前的Producer里面的run是个抽象方法,咱们回到AnonymousObservable,他重写了这个run方法

能够看到他建立了一个AnonymousObservableSink,初始化时又传入了observer,一路跟进到Sink的初始化方法能够看到

AnonymousObservable时的操做相似,把 observer保存了起来。

在初始化AnonymousObservableSink后,有调用了另一个run方法。

在这个AnonymousObservableSink中,他重命名了AnonymousObservable
run方法接收到咱们传递过来的Parent(AnonymousObservable)
parent执行先前保存起来的闭包,并将AnyObserver(self)传递了进去,这样才能让保存起来的闭包有一个观察者来执行Event
这里的selfAnonymousObservableSink,经过AnyObserver转成观察者。

能够这么理解:经过AnyObserver()将先前保存起来的observer,提取出来
这时候,已经把两个保存起来的元素串联起来了。

开始执行闭包中的内容。
这是最开始我写好的闭包内容。

let ob = Observable<String>.create { ob -> Disposable in
            print("开始在咱们保存的闭包里面搞事,ob的值已经传进来了,就是AnyObserver(self)")
            ob.onNext("tets")
            return Disposables.create()
        }
        _ = ob.subscribe(onNext: { str in
            print(str)
        })
复制代码

那么,这个onNext是何时执行的呢?

onNext 何时执行

ob.onNext("tets")的本质是AnyObserver(self).onNext("tets")
selfAnonymousObservableSink
咱们须要看看AnyObserver(self)发生了什么

public struct AnyObserver<Element> : ObserverType {
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        print("调用AnyObserver(self)")
        self.observer = observer.on
    }
    ...
}
复制代码

初始化方法中,将observer.on保存了起来,也就是说
AnonymousObservableSinkon方法保存了起来
那么这个on方法都作了什么呢

func on(_ event: Event<Element>) {
        print("AnonymousObservableSink 的 on 方法 event == \(event.element!)")
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
复制代码

能够看到on方法又调用了一个forwardOn,再点进去看看

final func forwardOn(_ event: Event<Observer.Element>) {
        //...省略
        self._observer.on(event)
}
复制代码

记得咱们上文有提到在初始化AnonymousObservableSink时,把在subscribe()中建立的AnonymousObserver保存了起来,这里他就经过这个_observer,调用了on方法。
这时才跳到了AnonymousObserver

回到建立AnonymousObserver的地方

let observer = AnonymousObserver<Element> { event in
    ...省略不看
}
复制代码

看看AnonymousObserver的自己

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

复制代码

AnonymousObserver继承自ObserverBase,贯彻到底

class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}
复制代码

咱们能够看到ObserverBase有一个on方法,还记得咱们最上面说过的吗

Observable<String>.create { ob -> Disposable in
    // 下面三行代码一个意思
    ob.on(Event<String>.next("any"))
    ob.on(.next("test"))
    ob.onNext("tets")
    return Disposables.create()
}
复制代码

这三行是同一个意思。也就是说,在咱们调用onNext时,会先调用on方法,在调用.next,而后就走到了self.onCore,当前的onCore也是一个抽象方法。
咱们回到AnonymousObserver重写onCore

override func onCore(_ event: Event<Element>) {
    return self._eventHandler(event)
}
复制代码

self._eventHandler(event)就会执行这段红框代码

直到这里,才执行到了onNext,才会到咱们本身在外部写的代码。

看起来可能很乱,这里作一下

总结

  1. 咱们写的闭包中ob.onNext()调用时,首先调用了AnonymousObservableSinkon方法
  2. AnonymousObservableSinkon方法调用了self.forwardOn(event)
  3. forwardOn(event)经过先前保存起来的_observer,调用了AnonymousObserveron方法
  4. AnonymousObserveron方法调用重写父类的self.onCore(event)
  5. onCore(event)调用AnonymousObserverself._eventHandler(event),这个_eventHandler就是上面红框里面的内容被保存了下来。
  6. 红框内接收到self._eventHandler(event)传递来的event,判断event类型,执行咱们本身在外部写的内容。

或许仍是很乱,我调整了一下个人打印内容,一步一步的都写得很清楚。

相关文章
相关标签/搜索