RxSwift源码分析(二)-Observable和AnonymousObservableSink解析

上一篇文章中,主要描述了RxSwift的核心逻辑,也就是一个序列从建立到订阅而后从发送消息到接收消息的整个流程是怎样串联起来的。还不太理解的同窗能够移步到上一篇文章了解一下。api

这篇文章主要来分析一下RxSwift的几个核心类和协议的实现和设计。bash

Observable类解析

Observable是可观察序列,是全部可观察序列的基类,咱们不会直接使用Observable这个类,通常都是使用子类。Observable也能够理解成抽象类,实际上不是抽象类,由于可观察序列最重要的一个订阅序列的方法subscribe必须在其子类中重写。ide

咱们先来看看Observable的源码:函数

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
复制代码
  • Observable实现了一个协议ObservableType,并且ObservableType协议继承自ObservableConvertibleType协议,因此在Observable中实现了两个协议方法:subscribeasObservable
  • subscribe方法没有具体实现的逻辑,须要子类去实现。
  • asObservable方法返回的是self,看似用处不大,其实不是这样的。asObservable是很是有用的,若是一类是Observable的子类,咱们能够直接返回self,若是不是Observable的子类,咱们能够经过重写这个协议方法来返回一个Observable对象,这样保证了协议的一致性。在使用的时候咱们能够直接写相似self.asObservable().subscribe(observer)这样的代码,有利于保持代码的简洁性,是良好的封装性的体现。因此我以为这个设计很是的好,在咱们平常开发中也能够借鉴。
  • _ = Resources.incrementTotal()_ = Resources.decrementTotal()这两行代码实际上是RxSwift内部实现的一个引用计数。这部份内容我会在后面的文章中再详解。
  • composeMap<R>优化map的一个函数,不太理解用处。
  • Observable子类很是多,这里不一一去看,主要区别在于对subscribe方法的实现不同。

AnonymousObservableSink类解析

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
复制代码
  • AnonymousObservableSinkSink的子类,AnonymousObservableSink自己遵照ObseverType协议,与此同时实现了run方法,虽然没有实现subscribe方法,可是已经足够了,这样AnonymousObservableSink从某种程度来讲也是Observable
  • AnonymousObservableSink是Observer和Observable的衔接的桥梁,也能够理解成管道。它存储了_observer和销毁者_cancel。经过sink就能够完成从Observable到Obsever的转变。
  • run方法中的这行代码parent._subscribeHandler(AnyObserver(self)),其中parent是一个AnonymousObservable对象。_subscribeHandler这个block调用,代码会执行到建立序列时的block。而后会调用发送信号的代码obserber.onNext("发送信号"),而后代码会通过几个中间步骤会来到AnonymousObservableSink类的on方法。

有问题或者建议和意见,欢迎你们评论或者私信。 喜欢的朋友能够点下关注和喜欢,后续会持续更新文章。post

相关文章
相关标签/搜索