RxSwift使用心得(一):using操做符与ActivityIndicator

using操做符

简介

using操做符的官方描述:html

建立一个可被清除的资源,它和 Observable 具备相同的寿命
经过使用 using 操做符建立 Observable 时,同时建立一个可被清除的资源,一旦 Observable 终止了,那么这个资源就会被清除掉了。react

beeth0ven.github.io/RxSwift-Chi…git

image.png

using方法是个静态方法,有两个实现:github

  1. 在ObservableType协议扩展中实现

供普通讯号源调用(Observable,Relay,ControlProperty等),方法建立了一个私有类型Using,具体内部实现比较复杂,还没开始看Rx,因此还没弄明白……编程

extension ObservableType {
    /** Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime. - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html) - parameter resourceFactory: Factory function to obtain a resource object. - parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource. - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object. */
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
        return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }
}
复制代码
  1. 在PrimitiveSequence结构体扩展中实现

供序列信号源调用(Single,Maybe等),方法实现仍是调用了ObservableType扩展中的实现swift

extension PrimitiveSequence {
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, primitiveSequenceFactory: @escaping (Resource) throws -> PrimitiveSequence<Trait, Element>)
        -> PrimitiveSequence<Trait, Element> {
            return PrimitiveSequence(raw: Observable.using(resourceFactory, observableFactory: { (resource: Resource) throws -> Observable<Element> in
                return try primitiveSequenceFactory(resource).asObservable()
            }))
    }
}
复制代码

方法做用

方法接受两个参数:api

  1. resourceFactory闭包

这个闭包没有入参,返回类型为Disposable协议对象,该对象就是官方介绍中的可被清除的资源,当第二个参数闭包返回的Observable订阅释放的时候,该对象就会同步调用dispose方法来释放。markdown

  1. observableFactory闭包

这个闭包的入参为resourceFactory闭包的返回的那个Disposable协议对象,闭包的返回值为Observable信号源,该信号源也是using方法返回的信号源,用来给调用方进行订阅处理网络

由于在resourceFactory闭包中建立了一个能够dispose的对象,并且这个对象会做为入参交给observableFactory闭包来处理并最终返回一个信号源来给调用方订阅,那就有不少种使用方法了闭包

简单的示例

咱们先建立一个最简单的demo,使用using来建立一个信号源,该信号源只是简单的发送几个数字,可被清除的资源也只是在dispose的时候,打个log

  1. Disposable协议表示某个能够被释放的资源,只有一个dispose方法,RxSwift中并无供外部使用的默认实现,所以咱们须要本身定义一个TestDisposable:
class TestDisposable: NSObject, Disposable {
    func dispose() {
        //简单打印下就行
        print(String.init(format: "dp: %p dispose", self))
    }
    
    deinit {
        print("dp 释放")
    }
}
复制代码
  1. 而后使用using建立信号源,该信号源将持有TestDisposable并在订阅取消的时候同时释放Testdisposable:
_ = Observable<Int>.using({
    () -> TestDisposable in
    let dp = TestDisposable()
    print(String.init(format: "建立source: %p", dp))
    return dp
}, observableFactory: {
    dp in
    //并无对dp作处理,只是一样打印下dp信息
    print(String.init(format: "建立factory, dp: %p", dp))
    // 返回一个直接简单的输出数字的信号源
    return Observable.from([1,2,3,4,5]).debug("factory", trimOutput: false)
}).debug("using", trimOutput: false).subscribe()
复制代码

在using方法的resourceFactory闭包中,建立了TestDisposable,并打印了下地址,而后在observableFactory闭包中,一样打印了做为入参传进来的TestDisposable的地址,而后返回了一个发出5个数字的Observable信号源,该信号源会在发送1,2,3,4,5以后发送onComplete,而后出发订阅者的dispose,这里我用debug分别在observableFactory闭包内对建立的Observable进行debug信息打印以及在using返回的信号也进行了debug打印,而后跑起来~

运行结果

image.png

能够看到,先建立了source,而后建立了factory,在factory的订阅取消以后,source也跟着dispose,而后被释放掉了。

进阶使用

上面的简单使用中,咱们使用resourceFactory只是建立了一个很简单的可取消订阅的对象,而且在observableFactory闭包中没有对该对象进行任何处理,那若是咱们返回的是一个复杂的可取消订阅对象,该对象甚至携带有信号源,那么就能够在observableFactory闭包中对信号源进行处理,并返回新的信号源,就能够实现一些高级的操做了,官方使用using的例子就在官方demo:RxExample中:ActivityIndicator,很是牛逼的设计

