RxSwift (二)序列核心逻辑分析github
RxSwift (三)Observable的建立,订阅,销毁编程
RxSwift(四)高阶函数swift
RxSwift(五)(Rxswift对比swift,oc用法)数组
RxSwift (十) 基础使用篇 1- 序列,订阅,销毁markdown
RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOC网络
咱们知道Swift中有不少高阶函数,很是好用,并且效率都很高,如咱们常用的map,fliter,flatmap等等。详情能够参考我以前的一篇博客:Swift的高阶函数闭包
本篇文章主要讲解Rxswift中的高阶函数. 主要讲解高阶函数的使用,而后展开来探索具体源码实现。
本篇博客涉及的实例都放在一个项目源码里面:点击此处下载高阶函数使用源码
实例 1: 代码:
// *** startWith : 在开始从可观察源发出元素以前,发出指定的元素序列 func test_startWith() { print("*****startWith*****") Observable.of("1", "2", "3", "4") .startWith("A") .startWith("B") .startWith("C", "a", "b") .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) //效果: CabBA1234 } 复制代码
运行结果:
将员可观测序列中的元素组合成一个新的可观测序列,并将像每一个源可观测序列发出元素同样发出每一个元素。
实例 5: 代码:
// **** merge : 将源可观察序列中的元素组合成一个新的可观察序列,并将像每一个源可观察序列发出元素同样发出每一个元素 func test_merge() { print("*****merge*****") let subject1 = PublishSubject<String>() let subject2 = PublishSubject<String>() // merge Observable.of(subject1,subject2) .merge() .subscribe(onNext: { print($0)}) .disposed(by: disposeBag) //- 下面任何一个响应都会勾起新序列响应 subject1.onNext("K") subject1.onNext("o") subject2.onNext("n") subject2.onNext("g") subject1.onNext("Y") subject2.onNext("u") subject1.onNext("L") subject2.onNext("u") } 复制代码
运行结果:
实例 10: 代码:
// *** zip: 将多达8个源可观测序列组合成一个新的可观测序列, // 并将从组合的可观测序列中发射出对应索引处每一个源可观测序列的元素 func test_zip() { print("*****zip*****") let stringSubject = PublishSubject<String>() let intSubject = PublishSubject<Int>() Observable.zip(stringSubject, intSubject) { stringElement , intElement in "\(stringElement) \(intElement)" } .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) stringSubject.onNext("K") stringSubject.onNext("o") //到这里存储了K o 可是很差响应,除非有另外一个响应 intSubject.onNext(1) //勾出一个 intSubject.onNext(2) //勾出另外一个 stringSubject.onNext("o") //再存一个 intSubject.onNext(3) //勾出一个 //总结: 只有两个序列同时有值的 时候才会响应,不然只会存值。 } 复制代码
运行结果:
实例 15: 代码:
/// combineLatest:将8源可观测序列组合成一个新的观测序列, ///并将开始发出联合观测序列的每一个源的最新元素可观测序列一旦全部排放源序列至少有一个元素, ///而且当源可观测序列发出的任何一个新元素 func test_combineLatest() { print("*****combineLatest*****") let stringSub = PublishSubject<String>() let intSub = PublishSubject<Int>() //合并序列 Observable.combineLatest(stringSub, intSub) { strE, intE in "\(strE) \(intE)" } .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) stringSub.onNext("K") //存一个K stringSub.onNext("Y") //存了一个覆盖 - 和zip不同 intSub.onNext(1) //发现strOB 观察者存在值 Y(上面的Y覆盖了K) 则 响应 Y 1 intSub.onNext(2) //覆盖1 -> 2, 发现strOB有值YK 响应 Y 2 stringSub.onNext("Kongyulu") // 覆盖Y -> Kongyulu 发现intOB有值 2 响应:Kongyulu 2 //总结:1. combineLatest 比较zip 会覆盖 // 2. 应用很是频繁: 好比帐户和密码同时知足->才能登录. 不关系帐户密码怎么变化的只要查看最后有值就能够 loginEnable } 复制代码
运行结果:
实例 20: 代码:
// 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素 func test_switchLatest() { // switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素 print("*****switchLatest*****") let switchLatestSub1 = BehaviorSubject(value: "K") let switchLatestSub2 = BehaviorSubject(value: "1") //注意下面这句代码:这里选择了switchLatestSub1就不会再监听switchLatestSub2 let switchLatestSub = BehaviorSubject(value: switchLatestSub1) switchLatestSub.asObservable() .switchLatest() .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) switchLatestSub1.onNext("Y") switchLatestSub1.onNext("_") switchLatestSub2.onNext("2") switchLatestSub2.onNext("3") // 2,3都不会监听,可是默认保存有2 覆盖1 3覆盖2 } 复制代码
运行结果:
// 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素 func test_switchLatest() { // switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素 print("*****switchLatest*****") let switchLatestSub1 = BehaviorSubject(value: "K") let switchLatestSub2 = BehaviorSubject(value: "1") //注意下面这句代码:这里选择了switchLatestSub1就不会再监听switchLatestSub2 let switchLatestSub = BehaviorSubject(value: switchLatestSub1) switchLatestSub.asObservable() .switchLatest() .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) switchLatestSub1.onNext("Y") switchLatestSub1.onNext("_") switchLatestSub2.onNext("2") switchLatestSub2.onNext("3") // 2,3都不会监听,可是默认保存有2 覆盖1 3覆盖2 switchLatestSub.onNext(switchLatestSub2) //切换到 switchLatestSub2 switchLatestSub1.onNext("*") //因为上面切换到了switchLatestSub2,因此switchLatestSub1不会响应,不会输出* switchLatestSub1.onNext("Kongyulu")//这里不会响应,不会输出Kongyulu switchLatestSub2.onNext("4") /* 到这里会输出: *****switchLatest***** K Y _ 3 4 */ switchLatestSub.onNext(switchLatestSub1)// 若是再次切换到 switchLatestSub1会打印出 Kongyulu switchLatestSub2.onNext("5") /* 到这里会输出: *****switchLatest***** K Y _ 3 4 Kongyulu */ } 复制代码
运行结果:
实例 25: 代码:
/// 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。 func test_map() { // ***** map: 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。 print("*****map*****") let ob = Observable.of(1,2,3,4) ob.map { (number) -> Int in return number + 2 } .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 30: 代码:
///将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。 ///这也颇有用,例如,当你有一个可观察的序列,它自己发出可观察的序列, ///你想可以对任何一个可观察序列的新发射作出反应(序列中序列:好比网络序列中还有模型序列) func test_flatmap() { print("*****flatMap*****") let boy = LGPlayer(score: 100) let girl = LGPlayer(score: 90) let player = BehaviorSubject(value: boy) player.asObservable() .flatMap { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列 .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) boy.score.onNext(60) player.onNext(girl) } 复制代码
运行结果:
struct LGPlayer { init(score: Int) { self.score = BehaviorSubject(value: score) } let score: BehaviorSubject<Int> } func test_flatmap() { print("*****flatMap*****") let boy = LGPlayer(score: 100) let girl = LGPlayer(score: 90) let player = BehaviorSubject(value: boy) player.asObservable() .flatMap { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列 .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) boy.score.onNext(60) player.onNext(girl) boy.score.onNext(50) boy.score.onNext(40)// 若是切换到 flatMapLatest 就不会打印 girl.score.onNext(10) girl.score.onNext(0) } 复制代码
运行结果:
实例 35: 代码:
struct LGPlayer { init(score: Int) { self.score = BehaviorSubject(value: score) } let score: BehaviorSubject<Int> } /// flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素 /// flatMapLatest其实是map和switchLatest操做符的组合。 func test_flatMapLatest() { print("*****flatMapLatest*****") let boy = LGPlayer(score: 100) let girl = LGPlayer(score: 90) let player = BehaviorSubject(value: boy) player.asObservable() .flatMapLatest { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列 .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) boy.score.onNext(60) player.onNext(girl) boy.score.onNext(50) boy.score.onNext(40)// 若是切换到 flatMapLatest 就不会打印 girl.score.onNext(10) girl.score.onNext(0) } 复制代码
运行结果:
实例 40: 代码:
///从初始就带有一个默认值开始,而后对可观察序列发出的每一个元素应用累加器闭包,并以单个元素可观察序列的形式返回每一个中间结果 func test_scan() { print("*****scan*****") Observable.of(10,100,1000) .scan(2) { aggregateValue, newValue in aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2 } .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 45: 代码:
///仅从知足指定条件的可观察序列中发出那些元素 func test_fliter() { // **** filter : 仅从知足指定条件的可观察序列中发出那些元素 print("*****filter*****") Observable.of(1,2,3,4,5,6,7,8,9,0) .filter{$0 % 2 == 0} .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 50: 代码:
///抑制可观察序列发出的顺序重复元素 func test_distinctUntilChanged() { // ***** distinctUntilChanged: 抑制可观察序列发出的顺序重复元素 print("*****distinctUntilChanged*****") Observable.of("1", "2", "2", "2", "3", "3", "4") .distinctUntilChanged() .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 55: 代码:
///仅在可观察序列发出的全部元素的指定索引处发出元素 func test_elementAt() { // **** elementAt: 仅在可观察序列发出的全部元素的指定索引处发出元素 print("*****elementAt*****") Observable.of("C", "o", "o", "c", "I") .elementAt(3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 60: 代码:
///只发出可观察序列发出的第一个元素(或知足条件的第一个元素)。若是可观察序列发出多个元素,将抛出一个错误。 func test_single() { // *** single: 只发出可观察序列发出的第一个元素(或知足条件的第一个元素)。若是可观察序列发出多个元素,将抛出一个错误。 print("*****single*****") Observable.of("kongyulu", "yuhairong") .single() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
代码:
func test_single2() { // *** single: 只发出可观察序列发出的第一个元素(或知足条件的第一个元素)。若是可观察序列发出多个元素,将抛出一个错误。 print("*****single*****") Observable.of("kongyulu", "yuhairong") .single{ $0 == "kongyulu"} .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 65: 代码:
///只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个 func test_take() { // **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个 print("*****take*****") Observable.of("kongyulu", "yuhairong","yifeng", "yisheng") .take(2)//这里取前面两个 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
代码:
func test_take2() { // **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个 print("*****take*****") Observable.of("kongyulu", "yuhairong","yifeng", "yisheng") .take(3) //这里取前面三个 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 70: 代码:
//仅从可观察序列的末尾发出指定数量的元素 func test_takeLast() { // *** takeLast: 仅从可观察序列的末尾发出指定数量的元素 print("*****takeLast*****") Observable.of("kongyulu", "yuhairong","yifeng", "yisheng") .takeLast(3)//取从末尾开始算起的3个元素 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 75: 代码:
///只要指定条件的值为true,就从可观察序列的开始发出元素 func test_takeWhile() { // **** takeWhile: 只要指定条件的值为true,就从可观察序列的开始发出元素 print("*****takeWhile*****") Observable.of(1, 2, 3, 4, 5, 6) .takeWhile { $0 < 3 } //取出知足条件的元素 (1,2) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 80: 代码:
/// 从源可观察序列发出元素,直到参考可观察序列发出元素 /// 这个要重点,应用很是频繁 好比我页面销毁了,就不能获取值了(cell重用运用) func test_takeUntil() { // ***** takeUntil: 从源可观察序列发出元素,直到参考可观察序列发出元素 print("*****takeUntil*****") let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() sourceSequence .takeUntil(referenceSequence) .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) sourceSequence.onNext("kongyulu") sourceSequence.onNext("yifeng") sourceSequence.onNext("yisheng") //referenceSequence.onNext("yuhairong") // 条件一出来,下面就走不了 sourceSequence.onNext("test1") sourceSequence.onNext("test2") sourceSequence.onNext("test3") } 复制代码
运行结果:
referenceSequence.onNext("yuhairong")
放开后,控制台打印的结果为:实例 85: 代码:
///从源可观察序列发出元素,直到参考可观察序列发出元素 /// 这个要重点,应用很是频繁 textfiled 都会有默认序列产生 func test_skip() { // ***** skip: 从源可观察序列发出元素,直到参考可观察序列发出元素 print("*****skip*****") Observable.of(1,2,3,4,5,6) .skip(2) //直接跳过前面两个元素,即从3开始 .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
运行结果:
实例 90: 代码:
/// 直接跳过知足条件的元素,至关于过滤做用 func test_skipWhile() { print("*****skipWhile*****") //skipWhile刚刚和takeWhile的做用相反 Observable.of(1, 2, 3, 4, 5, 6) .skipWhile { $0 < 4 } //直接跳过知足条件的元素,至关于过滤做用(知足小于4的都跳过,即只有4,5,6) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
运行结果:
代码:
/// 抑制从源可观察序列发出元素,直到参考可观察序列发出元素 func test_skipUntil() { // *** skipUntil: 抑制从源可观察序列发出元素,直到参考可观察序列发出元素 // skipUntil 做用刚刚和 takeUntil 相反 print("*****skipUntil*****") let sourceSeq = PublishSubject<String>() let referenceSeq = PublishSubject<String>() sourceSeq .skipUntil(referenceSeq) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) // 没有条件命令 下面走不了 sourceSeq.onNext("kongyulu") sourceSeq.onNext("yifeng") sourceSeq.onNext("yisheng") //referenceSeq.onNext("yuhairong") // 条件一出来,下面就能够走了 sourceSeq.onNext("test1") sourceSeq.onNext("test2") sourceSeq.onNext("test3") } 复制代码
运行结果:
referenceSeq.onNext("yuhairong")
再来看运行结果。
运行结果:
实例 95: 代码:
/// 将一个可观察序列转换为一个数组,将该数组做为一个新的单元素可观察序列发出,而后终止 func test_toArray() { // *** toArray: 将一个可观察序列转换为一个数组,将该数组做为一个新的单元素可观察序列发出,而后终止 print("*****toArray*****") Observable.range(start: 1, count: 10) .toArray() //这里生成一个从1到10的数组 .subscribe { print($0) } .disposed(by: disposeBag) } 复制代码
运行结果:
实例 100: 代码:
/// 从一个设置的初始化值开始,而后对一个可观察序列发出的全部元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 相似scan func test_reduce() { // *** reduce: 从一个设置的初始化值开始,而后对一个可观察序列发出的全部元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 相似scan print("*****reduce*****") Observable.of(10, 100, 1000) .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
结果:
实例 105: 代码:
/// 以顺序方式链接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素以前,等待每一个序列成功终止 /// 用来控制顺序 func test_concat() { // *** concat: 以顺序方式链接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素以前,等待每一个序列成功终止 // 用来控制顺序 print("*****concat*****") let subject1 = BehaviorSubject(value: "kongyulu") let subject2 = BehaviorSubject(value: "1") let subjectsSubject = BehaviorSubject(value: subject1) subjectsSubject.asObservable() .concat() .subscribe { print($0) } .disposed(by: disposeBag) subject1.onNext("yifeng") subject1.onNext("yisheng") subjectsSubject.onNext(subject2) subject2.onNext("打印不出来") subject2.onNext("2") //subject1.onCompleted() // 必需要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步 subject2.onNext("3") } 复制代码
结果1:
subject1.onCompleted()
, 而使用了concat后subject2的订阅须要等待subject1完成以后才能执行。因此才有了咱们看到的上面打印结果,subject2的订阅信息都没有打印出来。
实例 110: 代码:
/// 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,而后终止 func test_catchErrorJustReturn() { // **** catchErrorJustReturn // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,而后终止 print("*****catchErrorJustReturn*****") let sequenceThatFails = PublishSubject<String>() sequenceThatFails .catchErrorJustReturn("kongyulu") .subscribe{print($0)} .disposed(by: disposeBag) sequenceThatFails.onNext("yifeng") sequenceThatFails.onNext("yisheng")// 正常序列发送成功的 sequenceThatFails.onError(self.kylError) //发送失败的序列,一旦订阅到位 返回咱们以前设定的错误的预案 } 复制代码
输出结果:
实例 115: 代码:
/// 经过切换到提供的恢复可观察序列,从错误事件中恢复 func test_catchError() { // **** catchErrorJustReturn // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,而后终止 print("*****catchErrorJustReturn*****") let sequenceThatFails = PublishSubject<String>() sequenceThatFails .catchErrorJustReturn("kongyulu") .subscribe{print($0)} .disposed(by: disposeBag) sequenceThatFails.onNext("yifeng") sequenceThatFails.onNext("yisheng")// 正常序列发送成功的 sequenceThatFails.onError(self.kylError) //发送失败的序列,一旦订阅到位 返回咱们以前设定的错误的预案 // **** catchError // 经过切换到提供的恢复可观察序列,从错误事件中恢复 print("*****catchError*****") let recoverySequence = PublishSubject<String>() recoverySequence .catchError { print("Error:",$0) return recoverySequence // 获取到了错误序列-咱们在中间的闭包操做处理完毕,返回给用户须要的序列(showAlert) } .subscribe{print($0)} .disposed(by: disposeBag) sequenceThatFails.onNext("test1") sequenceThatFails.onNext("test2") // 正常序列发送成功的 sequenceThatFails.onError(kylError) // 发送失败的序列 recoverySequence.onNext("yuhairong") } 复制代码
输出结果:
实例 120: 代码:
/// 经过无限地从新订阅可观察序列来恢复重复的错误事件 func test_retry() { // *** retry: 经过无限地从新订阅可观察序列来恢复重复的错误事件 print("*****retry*****") var count = 1 // 外界变量控制流程 let sequenceRetryErrors = Observable<String>.create { (observer) -> Disposable in observer.onNext("kongyulu") observer.onNext("yifeng") observer.onNext("yisheng") if count == 1 { // 流程进来以后就会过分-这里的条件能够做为出口,失败的次数 observer.onError(self.kylError) print("错误序列来了") count += 1 } observer.onNext("test1") observer.onNext("test2") observer.onNext("test3") observer.onCompleted() return Disposables.create() } sequenceRetryErrors //.retry() //调用这个retry后,上面的observer闭包会从新执行一次 .subscribe(onNext: {print($0)}) .disposed(by: disposeBag) } 复制代码
上面的代码咱们注释掉了//.retry()
,这样咱们能够更好的对比结果 输出结果:
实例 125: 代码:
/// retry(_:): 经过从新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数 func test_retry2() { // **** retry(_:): 经过从新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数 print("*****retry(_:)*****") var count = 1 // 外界变量控制流程 let sequenceThatErrors = Observable<String>.create { observer in observer.onNext("kongyulu") observer.onNext("yifeng") observer.onNext("yisheng") if count < 5 { // 这里设置的错误出口是没有太多意义的额,由于咱们设置重试次数 observer.onError(self.kylError) //发送错误消息 print("错误序列来了") count += 1 } //发送错误后,下面的sender都不会打印了 observer.onNext("sender 1") observer.onNext("sender 2") observer.onNext("sender 3") observer.onCompleted() return Disposables.create() } sequenceThatErrors .retry(3) //重复地从错误事件中恢复3次 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
结果:
func test_debug() { // **** debug // 打印全部订阅、事件和处理。 print("*****debug*****") var count = 1 let sequenceThatErrors = Observable<String>.create { observer in observer.onNext("Kongyulu") observer.onNext("yifeng") observer.onNext("yisheng") if count < 5 { observer.onError(self.kylError) print("错误序列来了") count += 1 } observer.onNext("yuhairong") observer.onNext("zhangsiyuan") observer.onNext("kongliyuan") observer.onCompleted() return Disposables.create() } sequenceThatErrors .retry(3) .debug() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } 复制代码
结果:
实例 135: 代码:
/// RxSwift.Resources.total 操做符 func testResourcesTotal() { // ** RxSwift.Resources.total: 提供全部Rx资源分配的计数,这对于在开发期间检测泄漏很是有用。 print("*****RxSwift.Resources.total*****") print(RxSwift.Resources.total) let subject = BehaviorSubject(value: "Cooci") let subscription1 = subject.subscribe(onNext: { print($0) }) print(RxSwift.Resources.total) let subscription2 = subject.subscribe(onNext: { print($0) }) print(RxSwift.Resources.total) subscription1.dispose() print(RxSwift.Resources.total) subscription2.dispose() print(RxSwift.Resources.total) } 复制代码
结果:
实例 140: 代码:
/// multicast func testMulticastConnectOperators(){ // *** multicast : 将源可观察序列转换为可链接序列,并经过指定的主题广播其发射。 print("*****multicast*****") let subject = PublishSubject<Any>() subject.subscribe{print("00:\($0)")} .disposed(by: disposeBag) let netOB = Observable<Any>.create { (observer) -> Disposable in sleep(2)// 模拟网络延迟 print("我开始请求网络了") observer.onNext("请求到的网络数据") observer.onNext("请求到的本地") observer.onCompleted() return Disposables.create { print("销毁回调了") } }.publish() netOB.subscribe(onNext: { (anything) in print("订阅1:",anything) }) .disposed(by: disposeBag) // 咱们有时候不止一次网络订阅,由于有时候咱们的数据可能用在不一样的额地方 // 因此在订阅一次 会出现什么问题? netOB.subscribe(onNext: { (anything) in print("订阅2:",anything) }) .disposed(by: disposeBag) _ = netOB.connect() } 复制代码
结果:
实例 145: 代码:
/// replay func testReplayConnectOperators(){ // **** replay: 将源可观察序列转换为可链接的序列,并将向每一个新订阅服务器重放之前排放的缓冲大小 // 首先拥有和publish同样的能力,共享 Observable sequence, 其次使用replay还须要咱们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者 print("*****replay*****") let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5) interval.subscribe(onNext: { print(Date.time,"订阅: 1, 事件: \($0)") }) .disposed(by: self.disposeBag) delay(2) { _ = interval.connect() } delay(4) { interval.subscribe(onNext: { print(Date.time,"订阅: 2, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(8) { interval.subscribe(onNext: { print(Date.time,"订阅: 3, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(20, closure: { self.disposeBag = DisposeBag() }) /** 订阅: 1, 事件: 4 订阅: 1, 事件: 0 2019-05-28 21-32-42 订阅: 2, 事件: 0 2019-05-28 21-32-42 订阅: 1, 事件: 1 2019-05-28 21-32-42 订阅: 2, 事件: 1 2019-05-28 21-32-45 订阅: 2, 事件: 4 2019-05-28 21-32-46 订阅: 3, 事件: 0 2019-05-28 21-32-46 订阅: 3, 事件: 1 2019-05-28 21-32-46 订阅: 3, 事件: 2 2019-05-28 21-32-46 订阅: 3, 事件: 3 2019-05-28 21-32-46 订阅: 3, 事件: 4 // 序列从 0开始 // 定时器也没有断层 sub2 sub3 和 sub1 是同步的 */ } 复制代码
结果:
实例 150: 代码:
/// push - connect 将源可观察序列转换为可链接序列 func testPushConnectOperators(){ // **** push:将源可观察序列转换为可链接序列 // 共享一个Observable的事件序列,避免建立多个Observable sequence。 // 注意:须要调用connect以后才会开始发送事件 print("*****testPushConnect*****") let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish() interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") }) .disposed(by: disposeBag) delay(2) { _ = interval.connect() } delay(4) { interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(6) { interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(10, closure: { self.disposeBag = DisposeBag() }) /** 订阅: 1, 事件: 1 订阅: 2, 事件: 1 订阅: 1, 事件: 2 订阅: 2, 事件: 2 订阅: 1, 事件: 3 订阅: 2, 事件: 3 订阅: 3, 事件: 3 订阅: 2 从1开始 订阅: 3 从3开始 */ // 可是后面来的订阅者,却没法获得以前已发生的事件 } 复制代码
结果: