在上一篇文章中,主要描述了RxSwift的核心逻辑,也就是一个序列从建立到订阅而后从发送消息到接收消息的整个流程是怎样串联起来的。还不太理解的同窗能够移步到上一篇文章了解一下。api
这篇文章主要来分析一下RxSwift的几个核心类和协议的实现和设计。bash
Observable
是可观察序列,是全部可观察序列的基类,咱们不会直接使用Observable
这个类,通常都是使用子类。Observable
也能够理解成抽象类,实际上不是抽象类,由于可观察序列最重要的一个订阅序列的方法subscribe
必须在其子类中重写。ide
咱们先来看看Observable
的源码:函数
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
复制代码
Observable
实现了一个协议ObservableType
,并且ObservableType
协议继承自ObservableConvertibleType
协议,因此在Observable
中实现了两个协议方法:subscribe
和asObservable
。subscribe
方法没有具体实现的逻辑,须要子类去实现。asObservable
方法返回的是self,看似用处不大,其实不是这样的。asObservable
是很是有用的,若是一类是Observable
的子类,咱们能够直接返回self,若是不是Observable
的子类,咱们能够经过重写这个协议方法来返回一个Observable
对象,这样保证了协议的一致性。在使用的时候咱们能够直接写相似self.asObservable().subscribe(observer)
这样的代码,有利于保持代码的简洁性,是良好的封装性的体现。因此我以为这个设计很是的好,在咱们平常开发中也能够借鉴。_ = Resources.incrementTotal()
和_ = Resources.decrementTotal()
这两行代码实际上是RxSwift内部实现的一个引用计数。这部份内容我会在后面的文章中再详解。composeMap<R>
优化map的一个函数,不太理解用处。Observable
子类很是多,这里不一一去看,主要区别在于对subscribe
方法的实现不同。final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
复制代码
AnonymousObservableSink
是Sink
的子类,AnonymousObservableSink
自己遵照ObseverType
协议,与此同时实现了run
方法,虽然没有实现subscribe
方法,可是已经足够了,这样AnonymousObservableSink
从某种程度来讲也是Observable
。AnonymousObservableSink
是Observer和Observable的衔接的桥梁,也能够理解成管道。它存储了_observer
和销毁者_cancel
。经过sink就能够完成从Observable到Obsever的转变。run
方法中的这行代码parent._subscribeHandler(AnyObserver(self))
,其中parent是一个AnonymousObservable
对象。_subscribeHandler
这个block调用,代码会执行到建立序列时的block。而后会调用发送信号的代码obserber.onNext("发送信号")
,而后代码会通过几个中间步骤会来到AnonymousObservableSink
类的on
方法。有问题或者建议和意见,欢迎你们评论或者私信。 喜欢的朋友能够点下关注和喜欢,后续会持续更新文章。post