ActivityIndicator

简介

这是一个信号指示器,发出的信号值为Bool类型,能够给每一个须要监听的信号源绑定一个Token,当信号源订阅完成时,Token也跟着取消订阅,ActivityIndicator中持有一个relay来标记持有的Token个数,当Token个数为0时,ActivityIndicator会发出false,标记没有信号在发送了,当至少有一个Token时,会发出true,能够用作下载的指示器

使用

建立一个延迟的Single信号源来模拟网络请求,而后模拟发送4个不一样时长的请求,使用debug打印这些请求的完成时间,同时使用debug打印ActivityIndicator的状态

// 用来建立模拟请求的闭包, 入参为模拟请求秒数
let testRequestBlock: (Int) -> Single<String> = {
    seconds in
    return .create(subscribe: {
        singler in
        print("模拟开始请求\(seconds)s")
        // 延迟后发送模拟请求完成
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(seconds), execute: {
            singler(.success("模拟\(seconds)请求完成"))
        })
        return Disposables.create()
    })
}

// ActivityIndicator指示器,用来标记请求是否所有完成
let indicator = ActivityIndicator()

// 模拟四个请求
testRequestBlock(1).trackActivity(indicator).debug("req1", trimOutput: false).subscribe()
testRequestBlock(7).trackActivity(indicator).debug("req7", trimOutput: false).subscribe()
testRequestBlock(3).trackActivity(indicator).debug("req3", trimOutput: false).subscribe()
testRequestBlock(5).trackActivity(indicator).debug("req5", trimOutput: false).subscribe()

// 订阅ActivityIndicator,打印状态
indicator.asObservable().subscribe(onNext: {
    isRequesting in
    print(isRequesting ? "正在请求" : "请求结束")
})
复制代码

执行结果:

image.png

能够看到当开始请求时,ActivityIndicator发送信号true,四个请求所有完成时,发送信号false

配合信号流使用

通常状况下,业务逻辑会是:点击按钮,进行N多信号变换处理,发送请求。因此中间会用到各类操做符,当出现flatMap或者concatMap这样把旧信号源变成新信号源时,trackActivity()方法就要注意调用方法了,若是使用错误的话会监测到错误信号致使状态异常。

简单点信号流:点击按钮,flatMap变换成网络请求,监听这个请求的状态。

let req1 = testRequestBlock(1).debug("req1", trimOutput: false)

// 正确写法
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("按钮点击")
    return req1.trackActivity(indicator)
}).debug("btn1", trimOutput: false).subscribe()

// 错误写法
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("按钮点击")
    return req1.asObservable()
})
.trackActivity(indicator)
.debug("btn1", trimOutput: false).subscribe()
复制代码

正确运行结果:

image.png

能够发现,按钮事件的订阅,没有释放,由于按钮事件信号类型是ControlEvent,这种类型的信号特色是:运行在主线程且永远不会完成/错误,所以信号订阅永远不会被释放,flatMap的做用是在按钮点击事件产生后,把信号量换成了新的信号量来给订阅者处理,因此trackActivity()方法须要写在req1以后。

错误运行结果:

image.png

对比上面正确结果发现,还没点击按钮,indicator的状态就已经变成正在请求了,是由于错误写法下,indicator监测的是按钮时间信号的状态,所以在按钮subscribe以后就变成了true,由于按钮的信号源订阅永远不会释放,因此就不会打印请求结束

PS: 由于Driver,ControlEvent,Relay这样的信号源,都不会发送complete与error事件,因此他们的订阅者,永远不会自动释放订阅,必须由外部条件来触发dispose,不然会致使内存泄露,因此RxSwift中有DisposeBag对象,用来持有订阅返回的Disposable对象,而后bag被某个对象持有(通常是vc,或者vm)当bag释放时,会对持有的所有Disposable执行一遍dispose就能够避免内存泄露
另一种避免内存泄露的方法就是使用take(count),或者take(until)明确的标明,对于这种信号源我只取我要的几个信号,达到目标后订阅会被自动释放。可是这种方式并不保险,建议仍是使用bag或本身持有Disposable对象来管理订阅释放。

原理

