ReactiveX序列——RxSwift
Swift是苹果公司新推出的一门现代化的编程语言,而且将其开源出来了,Swift具备不少的优势,这也使得这门语言推出的短期引发了很大反应的缘由,在最近的2016年3月的编程语言排行榜处于第14位,甚至超过了OC(15位)。可见Swift的在开发者心中的地位。html
RxSwift的观察者对象(Observable)
在RxSwift中,能够有多种建立Observable对象的方法,主要有如下几种:java
- asObservable
- create
- deferred
- empty
- error
- toObservable/from
- interval
- never
- just
- of
- range
- repeatElement
- timer
要弄明白Observable就要先弄清楚Observable是继承了哪些类和协议,从源码开始分析:
首先第一个是ObservableConvertibleType:react
/** Type that can be converted to observable sequence (`Observer<E>`). */ public protocol ObservableConvertibleType { /** Type of elements in sequence. */ typealias E /** Converts `self` to `Observable` sequence. - returns: Observable sequence that represents `self`. */ func asObservable() -> Observable<E> }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
从ObservableConvertibleType协议源码能够看出,它定义了一个typealias类型别名和asObservable方法,类型别名是用来定义将要处理的类型(例如String,Int等等),而asObervable这个咱们在后面会具体叙述。其次是ObservableType,它继承了ObservableConvertibleType,ObservableType主要干了两个事情,第一个是建立出subscribe方法,它是用来执行订阅事件的(onNext、onError/onComplete),第二个就是简易实现asObservable方法(经过extension ObservableType 实现),asObservable主要是经过Observable.create(subscrible())实现的。再上来就是Observable,它是一个类,继承了ObservableType协议接口。
下面咱们分别对以上几种建立Observable对象作详细的介绍。ios
- asObservable方法:
asObservable实际上是至关于clone方法,其内部实现以下:sql
public func asObservable() -> Observable<E> { return self }
- 1
- 2
- 3
从这里看,它return self也就是本身,这就意味着,你必须先有Observable对象才能调用asObservable方法。例如:编程
var obs = Observable<String>.create { (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance } let observable = obs.asObservable() observable.subscribeOn(MainScheduler.instance) .subscribe{ event in print(event.debugDescription) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
第二个是subscribe方法,这个方法具体实现调用了一个“抽象”方法,这个“抽象”方法就是打印出来一个错误日志而且中止运行。json
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable { abstractMethod() }
- 1
- 2
- 3
- 4
固然,这个Observable类中方法,可是extension Observable实际上是有不少用法的。这也是咱们上面提到建立Observable的各类方法。
二、create方法 swift
public static func create(subscribe: (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) }
- 1
- 2
- 3
- 4
这是一个“静态方法”(在class中用static关键字标注,在struct和enum中使用class关键字标注),这个方法的参数是一个函数(一般咱们会用闭包的方式),函数的参数是AnyObserver,返回的是Disposable。AnyObserver其实就是订阅者,Disposable是一个协议接口,里面只有一个dispose方法,用来释放一些资源。整个create方法返回的是一个AnonymousObservable(匿名Observable),AnonymousObservable继承自Producer,Producer实现了线程调度功能,能够安排某个线程来执行run方法。所以create方法返回的AnonymousObservable是能够运行在指定线程中Observable。完整的create例子:数组
var obs = Observable<String> .create ({ (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance }) .observeOn(MainScheduler.instance) .subscribe({event in if let str = event.element { print(str) } }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
最后obs变量是一个Disposable类型变量,能够继续调用dispose方法释放资源。整个代码输出结果:bash
hahah deasd
- 1
- 2
三、empty方法
public static func empty() -> Observable<E> { return Empty<E>() }
- 1
- 2
- 3
empty方法是一个空方法,里面没有onNext事件处理,只会处理onComplete方法。empty建立Observable对象比较简单。代码例子:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str)}, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }) { () -> Void in print("dispose") }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
输出结果:
complete dispose
- 1
- 2
- 3
这个例子中有四个闭包,其中最后一个是尾随闭包,并且这些闭包都是可选类型。固然你也能够以下写法:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str) }, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }, onDisposed: {() -> Void in print("dispose") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
四、never方法
public static func never() -> Observable<E> { return Never() }
- 1
- 2
- 3
官方解释是返回一个无终止的观察者事件序列,能够用来表示无限持续时间。尽管咱们给安排了next事件,但实际上,他是不会执行的。不会输出onNext
Observable<String>
.never()
.subscribeNext( { (str) -> Void in print("onNext") }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
五、just方法
public static func just(element: E, scheduler: ImmediateSchedulerType) -> Observable<E> { return JustScheduled(element: element, scheduler: scheduler) }
- 1
- 2
- 3
just方法只能处理单个事件,简单来讲,咱们使用just方法不能将一组数据一块儿处理,只能一个一个处理。例如:
Observable<String>
.just("just test") .subscribeOn(MainScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
输出结果:
just test
- 1
just方法是一个多态方法,容许在传入参数时候指定线程,例如:
它指定当前线程完成subscribe相关事件。
Observable<String>
.just("just with Scheduler", scheduler: CurrentThreadScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
六、error方法
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) }
- 1
- 2
- 3
error方法是返回一个只能调用onError方法的Observable序列。其中的onNext和OnComleted方法是不会执行的。例如:
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) } Observable<String> .error(RxError.Timeout) .subscribe( onNext: { (str) -> Void in print(str) print("onNext") }, onError: { (error)-> Void in print(error) }, onCompleted: { () -> Void in print("onCompleted") }, onDisposed: { () -> Void in print("dispose") }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
最后的输出结果是:
Sequence timeout dispose
- 1
- 2
七、of方法
能够说of方法是just方法的升级版,它能够将一序列的事情组合起来一块儿处理。极大方便了开发者对数组(Array)、字典(Dictionary)进行分布处理。
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> { return Sequence(elements: elements, scheduler: scheduler) } Observable<String> .of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.element { print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
这里解释一下subscribe(on: Event->Void)方法,例子中event.element在调用get属性的时候其实会执行一个onNext方法,它返回的是一个可选类型,所以要用if let解析处理。固然若是代码改为以下,那么是不会输出结果的,由于event.error执行的是错误监听(也就是执行的onError方法,所以不会输出结果)。of和just同样,存在一个多态方法,能够带入线程控制。
Observable<String>
.of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.error{ print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
八、deferred方法
deferred方法是延时建立Observable对象,当subscribe的时候才去建立,它为每个bserver建立一个新的Observable; deferred采用一个Factory函数型做为参数,Factory函数返回的是Observable类型。这也是其延时建立Observable的主要实现。
public static func deferred(observableFactory: () throws -> Observable<E>) -> Observable<E> { return Deferred(observableFactory: observableFactory) }
- 1
- 2
- 3
- 4
整个deferred方法的原理如上图,从图中能够看出,deferred不是第一步建立Observable,而是在subscriber的时候建立的。(图中红色的是error,绿色的是next事件)
九、generate方法
public static func generate(initialState initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> { return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler) }
- 1
- 2
- 3
generate方法是一个迭代器,它一直循环onNext事件,直到condition不知足要求退出。generate有四个参数,第一个是最开始的循环变量,第二个是条件,第三个是迭代器,这个迭代器每次运行都会返回一个E类型,做为下一次是否执行onNext事件源,而是否正的要执行则看是否知足condition条件。其实咱们能够理解generate就是一个循环体(其内部实现也正是一个循环,代码在:GenerateSink的run方法中)。例子:
Observable<String>
.generate(
initialState: "ah", condition: ({ (str) -> Bool in return str.hasPrefix("ah") }), iterate: ({ (str1) -> String in return "h" + str1 })) //.subscribeOn(MainScheduler.instance) .subscribe ({ (event) -> Void in if let res = event.element { print(res) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
输出结果:
ah
- 1
上面这个例子说的是,初始的变量是“ah”,第一个条件知足,执行onNext事件,同时生成一个hah,不知足条件,不执行onNext事件。generate是一个具备高度可变的of方法,它同时兼备了后面要介绍的过滤(filter)特性。固然generate还有一个多态方法,容许传入执行线程。这个线程是为循环体而生的,并非为subscrible而生的。
十、repeatElement方法
public static func repeatElement(element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RepeatElement(element: element, scheduler: scheduler) }
- 1
- 2
- 3
repeatElement方法是一个无限循环的,它会一直循环onNext方法。固然这种循环是能够指定线程的。例子:
Observable<String>
.repeatElement("daa") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
其中subscribeNext是一个尾随闭包。
十一、using方法
public static func using<R: Disposable>(resourceFactory: () throws -> R, observableFactory: R throws -> Observable<E>) -> Observable<E> { return Using(resourceFactory: resourceFactory, observableFactory: observableFactory) }
- 1
- 2
- 3
using方法是经过Factory方法生成一个对象(resourceFactory)再转换成Observable,中间咱们要使用Factory方法,上面已经介绍过一次Factory方法。using方法相对其余的方法比较复杂和特殊,缘由是using方法是由咱们构建出资源和构建清除资源的,中间经过一个转换生成Observable对象。
Observable<String>
.using( { () -> Student<String> in return Student(source: Observable<String>.just("jarlene"), disposeAction: { () -> () in print("hah") }) }, observableFactory: { (r) -> Observable<String> in return r.asObservable() }) .subscribeNext( { (ss) -> Void in print(ss) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
其中Student类继承了两个协议:ObservableConvertibleType和Disposable;ObservableConvertibleType是为了生成Observable对象(经过调用asObservable方法),Disposable是为了清除资源。源码以下:
class Student<E>: ObservableConvertibleType, Disposable{ private let _source: Observable<E> private let _dispose: AnonymousDisposable init(source: Observable<E>, disposeAction: () -> ()) { _source = source _dispose = AnonymousDisposable(disposeAction) } func dispose() { _dispose.dispose() } func asObservable() -> Observable<E> { return _source } var name :String{ get { return self.name } set { self.name = newValue } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
在上面例子中,咱们采用Observable.just方法生成了一个Observable对象传递给Student对象,同时也定义了一个释放资源的方法。等到调用dispose()方法,就会执行咱们定义的释放资源的方法。例子结果为:
jarlene hah
- 1
- 2
十二、range方法
public static func range(start start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RangeProducer<E>(start: start, count: count, scheduler: scheduler) }
- 1
- 2
- 3
range方法其实方便版of方法,其功能和of差很少,咱们只要输出start和count而后就能生成一组数据,让他们执行onNext。值得注意的是,range方法只生成Observable型。在调用bindNext的时候能够将其对应成其余相应的类型。
例如:
let arr: [String] = ["ad", "cd", "ef", "gh"] Observable<Int> .range(start: 1, count: 3) .subscribeNext { (n) -> Void in print(arr[n]) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
结果
cd ef gh
- 1
- 2
- 3
1三、toObservable(from)
public func toObservable(scheduler: ImmediateSchedulerType? = nil) -> Observable<Generator.Element> { return Sequence(elements: self, scheduler: scheduler) }
- 1
- 2
- 3
- 4
toObservable方法是扩展自Array,是将一个一个array转换成Observable,其内部实调用了一个序列Sequence,其用法很简单。
let arr: [String] = ["ab", "cd", "ef", "gh"] arr.toObservable() .subscribeNext { (s) -> Void in print(s) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
运行结果:
ab
cd ef gh
- 1
- 2
- 3
- 4
1四、interval/timer
public static func interval(period: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Timer(dueTime: period, period: period, scheduler: scheduler ) } public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) -> Observable<E> { return Timer( dueTime: dueTime, period: period, scheduler: scheduler ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
interval方法是定时产生一个序列,interval第一个参数就是时间间隔,第二个参数是指定线程。 能够看出interval是range和repeateElement的结合。timer方法和interval方法相似。差异在于timer能够设置间隔时间和持续时间,而interval的间隔时间和持续时间是同样的。
至此,咱们将Observable对象基本的产生方法都讲述完了,下一节开始咱们详细讲述Observer的建立以及制做器Producer,其次将详细叙述Producer和事件方法onNext、onError、onComplete之间的联系,以及Producer是怎么调度线程来完成线程控制的。
RxSwift的观察者对象的变换(Transform Observable)和过滤(Filter Observable)
对观察着对象进行变换使得一个对象变换成另外一个对象,这个是RxSwift核心之一,所以对于熟悉RxSwift特别重要。RxSwift存在如下变换方法:
- buffer
- flatMap
- flatMapFirst
- flatMapLatest
- map
- scan
- window
过滤方法
- debounce / throttle
- distinctUntilChanged
- elementAt
- filter
- sample
- skip
- take
- takeLast
- single
下面咱们分别对以上几种对Observable对象变换作详细的介绍(不所有阐述)。
一、 buffer方法:
buffer方法是extension ObservableType中的一个方法,它的做用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。整体来讲就是通过必定时间,将必定个数的事件组合成一个数组,一块儿处理,在组合的过程当中,你能够选择线程。
public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[E]> { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("ab", "cd", "ef", "gh") .buffer(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
输出结果
["ab", "cd"] ["ef", "gh"]
- 1
- 2
二、flatMap
flatMap也是扩展自ObservableType,它的做用是将一种类型变换成另外一种类型。flatMap的参数是一个方法,这个方法的输入参数与Observable的E是同一种类型,输出依然是Observable类型。
public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O) -> Observable<O.E> { return FlatMap(source: asObservable(), selector: selector) }
- 1
- 2
- 3
- 4
咱们看一个例子,例子中首先是一组Observable,经过flatMap后仍是一组Observable,可是flatMap做用是,若是元素中遇到“a”字母开头的,那么它就从新组装一个数组,这个数组是只有元素和“a”;若是元素不是“a”字母开头的就与“b”字母组装成另外一个数组。这两种状况都经过调用toObservable返回Observable。flatMapFirst、flatMapLast、flatMapWithIndex都是相似的做用,这里就不重复。
Observable<String>
.of("ab", "cd", "aef", "gh") .flatMap({ (element: String) -> Observable<String> in if element.hasPrefix("a") { let sd : [String] = [element, "a"] return sd.toObservable() } else { let sd : [String] = [element, "b"] return sd.toObservable() } }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
结果
ab
a cd b aef a gh b
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
三、map
map方法是经过其实flatMap的简化版本,它返回的能够是任何类型。其中R是返回类型。
public func map<R>(selector: Self.E throws -> R) -> RxSwift.Observable<R>
- 1
例子:
Observable<String>
.of("ab", "cd", "aef", "gh") .map({ (str) -> String in return str+"ss" }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
结果
abss cdss aefss ghss
- 1
- 2
- 3
- 4
四、scan方法
scan方法有两个参数,第一个参数是种子,第二个参数是加速器。所谓的种子就是最初的状态,加速器就是将每一次运行的结果延续到下一次。scan方法也是扩展自ObservableType
public func scan<A>(seed: A, accumulator: (A, E) throws -> A) -> Observable<A> { return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e") .scan("s", accumulator: { (a, b) -> String in return a+b }) .subscribeNext({ (n) -> Void in print(n) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
这个例子中是将全部的字符依次串起来,运行结果是:
sa sab sabc sabcd sabcde
- 1
- 2
- 3
- 4
- 5
五、window
window方法一样扩展自ObservableType,它有三个参数,第一个是时间间隔,第二个是数量,第三个是线程。时间间隔指的的是window方法开窗的时间间隔;第二个参数数量指的的是每次经过窗口的个数;线程就是这种操做执行在什么线程上。起源码以下:
public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<E>> { return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
须要特别注意的是window方法以后,返回的是Observable
Observable<String>
.of("ab", "bc", "cd", "de", "ef") .window(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in n.subscribeNext({ (ss) -> Void in print(ss) }) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
结果:
ab
bc
cd de ef
- 1
- 2
- 3
- 4
- 5
变换的方法基本就这些,可是开发者能够经过自定义的方式扩展变换的方法以达到所需的目的。接下来咱们看看过滤方法。
一、debounce / throttle
debounce/throttle 方法在规定的时间中过滤序列元素,就如上图描述的同样,当debounce打开的时候,恰好那个黄色的序列元素过来,那么它就不会通知到事件(onNext、onError、onComplete)上去。下面是debounce方法源码。
public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e", "f") .debounce(1, scheduler: MainScheduler.instance) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
输出结果
f
- 1
二、distinctUntilChanged
distinctUntilChanged 主要是过滤相邻两个元素是否重复,重复的话就过滤掉其中之一。
public func distinctUntilChanged<K>(keySelector: (E) throws -> K, comparer: (lhs: K, rhs: K) throws -> Bool) -> Observable<E> { return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "a", "c", "e", "e", "f") .distinctUntilChanged({ (lhs, rhs) -> Bool in return lhs==rhs }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
输出结果:
a c e f
- 1
- 2
- 3
- 4
三、elementAt
elementAt方法其实就挑选出所须要的序列元素,上图描述的很清楚。
这个方法很简单。没有什么难点。当index超界的时候,throwOnEmpty参数是否抛出异常。
public func elementAt(index: Int) -> Observable<E> { return ElementAt(source: asObservable(), index: index, throwOnEmpty: true) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .elementAt(2) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
输出结果
cs
- 1
四、filter
filter方法很简单,指出过滤条件就行,知足过滤条件的就能执行事件通知,不然不行
public func filter(predicate: (E) throws -> Bool) -> Observable<E> { return Filter(source: asObservable(), predicate: predicate) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .filter({ (ss) -> Bool in return ss.hasPrefix("a") }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
输出结果
aa av
- 1
- 2
接下来的几个方法都是相似的,这里不就在详细叙述啦。
RxSwift的Observable事件处理以及线程调度
由第一部分能够知道,几乎在建立全部的Observable的时候都要用到Producer,而在事件处理(onNext、onError、onComplete)过程当中常常要用到线程调度(Scheduler),它们之间存在一种很巧妙的设计。首先先看看Producer源码。
class Producer<Element> : Observable<Element> { override init() { super.init() } override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { if !CurrentThreadScheduler.isScheduleRequired { return run(observer) } else { return CurrentThreadScheduler.instance.schedule(()) { _ in return self.run(observer) } } } func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { abstractMethod() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
Producer是继承了Observable类,咱们在建立Observable类时候都用到了Producer,那么Producer主要作了两件事情,第一个实现subscribe方法,在subscribe方法中传入了observer参数,observer类型是ObserverType,在上一部分介绍了ObserverType中有一个类型别名E,那么在Producer的范型element就必须和ObserverType中类型别名E同样。回过头来讲subscribe,咱们首先看CurrentThreadScheduler 的源码,CurrentThreadScheduler 是继承ImmediateSchedulerType协议,它里面就定义了一个方法:
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
- 1
而这个方法在CurrentThreadScheduler 具体实现了。
public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>> /** The singleton instance of the current thread scheduler. */ public static let instance = CurrentThreadScheduler() static var queue : ScheduleQueue? { get { return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance) } set { NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance) } } /** Gets a value that indicates whether the caller must call a `schedule` method. */ public static private(set) var isScheduleRequired: Bool { get { let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) return value == nil } set(isScheduleRequired) { NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance) } } /** Schedules an action to be executed as soon as possible on current thread. If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be automatically installed and uninstalled after all work is performed. - parameter state: State passed to the action to be executed. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */ public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable { if CurrentThreadScheduler.isScheduleRequired { CurrentThreadScheduler.isScheduleRequired = false let disposable = action(state) defer { CurrentThreadScheduler.isScheduleRequired = true CurrentThreadScheduler.queue = nil } guard let queue = CurrentThreadScheduler.queue else { return disposable } while let latest = queue.value.dequeue() { if latest.disposed { continue } latest.invoke() } return disposable } let existingQueue = CurrentThreadScheduler.queue let queue: RxMutableBox<Queue<ScheduledItemType>> if let existingQueue = existingQueue { queue = existingQueue } else { queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1)) CurrentThreadScheduler.queue = queue } let scheduledItem = ScheduledItem(action: action, state: state) queue.value.enqueue(scheduledItem) return scheduledItem } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
其实主要是根据CurrentThreadScheduler.isScheduleRequired参数来选择是否须要当前线程运行,若是须要,首调用action方法,而这个action方法其实就是onNext、onError、onCompelete方法。而后作了一个延迟清除(defer)和一个判断(guard)。而后循环一个queue其实主要是看看是否还有没有执行完的onNext时间。latest.invoke()其实就是作action(state),而后返回Disposable。若是不须要,则组合queue,生成Disposable返回。接下来咱们看看怎么设置线程执行的,首选看看subscribleOn方法,这个方法就是指定接下来事情要发生在那个线程中,具体看一下代码:
public func observeOn(scheduler: ImmediateSchedulerType) -> Observable<E> { if let scheduler = scheduler as? SerialDispatchQueueScheduler { return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler) } else { return ObserveOn(source: self.asObservable(), scheduler: scheduler) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
方法是定义在extension ObservableType 中的,它指定ObservableType 运行线程,这里面指定了两种运行方式,第一种是运行ObserveOnSerialDispatchQueue,第二种是ObserveOn这两个都继承自Producer,上面咱们已经叙述了Producer,无论是ObserveOnSerialDispatchQueue仍是ObserveOn都重写了run方法,他们返回的都是ObserverBase。ObserverBase其实就是在执行onNext、onError、onComplete方法。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0 func on(event: Event<E>) { switch event { case .Next: if _isStopped == 0 { onCore(event) } case .Error, .Completed: if !AtomicCompareAndSwap(0, 1, &_isStopped) { return } onCore(event) } } func onCore(event: Event<E>) { abstractMethod() } func dispose() { _isStopped = 1 } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
onCore方法是由继承者实现,好比在ObserveOnSink类中及具体实现了onCore方法
override func onCore(event: Event<E>) { let shouldStart = _lock.calculateLocked { () -> Bool in self._queue.enqueue(event) switch self._state { case .Stopped: self._state = .Running return true case .Running: return false } } if shouldStart { _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
这个onCore方法是判断当前运行到那一步(onNext,onError,onComplete)。如今咱们回过头来看Producer中的subscribe其实就是执行事件,只不过这个事件是在某个线程上执行的。咱们能够绘制一个简单的流程图描述这些。
Observable执行subscribleOn方法,产生一个新的Observable,这个新Observable是Produce,他继承了Observable,当Observable执行subscrible方法的时候,会根据线程来执行,若是指定了线程,那么就会经过run方法去执行事件。若是没有指定线程,就用当前线程执行run方法去执行事件。固然若是要用到变换或者过滤,也能够经过指定线程来执行变换和过滤,其原理是同样的。
RxSwift的观察者对象的合并(Conbinate Observable)和连接器(Connect Observable)
对观察着合并就是将多个观察着(Observables)合并起来处理,使用起来更方便。它主要由如下方法:
- merge
- startWith
- switchLatest
- combineLatest
- zip
连接器
- multicast
- publish
- refCount
- replay
- shareReplay
固然为了将多个相同类型观察者对象合并起来处理,能够极大减小重复代码的工做量。从本节开始咱们将会叙述观察者对象的合并和发布。
一、merge
从图中很容易看出merge方法就是将多个Observable对象合并处理。
public func merge() -> Observable<E.E> { return Merge(source: asObservable()) }
- 1
- 2
- 3
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
结果:
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
merge方法看起来特别简单。
二、startWith
startWith方法能够说是定制开始位置的,是一种比较特殊的merge方法。
public func startWith(elements: E ...) -> Observable<E> { return StartWith(source: self.asObservable(), elements: elements) }
- 1
- 2
- 3
- 4
startWith方法其实就是指定一个特殊的开头,
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .startWith("x") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
结果:
x
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
三、switchLatest
RxSwift中switchLatest至关与其余语言的switch方法,从图中能够很明显的看出来,第一个序列的最后一个元素被去掉了,没有执行OnNext方法。
public func switchLatest() -> Observable<E.E> { return Switch(source: asObservable()) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
结果:
0 1 2 3 4 200 201
- 1
- 2
- 3
- 4
- 5
- 6
- 7
Variable是一个类,它里面包含了一个Observable对象(BehaviorSubject),另外Variable中仍是实现了asObservable方法,而这个方法返回的就是里面的Observable对象。Variable源码很简单,这里不作特别的介绍。至于var1.value=5\6\7没有执行,这个正是switchLatest的做用,当var1的做用完成后,切换到var2的Observable。那么var1后续变化,是不会通知到var3的。
四、combineLatest
combineLatest和其余方法同样都是扩展自ObservableType协议,
public func combineLatest<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> { return CombineLatestCollectionType(sources: self, resultSelector: resultSelector) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7 Observable.combineLatest(var1.asObservable(), var2.asObservable()) { (as1, as2) -> Int in return as1 + as2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
输出结果:
208
- 1
简单的分析一下,就能够看出来,var2.value = 201; var1.value = 7,最后就是208;combineLatest是将多个Observable方法按照必定意愿组合起来。它提供了开发者组合的方法,本身实现就好了。
五、zip
zip和combineLatest差很少,能够将多个Observable合并起来处理,上面的例子一样能够用zip来实现,具体看例子,下面是zip方法的源码
public static func zip<O1: ObservableType, O2: ObservableType>
(source1: O1, _ source2: O2, resultSelector: (O1.E, O2.E) throws -> E) -> Observable<E> { return Zip2( source1: source1.asObservable(), source2: source2.asObservable(), resultSelector: resultSelector ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
例子:
Observable.zip(var1.asObservable(), var2.asObservable()) { (s1, s2) -> Int in return s1+s2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
当全部的工做都作完以后,咱们要的观察着对象进行发布,那么这个时候就要用到Connect,它是连接观察着对象和被观察着之间的一个连接器。它主要由如下方法:
一、multicast/publish
multicast和publish方法同样,它们都是经过发布/多播将Observable发出去,固然发出去必需要有一个链接(connect)的过程。只有连接的对象才会收到publish/multicast的通知。下面是源码:
public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S) -> ConnectableObservable<S.E> { return ConnectableObservableAdapter(source: self.asObservable(), subject: subject) } public func publish() -> ConnectableObservable<E> { return self.multicast(PublishSubject()) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
例子:
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .multicast(subject1) _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
这个例子经过使用一个interval方法一直放送,而后经过multicast将subject1通知(多播)出去,int1.subscribe来接受subject1的变化。
输出的结果:
Subject Next(0) first subscription Next(0) Subject Next(1) first subscription Next(1) Subject Next(2) first subscription Next(2) Subject Next(3) first subscription Next(3) Subject Next(4) first subscription Next(4)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
固然publish也有相似的功能。
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
输出结果:
first subscription Next(0) first subscription Next(1) first subscription Next(2) first subscription Next(3) first subscription Next(4) first subscription Next(5) first subscription Next(6) first subscription Next(7) first subscription Next(8) first subscription Next(9) first subscription Next(10)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
publish方法只是将发布出去的结果的变化告知,原变化没有告知出来,固然这个事publish和multicast一点小区别。
二、refCount
refCount是结合了publish方法使用的,当Observable发布出去,经过一个引用计数(refCount)方法来记录,其实refCount就是至关于connect方法。refCount源码以下,
public func refCount() -> Observable<E> { return RefCount(source: self) }
- 1
- 2
- 3
例子:
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() .refCount() .subscribeNext({ (ss) -> Void in print(ss) })
- 1
- 2
- 3
- 4
- 5
- 6
输出结果:
0 1 2 3 4 ...
- 1
- 2
- 3
- 4
- 5
- 6
三、replay/shareReplay
其实replay和multicast是同样,在其源码中其实也是调用multicast方法。起源码以下:
public func replay(bufferSize: Int) -> ConnectableObservable<E> { return self.multicast(ReplaySubject.create(bufferSize: bufferSize)) } public func shareReplay(bufferSize: Int) -> Observable<E> { if bufferSize == 1 { return ShareReplay1(source: self.asObservable()) } else { return self.replay(bufferSize).refCount() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
只不过,以前multicast传入的PublishSubject,这里是ReplaySubject,二者其实区别不大。
ReplaySubject中调用了ReplayMany,ReplayMany有一个事件队列来轮循事件。ReplaySubject和ReplayMany比较简单,这里再也不叙述。
例子:
let int2 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .replay(1) _ = int2.subscribeNext({ (ss) -> Void in print(ss) }) int2.connect() _ = int2.subscribeNext({ (ss) -> Void in print("a..\(ss)") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
输出结果:
0 a..0 1 a..1 2 a..2 3 a..3 4 a..4 5 a..5
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
其中shareReplay和replay是一回事,从源码中酒能够看出来,shareReplay是replay和refCount的组合,这里再也不叙述。
至此为止,关于Swift的一些基本用法和基本的概念都讲述完毕,固然还有一些相关的扩展,可是这个都和上面讲述原理是同样的,你们能够参考源码理解。
RxSwift、RxCocoa
从本节开始讲叙述与RxSwift配套使用的RxCocoa,RxCocoa主要是针对上层Ui作扩展,这些扩展主要是将上面所叙述的东西结合Ui控件使用。将RxSwift和RxCocoa结合起来使用对ios开发能够节省很大开发的时间,RxCocoa扩展性和RxSwift同样灵活,能够针对不一样的业务进行不一样的扩展,很方便开发者使用。因为本人对于ios sdk不是很了解。全部这里不具体讨论
总结
整篇博客写下来字数达到2w多,能够看成一个小论文了,做为一个swift新手,我花了10天左右的时间熟悉swift基本语法,为了可以更加熟悉swift,因而选择研究RxSwift,已达到更加熟练掌握swift的目的,整篇博客下下来花费时间就长达一个多月,不少swift的基本东西看完以后就忘记,又不得不从新看起语法来。不过总算完成了,固然研究完RxSwift,我更加能够熟练的swift进行ios开发。以后我会继续研究ReactiveX其余语言。期待愈来愈好吧。
参考连接:http://m.blog.csdn.net/article/details?id=50823558