一个帮助咱们简化异步编程的Swift框架。git
Observable
:产生事件Observer
:响应事件Operator
:建立变化组合事件Disposable
:管理绑定(订阅)的生命周期Schedulers
:线程队列调配Observable
与Observer
之间的关系let _ = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}.subscribe(onNext: { (num) in
print("receive num \(num)")
}, onError: { (error) in
print("error: \(error.localizedDescription)")
}, onCompleted: {
print("receive complete")
})
复制代码
如上代码出现两个重要的方法create
和subscribe
。顾名思义,create
方法是建立一个Observable
对象,而subscribe
方法是建立一个订阅事件。咱们先关注下create
方法如何建立一个Observable
对象。github
//Create.swift
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
复制代码
首先看它传入的参数为一个闭包:AnyObserver<Element> -> Disposable
,而后返回的是一个Observable<Element>
对象。对比咱们的例子,咱们能够肯定Element
为咱们指定的Int
,即泛型Element
表示数据源类型。编程
上面返回的是一个AnonymousObservable
对象,并将闭包做为参数传入。swift
//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
}
复制代码
AnonymousObservable
将传入的闭包赋值给变量_subscribeHandler
。至此建立完了一个Observable
对象。而后执行其subscribe
方法:api
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
} else {
disposable = Disposables.create()
}
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
复制代码
这一段代码比较长,咱们先从参数下手,能够看到参数中包括onNext
(产生下一个事件)、onError
(产生错误)、onCompleted
(产生完成)和onDisposed
四个不一样的闭包。咱们先暂时无论Disposed
部份内容,直接看到下面相关代码:闭包
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
复制代码
上面代码建立了一个AnonymousObserver
对象,并将参数的闭包事件与自身产生的event
事件关联在一块儿。框架
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
}
复制代码
AnonymousObserver
对象将上面关联的一个事件转换闭包做为参数存储到变量_eventHandler
中。其实能够简单地理解AnonymousObserver
对象将上面subscribe
方法中的参数闭包存储起来了。异步
再回到subscribe
方法,看到最后一句代码:ide
return Disposables.create(self.asObservable().subscribe(observer), disposable)
复制代码
咱们关注到这里self.asObservable().subscribe(observer)
,首先调用了asObservable()
方法:异步编程
//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
associatedtype Element
func asObservable() -> Observable<Element>
}
//Observable.swift
public class Observable<Element> : ObservableType {
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> {
return self
}
}
复制代码
如上所示,咱们能够看到asObservable()
方法返回的本身自己Observable
。但咱们看到这里对应的subscribe
方法为"抽象方法",上面咱们建立的是AnonymousObservable
对象,在它的父类Producer
中实现了:
class Producer<Element> : Observable<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
复制代码
一样咱们先暂时无论Scheduler
相关内容,这里调用了self.run()
方法,但它自己并未实现该方法,一样咱们在AnonymousObservale
中能够找到:
final private class AnonymousObservable<Element> : Producer<Element> {
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Element == Observer.Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
复制代码
在run
方法中建立了一个AnonymousObservableSink
方法,而后调用了它的run
方法。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
private let _isStopped = AtomicInt(0)
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
复制代码
咱们先看到其run
方法,能够看到它执行了parent._subscribeHandler(AnyObserver(self))
,这一句很关键,这里的parent
其实指的是咱们在调用create
方法时,建立的AnonymousObservable
对象。所以,这里的_subscribeHandler
就是咱们create
方法传递的参数闭包。咱们能够看到这里建立了一个AnyObserver
对象传入到闭包中。
回到例子中的闭包内容:
{ (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
复制代码
这里调用了onNext
方法产生元素1
:
public protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
}
复制代码
这里调用了on
方法传递元素,而咱们上面知道这里的ObserverType
是AnyObserver
对象:
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
}
复制代码
接着调用了self.observer(event)
将事件传递下去,而这里的observer
是在建立parent._subscribeHandler(AnyObserver(self))
时传入的。即self.observer = observer.on => AnonymousObservableSink.on
。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
复制代码
所以,事件会传递到AnonymousObservableSink
中,并经过fowardOn
方法继续传递事件.
class Sink<Observer: ObserverType>: Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
init(observer: Observer, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<Observer.Element>) {
if isFlagSet(self._diposed, 1) {
return
}
self._observer.on(event)
}
}
复制代码
这里调用了self._observer.on(event)
方法传递事件,而这里的_observer
对象就是咱们在调用subscribe
方法时,传递进来的AnonymousObserver
对象。而AnonymousObserver
自己没有实现on
方法,而是在父类ObserverBase
中实现了:
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
复制代码
最后调用了onCore
方法传递事件:
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
复制代码
而这里的_eventHandler
便是咱们调用subscribe
方法时,建立的闭包(将外部的Event
和内部的Event
关联)
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
....
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
复制代码
所以,咱们会在外部接收到receive num 1
的事件消息。