ActivityIndicator的原理就是使用了using方法,为须要监测状态的信号源建立了一个ActivityToken,在建立token时,计数+1,token.dispose时,计数-1,当计数等于0时发送false,大于0时发送true,使用distinctUntilChanged来过滤重复的信号。

// 声明周期跟随source的可释放资源, 会持有source, 持有一个释放方法, 在source取消订阅的时候, 会调用dispose方法
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> Void) {
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    }

    func dispose() {
        _dispose.dispose()
    }

    func asObservable() -> Observable<E> {
        return _source
    }
}

// 类型为共享的序列信号源
public class ActivityIndicator : SharedSequenceConvertibleType {
    // 信号元素类型为Bool
    public typealias Element = Bool
    // 信号序列策略为Driver(永不失败,必定在主线程订阅,每次新的订阅,都会把最后一个信号发送一次)
    public typealias SharingStrategy = DriverSharingStrategy

    // 递归所
    private let _lock = NSRecursiveLock()
    // 记录监测的还没有完成的信号源个数
    private let _relay = BehaviorRelay(value: 0)
    // 用来实现SharedSequenceConvertibleType用
    private let _loading: SharedSequence<SharingStrategy, Bool>

    public init() {
        // 建立_loading:relay变换成Driver, 信号量变成Bool值, 过滤重复值
        _loading = _relay.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()
    }

    // 监测信号源
    fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
        // 使用using建立新的信号源
        return Observable.using({ () -> ActivityToken<Source.Element> in
            // 先个数+1
            self.increment()
            // 建立token并返回token给后面的observableFactory使用
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        }) { t in
            // 从token中取出source返回
            return t.asObservable()
        }
    }

    // 个数+1并让relay发送信号
    private func increment() {
        _lock.lock()
        _relay.accept(_relay.value + 1)
        _lock.unlock()
    }

    // 个数-1并让relay发送信号
    private func decrement() {
        _lock.lock()
        _relay.accept(_relay.value - 1)
        _lock.unlock()
    }

    // 实现SharedSequenceConvertibleType协议
    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
        return _loading
    }
}

extension ObservableConvertibleType {
    // 给信号源类型扩展, 添加监测方法
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
        // 用indicator监测self,
        return activityIndicator.trackActivityOfObservable(self)
    }
}

复制代码

生命周期

  1. ActivityIndicator初始化,_relay个数为0,状态为false
  2. 某个信号源A调用trackActivity方法,准备监测
  3. ActivityIndicator调用trackActivityOfObservable方法,使用using方法,先对_relay+1,而后建立ActivityToken,ActivityToken建立时,持有了A,而且使用ActivityIndicator的-1方法做为闭包参数建立Disposables对象。
  4. 建立ActivityToken完成,接着using调用observableFactory方法,把token持有的resource(信号源A)做为结果返回,所以信号源A经过调用trackActivity方法以后,返回的信号源的信号量与A一致,只不过通过了层层封装
  5. 此时ActivityIndicator的_relay个数为1,状态为true
  6. A完成,订阅释放
  7. ActivityToken触发dispose方法,调用ActivityIndicator的-1方法,而后ActivityToken释放
  8. 此时ActivityIndicator的_relay个数为0,状态为false

总结

using方法的核心就是建立了一个能够dispose的对象,绑定给信号源,在该信号源订阅完成时,把dispose对象一块儿给dispose掉。

而ActivityIndicator则是巧妙的使用须要检测的信号源A来建立token,使用token封装A,而后再从token中取出A做为using方法的返回,信号源在调用方法先后,内部信号量未改变,只是对整个信号进行了封装,而RxSwift整个框架都是使用block来对信号源进行各类封装,因此每次调用操做符(filter,map等)都是会对当前信号源进行封装,返回一个新的信号源对象,整个对象是新的,可是内部的信号量却使用了block来过滤处理。

就像是水管: 初始的信号源时水龙头,每个操做符,都至关于为这个水龙头接上了一根根管道,这些管道有的能够过滤,有的能够变颜色,有的甚至把水接到以后,喝掉并放出了新的水(๑´ㅂ`๑)。种种变换以后最终获得了咱们想要的结果。

最终的订阅者彻底不用关注最初的信号源是什么类型,也不关注中间如何变换,只关心本身接收的数据类型正确便可。中间逻辑调整时,只要拆掉就的管道,换上新的管道,便可轻松改变逻辑。这就是函数链式编程的好处。

相关文章
相关标签/搜索