RxJava介绍2:基本结构和使用场景

RxJava构成

简述

RxJava = Observable + Operator + Scheduler + Observer?html

Observable
    .fromArray(1, 2, 3, 4)
    .map { it * 5 }
    .filter { it -> it > 10 }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { println(it) }
复制代码


截屏2021-05-10 上午11.13.55.png

Observable*

可观察数据源有哪些特征,

  • Usually do work when you start or stop listening
  • Synchronous or asynchronous
  • Single item,many items,or empty
  • Terminates with an error or succeeds to completion
  • May never terminate!
  • Just an implemention of the Observer pattern

GOTO 2016 • Exploring RxJava 2 for Android • Jake Whartonjava

哪些可称为可观察数据源

  • View事件(触摸,点击..)
  • 网络请求返回的数据
  • 查询数据库返回的数据
  • 手机传感器通知数据(GPS,电量)
  • 闹钟定时器通知...

如何建立建Observable

  • Observable.just("Hello")
  • Observable.fromArray("Hello","Hello2")
  • Observable.fromCallable{"Hello"}
  • Observable.create{ it.onNext("Hello")}
  • Observable.Interval(200)
Observable.create(new ObservableOnSubscribe<String>() {
	@Override
	public void subscribe(ObservableEmitter<String> emitter)  {
		emitter.onNext("Hello");
		emitter.onComplete();
	}
})
复制代码
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
});
复制代码

同时在Android使用场景中:react

  • RxView.clicks(view) : Observable
  • login(@Body LoginReqParams) : Observable
  • RxPermissions.request(READ_PHONE_STATE) : Observable
  • Operator 操做符

    介绍

    ReactiveX的每种特定语言实现都实现了一组操做符。大体分为如下分类:
    数据库

    • Creating Observables : create,just,from
    • Transforming Observables :Map,FlatMap
    • Filtering Observables:Filter,Debouncee,Distinct,Skip
    • Combining Observables : Merge,Zip,CombineLatest
    • Error Handling Operators: Catch,Retry
    • Observable Utility Operators: Delay,TimeInterval,ObserveOn,SubscribeOn
    • Conditional and Boolean Operators : All,Contains,TkeWhile
    • Mathematical and Aggregate Operators : Average,Count,Max
    • Connectable Observable Operators : Connect,Replay
    • Operators to Convert Observables : To

    map

    image.png

    fliter

    image.png

    Scheduler 线程调度

    subscribeOn

    指定订阅过程运行的线程,同时控制了Observable,Observer。
    咱们先看一段代码运行结果。windows

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	//.subscribeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    
    //运行结果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[main,5,main] 5
    复制代码

    加上subscribeOn以后,发现subscribeOn的上游和下游运行线程都发生了变化。markdown

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	.subscribeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    
    //运行结果
    in main:Thread[main,5,main]
    in create:Thread[RxNewThreadScheduler-1,5,main]
    in next  :Thread[RxNewThreadScheduler-1,5,main] 5
    复制代码

    image.png

    observeOn

    指定观察者所在线程。
    咱们先看一段代码运行结果。网络

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	//.observeOn(Schedulers.newThread())
        .subscribe { println("int next :${Thread.currentThread()} $it") }
    
    //运行结果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[main,5,main] 5
    复制代码

    加上observeOn以后,发现observeOn下游运行线程都发生了变化。app

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	.observeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    //运行结果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[RxNewThreadScheduler-1,5,main] 5
    复制代码


    image.png
    特别的是:
    image.png async

    Observer 观察者

    ide

    APP中使用场景

    列举目前咱们App中使用到RxJava的场景。

    网络请求:Retrofit+RxJava

    @POST("/content/user/info")
    Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);
    复制代码

    这里有一段咱们使用操做符的代码,app初始化时同步服务端数据

    //app初始化时同步服务端数据,
    fun checkRequiredObservable(): Observable<PersonRequiredInfo> {
        	//后续须要的我的信息
            var personRequiredInfo = PersonRequiredInfo()
            val allObservables = arrayListOf(
                    ...
                	//一系列独立的请求
                    appTabSetting(),//底部tab配置
                	userInfoObservable(),//用户信息
                    queryBindConfig(),//用户绑定帐号信息
                    syncBabyInfoObservable(userId) //同步宝宝信息
                    ...
            )
             var zipObservable =  Observable.zip(allObservables) { 
                 return@zip personRequiredInfo 
             }
            return apolloAppConfig()//先获取apollo配置信息,同时保存配置
                    .flatMap {zipObservable } //并行上面一系列请求
                    .onErrorReturn { personRequiredInfo }
                    .doOnNext {registerUserSuperProperties()}
                    .compose(RxHelper.io2MainThread())
    }
    private fun syncBabyInfoObservable(): Observable<Any> {
            return getBabyInfoObservable() //先获取服务端宝宝信息
                .flatMap { updateBabyInfoObservable() } //本地与服务端比较,若是须要上传则上传
                .flatMap { getBabyInfoObservable().map { Any() } }//再次获取同步后的宝宝信息
                .onErrorReturn { Any()}
    }
    复制代码

    现有接口情况的状况下,很难想象,若是没有RxJava我该如何组合这些请求。

    截屏2021-05-10 下午4.27.31.png

    防抖(debounce)与节流(throttle)

    debounce

    Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.

    throttleFirst

    Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.

    View重复点击

    RxView.clicks(container)
        .throttleFirst(800,TimeUnit.MILLISECONDS)
        .subscribe { onclick() }
    复制代码

    搜索防抖

    RxTextView.textChanges(etSearch)
        .debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { goSearch(chars) }
    复制代码

    页面刷新频率控制

    class RefreshDebounce {
        private var rxEmitter: ObservableEmitter<Long>? = null
        private var observable = Observable
            .create<Long> { rxEmitter = it }
            .debounce(1000L, TimeUnit.MILLISECONDS)
    
        constructor(consumer: (Long) -> Unit? ){
            observable.subscribe { consumer.invoke(it) }
        }
        fun callRefresh() = rxEmitter?.onNext(System.currentTimeMillis())
    }
    
    //使用
    var refreshDebounce = RefreshDebounce { println("refresh:$it") }
    refreshDebounce.callRefresh()
    复制代码

    定时/延时执行

    private void startCountDown15Min() {
        countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
               subscribe(aLong -> Log.i(TAG, "倒计时时间到"));
    }
    
    private void cancelCountDown15Min() {
        if (countDown15MinDisposable != null && !countDown15MinDisposable.isDisposed()) {
            countDown15MinDisposable.dispose();
        }
    }
    
    复制代码

    系统权限申请

    new RxPermissions(activity)
    	.request(Manifest.permission.READ_PHONE_STATE)
        .subscribe(aBoolean -> Log.d(TAG,"result:"+aBoolean)
    复制代码