篇幅稍微有点长,了解程度不一样,能够跳过某些部分。api
- 若是对源码比较熟悉的,建议直接看图就好了,时序图更加清晰。第一次摸索有必要阅读文字内容。
- 贴出来的代码省略了没必要要的部分,用省略号代替。
RxSwift的基础用法就是很简单的几步:bash
//建立序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//发送信号
observer.onNext("今日份麻酱凉皮")
observer.onCompleted()
return Disposables.create()
}
//订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
控制台输出:
订阅到:今日份麻酱凉皮
完成
销毁
复制代码
在看源码以前,应该对接触到的类和协议有些认识,方便以后的理解。下面的关系图在须要的时候回头熟悉一下就行:闭包
究竟是什么在支撑如此便捷的调用?ide
第一句Observable<Any>.create
建立了一个可观察序列Observable
对象,第二句就是这个Observable
序列对象订阅了消息。函数
从输出能够看出,都是订阅到的消息。那么订阅时传入subscribe
的闭包是何时调用的呢?fetch
单从如今的几句代码,也能猜出是第一句代码的闭包中的observer.onNext
的调用引发的。可是,咱们也没有看到这个**create
函数中的闭包是在哪里执行的?**ui
为了可以清晰的描述,暂且称第一句
create
中的闭包为**create闭包
,第二句subscribe
中的几个闭包为subscribe闭包
**。spa
外面看不出来,那咱们只能进去RxSwift
里面探索下create
和subscribe
到底作了什么?线程
create
函数的实现:code
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
复制代码
原来这函数内部其实是建立了一个AnonymousObservable
匿名可观察序列对象。而以前的闭包也是给AnonymousObservable
对象初始化用了。
AnonymousObservable
类:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
复制代码
这里在初始化的时候把create闭包
赋值给了_subscribeHandler
属性。
到此为止,
Observable<Any>.create
函数实际上建立了一个AnonymousObservable
匿名可观察序列对象,并保存了create闭包
。
。。。貌似这条不是主线啊!没有找到任何一个问题的答案。
再来翻翻**subscribe
函数**:
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
......
let observer = AnonymousObserver<E> { event in
......
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
复制代码
这也是对定义在ObservableType
协议中的函数的实现,返回了一个Disposable
。这个Disposable
就是用来管理订阅的生命周期的,示例代码中并无体现出来,实际是在订阅信号的内部处理的。前面都没什么,后面建立了AnonymousObserver
,而且在AnonymousObserver
初始化时传入了闭包,并赋值给_eventHandler
属性。
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
复制代码
以前,
AnonymousObservable
匿名序列对象,保存了create闭包
。 此时,建立了AnonymousObserver
匿名观察者对象,保存了对subscribe闭包
的回调执行的EventHandler闭包
。
又一条支线。。。一路走来,都是在建立对象,保存闭包。两个主线疑问仍是无迹可寻。难道一开始就走上了歧路?非也!继续看下去就明白了什么叫「柳暗花明又一村」。
AnonymousObservable
的subscribe
函数中,在建立了AnonymousObserver
对象后,还返回了一个新建的Disposable
对象。重点就在这里的第一个参数:self.asObservable().subscribe(observer)
中。asObservable
仍是返回了self
,后面贴上的ObserverType
中能够看到。剩下的就是AnonymousObservable
的父类Producer
中的subscribe
了:
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
......
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
复制代码
因为是在当前线程中执行的,只看else
那部分。disposer
相关的不用关心。关键是observer
参数,这个参数中有对subscribe闭包
的处理的EventHandler闭包
。observer
传入了self.run(observer, cancel)
。因此,还要回头再看看**AnonymousObservable
类中的run
函数**:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
复制代码
这里又建立了一个AnonymousObservableSink
对象,observer
和cancel
继续往初始化函数中丢:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
......
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
复制代码
前面sink.run(self)
把self
传了进来,又对AnonymousObservable
起了别名Parent
。parent._subscribeHandler
不就是AnonymousObservable
在调用它最开始保存的那个create闭包
么?AnyObserver(self)
则把AnonymousObservableSink
做为AnyObserver
初始化的参数。
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
复制代码
其中还把AnonymousObservableSink
的on
函数赋值给了AnyObserver
的属性observer
,observer
就是EventHandler
。这个EventHandler
在create闭包
中会用到。
这不就是第二个主线疑问(create
函数中的闭包什么时候调用)的答案么!
整理一下:
subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable
函数中建立Disposable
开始AnonymousObservable
调用subscribe(observer)
AnonymousObservable
调用run(observer, cancel)
AnonymousObservableSink(observer: observer, cancel: cancel)
,而且sink.run(self)
parent._subscribeHandler(AnyObserver(self))
这是一条从
subscribe闭包
-->create闭包
的线。
还没完,还有个**create闭包
中怎么触发subscribe闭包
的?**
又臭又长的写了这么多,这里就只看observer.onNext("今日份麻酱凉皮")
吧。点进去看: observer.onNext("今日份麻酱凉皮")
:
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
复制代码
onNext
中调用了on
,在前面AnyObserver
结构体定义中能够看出,on
函数中返回了self.observer(event)
,以前是把AnonymousObservableSink
的on
赋值给了这个self.observer
,因此,此时会走到AnonymousObservableSink
的on
函数中。这里面又调用了self.forwardOn(event)
,看下AnonymousObservableSink
的父类Sink
中定义的forwardOn
:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
......
init(observer: O, cancel: Cancelable) {
......
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
......
self._observer.on(event)
}
}
复制代码
forwardOn
中走了一句self._observer.on(event)
。这里的_observer
属性不就是AnonymousObservableSink
初始化时传入的AnonymousObserver
对象么!
继续跟AnonymousObserver
的on
:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
......
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
复制代码
这里没有on
,看父类ObserverBase
:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
复制代码
这里的on
函数中,有个.next
分支,调用了self.onCore(event)
。子类AnonymousObserver
实现的onCore
中又调用了self._eventHandler(event)
。
这个_eventHandler
是什么?不就是AnonymousObserver
初始化时保存的对subscribe闭包
处理的闭包么!因此create闭包
中的observer.onNext("今日份麻酱凉皮")
就能触发subscribe闭包
了。
这是一条从
AnonymousObservable
调用_subscribeHandler
(也就是create闭包
)时的参数AnyObserver
-->subscribe闭包
的线。
如今咱们看清楚了响应式的数据流:
看清楚了么?好像清楚的不太明显。毕竟好几个类、协议,又那么多函数调来调去的。加把劲再撸一撸。既然这么五花八门的调用流程搞清楚了,那就来弄清楚它主要都作了什么?
一图概千言,既然画了图,就少敲点键盘吧!
仔细看了流程图就会发现,出现的几个类中,干活的主要是Anonymous
开头的那三个类。咱们在外面调用的操做,实际上是在使用RxSwift
内部封装的一些类。
AnonymousObservable
,并保存create闭包
AnonymousObserver
,而且把对subscribe闭包
的操做封装成了EventHandler
AnonymousObservable
来干。
Disposable
中,由subscribe(observer)
,把AnonymousObserver
传给了AnonymousObservableSink
AnonymousObservableSink
才是信息处理的核心,由于他知道的太多了AnonymousObservableSink
有AnonymousObserver
观察者,AnonymousObserver
持有着EventHandler
。AnonymousObservableSink
在调用run
函数时也传入了AnonymousObservable
序列,AnonymousObservable
就是create闭包
的持有者。AnonymousObservableSink
初始化的时候,除了观察者外,还有个管理序列生命周期的Disposable
。AnonymousObservableSink
做为一个内部类,在被create闭包
当作参数回调给外界时须要转换为AnyObserver
,在这里AnyObserver
则是以闭包属性的形式保留了AnonymousObservableSink
的on
函数AnyObserver
经过这个属性值联系到AnonymousObservableSink
订阅信号:
Observable
-->AnonymousObservable
-->AnonymousObserver
-->AnonymousObservableSink
-->AnyObserver
-->create闭包
create闭包
中调用AnyObserver
的onNext
开始AnyObserver.observer
访问闭包中的AnonymousObservableSink
AnonymousObservableSink
拥有AnonymousObserver
AnonymousObserver
掌控EventHandler
发出信号:
create闭包
-->AnyObserver
-->AnonymousObservableSink
-->AnonymousObserver
-->subscribe闭包