using操做符的官方描述:html
建立一个可被清除的资源,它和 Observable 具备相同的寿命
经过使用 using 操做符建立 Observable 时,同时建立一个可被清除的资源,一旦 Observable 终止了,那么这个资源就会被清除掉了。react
beeth0ven.github.io/RxSwift-Chi…git
using方法是个静态方法,有两个实现:github
供普通讯号源调用(Observable,Relay,ControlProperty等),方法建立了一个私有类型Using,具体内部实现比较复杂,还没开始看Rx,因此还没弄明白……编程
extension ObservableType {
/** Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime. - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html) - parameter resourceFactory: Factory function to obtain a resource object. - parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource. - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object. */
public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
}
复制代码
供序列信号源调用(Single,Maybe等),方法实现仍是调用了ObservableType扩展中的实现swift
extension PrimitiveSequence {
public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, primitiveSequenceFactory: @escaping (Resource) throws -> PrimitiveSequence<Trait, Element>)
-> PrimitiveSequence<Trait, Element> {
return PrimitiveSequence(raw: Observable.using(resourceFactory, observableFactory: { (resource: Resource) throws -> Observable<Element> in
return try primitiveSequenceFactory(resource).asObservable()
}))
}
}
复制代码
方法接受两个参数:api
这个闭包没有入参,返回类型为Disposable协议对象,该对象就是官方介绍中的可被清除的资源,当第二个参数闭包返回的Observable订阅释放的时候,该对象就会同步调用dispose方法来释放。markdown
这个闭包的入参为resourceFactory闭包的返回的那个Disposable协议对象,闭包的返回值为Observable信号源,该信号源也是using方法返回的信号源,用来给调用方进行订阅处理网络
由于在resourceFactory闭包中建立了一个能够dispose的对象,并且这个对象会做为入参交给observableFactory闭包来处理并最终返回一个信号源来给调用方订阅,那就有不少种使用方法了闭包
咱们先建立一个最简单的demo,使用using来建立一个信号源,该信号源只是简单的发送几个数字,可被清除的资源也只是在dispose的时候,打个log
class TestDisposable: NSObject, Disposable {
func dispose() {
//简单打印下就行
print(String.init(format: "dp: %p dispose", self))
}
deinit {
print("dp 释放")
}
}
复制代码
_ = Observable<Int>.using({
() -> TestDisposable in
let dp = TestDisposable()
print(String.init(format: "建立source: %p", dp))
return dp
}, observableFactory: {
dp in
//并无对dp作处理,只是一样打印下dp信息
print(String.init(format: "建立factory, dp: %p", dp))
// 返回一个直接简单的输出数字的信号源
return Observable.from([1,2,3,4,5]).debug("factory", trimOutput: false)
}).debug("using", trimOutput: false).subscribe()
复制代码
在using方法的resourceFactory闭包中,建立了TestDisposable,并打印了下地址,而后在observableFactory闭包中,一样打印了做为入参传进来的TestDisposable的地址,而后返回了一个发出5个数字的Observable信号源,该信号源会在发送1,2,3,4,5以后发送onComplete,而后出发订阅者的dispose,这里我用debug分别在observableFactory闭包内对建立的Observable进行debug信息打印以及在using返回的信号也进行了debug打印,而后跑起来~
运行结果:
能够看到,先建立了source,而后建立了factory,在factory的订阅取消以后,source也跟着dispose,而后被释放掉了。
上面的简单使用中,咱们使用resourceFactory只是建立了一个很简单的可取消订阅的对象,而且在observableFactory闭包中没有对该对象进行任何处理,那若是咱们返回的是一个复杂的可取消订阅对象,该对象甚至携带有信号源,那么就能够在observableFactory闭包中对信号源进行处理,并返回新的信号源,就能够实现一些高级的操做了,官方使用using的例子就在官方demo:RxExample中:ActivityIndicator,很是牛逼的设计
这是一个信号指示器,发出的信号值为Bool类型,能够给每一个须要监听的信号源绑定一个Token,当信号源订阅完成时,Token也跟着取消订阅,ActivityIndicator中持有一个relay来标记持有的Token个数,当Token个数为0时,ActivityIndicator会发出false,标记没有信号在发送了,当至少有一个Token时,会发出true,能够用作下载的指示器
建立一个延迟的Single信号源来模拟网络请求,而后模拟发送4个不一样时长的请求,使用debug打印这些请求的完成时间,同时使用debug打印ActivityIndicator的状态
// 用来建立模拟请求的闭包, 入参为模拟请求秒数
let testRequestBlock: (Int) -> Single<String> = {
seconds in
return .create(subscribe: {
singler in
print("模拟开始请求\(seconds)s")
// 延迟后发送模拟请求完成
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(seconds), execute: {
singler(.success("模拟\(seconds)请求完成"))
})
return Disposables.create()
})
}
// ActivityIndicator指示器,用来标记请求是否所有完成
let indicator = ActivityIndicator()
// 模拟四个请求
testRequestBlock(1).trackActivity(indicator).debug("req1", trimOutput: false).subscribe()
testRequestBlock(7).trackActivity(indicator).debug("req7", trimOutput: false).subscribe()
testRequestBlock(3).trackActivity(indicator).debug("req3", trimOutput: false).subscribe()
testRequestBlock(5).trackActivity(indicator).debug("req5", trimOutput: false).subscribe()
// 订阅ActivityIndicator,打印状态
indicator.asObservable().subscribe(onNext: {
isRequesting in
print(isRequesting ? "正在请求" : "请求结束")
})
复制代码
执行结果:
能够看到当开始请求时,ActivityIndicator发送信号true,四个请求所有完成时,发送信号false
通常状况下,业务逻辑会是:点击按钮,进行N多信号变换处理,发送请求。因此中间会用到各类操做符,当出现flatMap或者concatMap这样把旧信号源变成新信号源时,trackActivity()方法就要注意调用方法了,若是使用错误的话会监测到错误信号致使状态异常。
简单点信号流:点击按钮,flatMap变换成网络请求,监听这个请求的状态。
let req1 = testRequestBlock(1).debug("req1", trimOutput: false)
// 正确写法
btn1.rx.tap.flatMap({
_ -> Observable<String> in
print("按钮点击")
return req1.trackActivity(indicator)
}).debug("btn1", trimOutput: false).subscribe()
// 错误写法
btn1.rx.tap.flatMap({
_ -> Observable<String> in
print("按钮点击")
return req1.asObservable()
})
.trackActivity(indicator)
.debug("btn1", trimOutput: false).subscribe()
复制代码
正确运行结果:
能够发现,按钮事件的订阅,没有释放,由于按钮事件信号类型是ControlEvent,这种类型的信号特色是:运行在主线程且永远不会完成/错误,所以信号订阅永远不会被释放,flatMap的做用是在按钮点击事件产生后,把信号量换成了新的信号量来给订阅者处理,因此trackActivity()方法须要写在req1以后。
错误运行结果:
对比上面正确结果发现,还没点击按钮,indicator的状态就已经变成正在请求了,是由于错误写法下,indicator监测的是按钮时间信号的状态,所以在按钮subscribe以后就变成了true,由于按钮的信号源订阅永远不会释放,因此就不会打印请求结束
PS: 由于Driver,ControlEvent,Relay这样的信号源,都不会发送complete与error事件,因此他们的订阅者,永远不会自动释放订阅,必须由外部条件来触发dispose,不然会致使内存泄露,因此RxSwift中有DisposeBag对象,用来持有订阅返回的Disposable对象,而后bag被某个对象持有(通常是vc,或者vm)当bag释放时,会对持有的所有Disposable执行一遍dispose就能够避免内存泄露
另一种避免内存泄露的方法就是使用take(count),或者take(until)明确的标明,对于这种信号源我只取我要的几个信号,达到目标后订阅会被自动释放。可是这种方式并不保险,建议仍是使用bag或本身持有Disposable对象来管理订阅释放。
ActivityIndicator的原理就是使用了using方法,为须要监测状态的信号源建立了一个ActivityToken,在建立token时,计数+1,token.dispose时,计数-1,当计数等于0时发送false,大于0时发送true,使用distinctUntilChanged来过滤重复的信号。
// 声明周期跟随source的可释放资源, 会持有source, 持有一个释放方法, 在source取消订阅的时候, 会调用dispose方法
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
private let _source: Observable<E>
private let _dispose: Cancelable
init(source: Observable<E>, disposeAction: @escaping () -> Void) {
_source = source
_dispose = Disposables.create(with: disposeAction)
}
func dispose() {
_dispose.dispose()
}
func asObservable() -> Observable<E> {
return _source
}
}
// 类型为共享的序列信号源
public class ActivityIndicator : SharedSequenceConvertibleType {
// 信号元素类型为Bool
public typealias Element = Bool
// 信号序列策略为Driver(永不失败,必定在主线程订阅,每次新的订阅,都会把最后一个信号发送一次)
public typealias SharingStrategy = DriverSharingStrategy
// 递归所
private let _lock = NSRecursiveLock()
// 记录监测的还没有完成的信号源个数
private let _relay = BehaviorRelay(value: 0)
// 用来实现SharedSequenceConvertibleType用
private let _loading: SharedSequence<SharingStrategy, Bool>
public init() {
// 建立_loading:relay变换成Driver, 信号量变成Bool值, 过滤重复值
_loading = _relay.asDriver()
.map { $0 > 0 }
.distinctUntilChanged()
}
// 监测信号源
fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
// 使用using建立新的信号源
return Observable.using({ () -> ActivityToken<Source.Element> in
// 先个数+1
self.increment()
// 建立token并返回token给后面的observableFactory使用
return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
}) { t in
// 从token中取出source返回
return t.asObservable()
}
}
// 个数+1并让relay发送信号
private func increment() {
_lock.lock()
_relay.accept(_relay.value + 1)
_lock.unlock()
}
// 个数-1并让relay发送信号
private func decrement() {
_lock.lock()
_relay.accept(_relay.value - 1)
_lock.unlock()
}
// 实现SharedSequenceConvertibleType协议
public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
return _loading
}
}
extension ObservableConvertibleType {
// 给信号源类型扩展, 添加监测方法
public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
// 用indicator监测self,
return activityIndicator.trackActivityOfObservable(self)
}
}
复制代码
using方法的核心就是建立了一个能够dispose的对象,绑定给信号源,在该信号源订阅完成时,把dispose对象一块儿给dispose掉。
而ActivityIndicator则是巧妙的使用须要检测的信号源A来建立token,使用token封装A,而后再从token中取出A做为using方法的返回,信号源在调用方法先后,内部信号量未改变,只是对整个信号进行了封装,而RxSwift整个框架都是使用block来对信号源进行各类封装,因此每次调用操做符(filter,map等)都是会对当前信号源进行封装,返回一个新的信号源对象,整个对象是新的,可是内部的信号量却使用了block来过滤处理。
就像是水管: 初始的信号源时水龙头,每个操做符,都至关于为这个水龙头接上了一根根管道,这些管道有的能够过滤,有的能够变颜色,有的甚至把水接到以后,喝掉并放出了新的水(๑´ㅂ`๑)。种种变换以后最终获得了咱们想要的结果。
最终的订阅者彻底不用关注最初的信号源是什么类型,也不关注中间如何变换,只关心本身接收的数据类型正确便可。中间逻辑调整时,只要拆掉就的管道,换上新的管道,便可轻松改变逻辑。这就是函数链式编程的好处。