在RxSwift中主要有以下四个成员:api
Observable
Observer
Scheduler
Dispose
若是这四个都弄明白了,那么能够说整个RxSwift也就弄明白了。这篇文章来具体分析调度者 - Scheduler
安全
Schedulers
是
Rx
实现多线程的核心模块,它主要用于控制任务在哪一个线程或队列运行,它内部的实现是对
GCD
和
OperationQueue
进行了封装。 若是你曾经使用过 GCD, 那你对如下代码应该不会陌生,功能都是从多线程获取数据而后到主线程刷新UI:
// 后台取得数据,主线程处理结果
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
self.data = data
}
}
复制代码
若是用 RxSwift
来实现,大体是这样的:bash
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
self?.data = data
})
.disposed(by: disposeBag)
复制代码
感觉了Scheduler
的使用以后,来看看里面具体是如何实现的。多线程
CurrentThreadScheduler闭包
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/// The singleton instance of the current thread scheduler.
public static let instance = CurrentThreadScheduler()
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
......
}
复制代码
isScheduleRequired
用来表示是否必须调用schedule
方法,利用对 queue
的set,get方法的观察,绑定咱们的当前队列与静态字符串,实现同一线程数据共享。SerialDispatchQueueScheduler并发
SerialDispatchQueueScheduler
抽象了串行 DispatchQueue
。若是你须要执行一些串行任务,能够切换到这个 Scheduler
运行。ConcurrentDispatchQueueScheduler异步
ConcurrentDispatchQueueScheduler
抽象了并行 DispatchQueue
。若是你须要执行一些并发任务,能够切换到这个 Scheduler
运行。OperationQueueSchedulerasync
OperationQueueScheduler
抽象了 NSOperationQueue
。它具有 NSOperationQueue
的一些特色,例如,你能够经过设置 maxConcurrentOperationCount
,来控制同时执行并发任务的最大数量。public class OperationQueueScheduler: ImmediateSchedulerType {
public let operationQueue: OperationQueue
public let queuePriority: Operation.QueuePriority
/// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
///
/// - parameter operationQueue: Operation queue targeted to perform work on.
/// - parameter queuePriority: Queue priority which will be assigned to new operations.
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
......
}
复制代码
OperationQueueScheduler
对象时,须要传入 OperationQueue
和 优先级queuePriority
,做为初始化参数。MainScheduleride
MainScheduler
表明主线程。若是你须要执行一些和 UI 相关的任务,就须要切换到该 Scheduler
运行。public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
let numberEnqueued = AtomicInt(0)
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
}
复制代码
MainScheduler
继承了SerialDispatchQueueScheduler
串行队列,由于主队列原本就是一个特殊的串行队列。而后在初始化对象时,肯定了队列类型为主队列self._mainQueue = DispatchQueue.main
。根据前面的示例来分析下subscribeOn
和observeOn
的具体实现 使用 subscribeOn函数
subscribeOn
来决定数据序列的构建函数在哪一个 Scheduler
上运行。以上例子中,因为获取 Data 须要花很长的时间,因此用 subscribeOn
切换到 后台 Scheduler
来获取 Data。这样能够避免主线程被阻塞。Observable
建立,应用操做符以及发出通知都会在 Subscribe
方法调用的 Scheduler
执行。subscribeOn
操做符将改变这种行为,它会指定一个不一样的 Scheduler
来让 Observable
执行。public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
复制代码
Observable
,咱们就知道原来subscribeOn
是把源序列封装成了一个中间层序列SubscribeOn
。final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
复制代码
RxSwift
核心逻辑的讲解的文章,就可以知道,当序列被订阅的时候,代码必定会执行到run
方法来。(还不太了解的朋友能够查看我前面的关于RxSwift核心逻辑的文章) 进入到SubscribeOnSink.run
方法func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
复制代码
self.parent.scheduler.schedule()
,self.parent.scheduler
就是调用SubscribeOn
方法做为参数传进来的队列,而后执行schedule
方法。self.scheduleInternal(state, action: action)
self.configuration.schedule(state, action: action)
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
复制代码
action(state)
就是从外面传进来的尾随闭包,因此代码会开始执行闭包,就会执行let subscription = self.parent.source.subscribe(self)
,对源序列进行订阅,因此必然会来到Producer
的subscribe
方法。override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
复制代码
schedule
方法里面
defer
是延迟调用,保证在return
以前调用action(state)
,在闭包里面会执行ObservableSequenceSink.run
方法,最后代码又会来到这个schedule
方法,因为上一次进来时把isScheduleRequired
设置成了false,因此代码会执行代码块3(如图)ScheduledItem
对象,而后加入到队列中func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
// 这里由于在递归环境,加了一把锁递归锁,保障安全
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
......
}
复制代码
action
,也就是外界传给递归调度者的闭包,后面就是发送信号的常规流程self.forwardOn(.next(next))
。RxSwift
的代码调用很是的繁琐,嵌套很深,各类闭包,因此须要慢慢地一遍一遍的打断点去仔细斟酌使用 observeOn
observeOn
来决定在哪一个 Scheduler
监听这个数据序列。以上例子中,经过使用 observeOn
方法切换到主线程来监听而且处理结果。observeOn
操做符将指定一个不一样的 Scheduler
来让 Observable
通知观察者。observeOn
大致流程和思想跟subscribeOn
差很少,因此这里就不一一分析了。Scheduler
的继承关系图:
Schedulers
是 Rx
实现多线程的核心模块,它主要用于控制任务在哪一个线程或队列运行subscribeOn
来决定数据序列的构建函数在哪一个 Scheduler
上运行observeOn
来决定在哪一个 Scheduler
监听这个数据序列subscribeOn
和observeOn
都会建立一个中间层序列,因此内部也有一个订阅响应序列的流程,中间层的 sink
就是源序列的观察者有问题或者建议和意见,欢迎你们评论或者私信。 喜欢的朋友能够点下关注和喜欢,后续会持续更新文章。