不讲 rxjava 和 retrofit 而是直接上手 2 了,由于 2 封装的更好用的更多。java
常见的 button 点击事件为例,button 是被观察者,listener 是观察者,setOnClickListener 过程是订阅,有了订阅关系后在 button 被点击的时候,监听者 listener 就能够响应事件。 react
这里的button.setOnClickListener(listener)
看上去意思是被观察者订阅了观察者(杂志订阅了读者),逻辑上不符合平常生活习惯。其实这是设计模式的习惯,没必要纠结,习惯了这种模式就利于理解观察者模式了。数据库
Observable
:被观察者(ble 结尾的单词通常表示 可...的,可观察的)Observer
:观察者(er 结尾的单词通常表示 ...者,...人)subscribe
:订阅首先建立 Observable 和 Observer,而后 observable.subscribe(observer)
,这样 Observable 发出的事件就会被 Observer 响应。通常咱们不手动建立 Observable,而是由 Retrofit 返回给咱们,咱们拿到 Observable 以后只需关心如何操做 Observer 中的数据便可。
不过为了由浅入深的演示,仍是手动建立 Observable 来说解。json
常见的几种方式,不经常使用的不写了,由于我以为这个模块不是重点。设计模式
var observable=Observable.create(ObservableOnSubscribe<String> {...})
var observable=Observable.just(...)
var observable = Observable.fromIterable(mutableListOf(...))
var observable2=Observable.create(object :ObservableOnSubscribe<String>{ override fun subscribe(emitter: ObservableEmitter<String>) { emitter.onNext("Hello ") emitter.onNext("RxJava ") emitter.onNext("GoodBye ") emitter.onComplete() } })
ObservableOnSubscribe
和ObservableEmitter
都是陌生人,这个要是详细讲涉及到源码分析,东西可就多了(主要是我不熟悉),因此能够理解成 ObservableOnSubscribe 是用来帮助建立 Observable 的,ObservableEmitter 是用来发出事件的(这些事件在观察者 Observer 中能够响应处理)。
emitter 一次发射了三个事件,而后调用了 onComplete() 这些在下面讲观察者 Observer 时还会提到,一并讲解。api
var observable=Observable.just("Hello","RxJava","GoodBye")
这句的效果等同于上面用 create 建立 observable,即 调用 3 次 onNext 后再调 onComplete。数组
var observable = Observable.fromIterable(mutableListOf("Hello","RxJava","GoodBye"))
这句的效果等同于上面用 create 建立 observable,即 调用 3 次 onNext 后再调 onComplete。网络
val observer = object : Observer<String> { override fun onComplete() { Log.e("abc", "-----onComplete-----") } override fun onSubscribe(d: Disposable) { Log.e("abc", "-----onSubscribe-----") } override fun onNext(t: String) { Log.e("abc", "-----onNext-----$t") } override fun onError(e: Throwable) { Log.e("abc", "-----onError-----$e") } } //订阅 observable.subscribe(observer)
log 打印状况:app
-----onSubscribe----- -----onNext-----Hello -----onNext-----RxJava -----onNext-----GoodBye -----onComplete-----
能够看到,先是创建订阅关系,而后根据前面 observable 的发射顺序来打印 onNext,参数经过 onNext(t: String) 传进来,最后调用 onComplete,多说一句,在 just 和 fromIterable 的状况下,没有手动调用 Emitter,可是仍会先调用 onNext,最后调用 onComplete框架
这两个词意思分别是消费者(能够理解为消费被观察者发射出来的事件)和行为(能够理解为响应被观察者的行为)。对于 Observer 中的 4 个回调方法,咱们未必都能用获得,若是只须要用到其中的一部分,就须要 Consumer 和 Action 上场了。
有参数的onSubscribe
、onNext
、onError
咱们用 Consumer 来代替,无参的onComplete
用 Action 代替:
observable.subscribe(object :Consumer<String>{ override fun accept(t: String?) { Log.e("abc", "-----onNext-----$t") } }) //打印 -----onNext-----Hello -----onNext-----RxJava -----onNext-----GoodBye
说明一下,若是 subscribe 中咱们只传一个对象参数,那只能是subscribe(Consumer<? super T> onNext)
(onNext 方法),不能是 Action 或 Consumer<? super Throwable> onError、Consumer<? super Disposable> onSubscribe
==注意==:Consumer 中的回调方法名称是 accept,区别于前面的 onNext
带有两个 Consumer 参数,分别负责 onNext 和 onError 的回调。
observable.subscribe(object : Consumer<String> { override fun accept(t: String?) { Log.e("abc", "-----onNext-----$t") } }, object : Consumer<Throwable> { override fun accept(t: Throwable?) { Log.e("abc", "-----onError-----$e") } })
若是想要一个带有两个 Consumer 可是不是这种搭配(好比subscribe(Consumer<? super T> onNext, Consumer<? super Disposable> onSubscribe)
),能够吗?答案是:不行
带有三个参数,分别负责onNext、onError和onComplete的回调。
observable.subscribe(object : Consumer<String> { override fun accept(t: String?) { Log.e("abc", "-----onNext-----$t") } }, object : Consumer<Throwable> { override fun accept(t: Throwable?) { Log.e("abc", "-----onError-----$e") } }, object : Action { override fun run() { Log.e("abc", "-----onComplete-----") } })
一样,三个参数只能有这一种搭配
==注意==:Action 中的回调方法名称是 run,区别于前面的 onComplete
这种状况和直接 new 出来的 Observer 效果同样。
observable2.subscribe(object : Consumer<String> { override fun accept(t: String?) { Log.e("abc", "-----onNext-----$t") } }, object : Consumer<Throwable> { override fun accept(t: Throwable?) { Log.e("abc", "-----onError-----$e") } }, object : Action { override fun run() { Log.e("abc", "-----onComplete-----") } },object : Consumer<Disposable>{ override fun accept(t: Disposable?) { Log.e("abc", "-----onSubscribe-----") } })
在上面的例子中,Observable 发送的都是 String 类型的数据,因此在 Observer 中接收的也都是 String,现实开发中的数据多种多样,并且有时候 Observable 提供的数据不是咱们理想的状况,这种状况下就须要用到转换操做符。
一样咱们只讲经常使用的:
好比咱们想把上游的 Int 类型的数据转换成 String 能够这样操做:
Observable.fromIterable(mutableListOf<Int>(1, 3, 5, 7, 8)) .map(object : Function<Int, String> { override fun apply(t: Int): String { return "zb$t" } }) .subscribe(object : Consumer<String> { override fun accept(t: String?) { Log.e("abc","-- $t --") } }) //Log日志 -- zb1 -- -- zb3 -- -- zb5 -- -- zb7 -- -- zb8 --
经过map
操做符,Int 类型数据,到 Consumer 里已经成了 String(这里为了简单的只看数据就没用 Observer 而改用 Consumer,二者均可以)。这里面用到了Function
,它的第一个泛型是 Observable 中发射的数据类型,第二个泛型是咱们想要装换以后的数据类型,在 Function 的 apply 方法中手动完成数据的转化。
示意图:map 把圆的变成了方的。
与 map 类似,不过 flatMap 返回的是一个 Observable,也就是说 Function 的第二个泛型固定了,就是 Observable,这样说不太好理解,看个例子:
假如如今有多个学生,每一个学生有多个科目,每一个科目考了屡次试,如今要打印全部的分数。单单只用 map 就不能直接搞定,试试吧
class Course(var name: String, var scores: MutableList<Int>) class Student(var name: String, var courses: MutableList<Course>) var stu1Course1 = Course("体育",mutableListOf(80, 81, 82)) var stu1Course2 = Course("美术",mutableListOf(63, 62, 60)) var stu1 = Student("StuA", mutableListOf(stu1Course1, stu1Course2)) var stu2Course1 = Course("音乐",mutableListOf(74, 77, 79)) var stu2Course2 = Course("希腊语",mutableListOf(90, 90, 91)) var stu2 = Student("StuB", mutableListOf(stu2Course1, stu2Course2)) Observable.just(stu1,stu2) .map(object :Function<Student,MutableList<Course>>{ override fun apply(t: Student): MutableList<Course> { return t.courses } }) .subscribe(object :Consumer<MutableList<Course>>{ override fun accept(t: MutableList<Course>?) { for (item in t!!){ for (i in item.scores){ Log.e("abc","--->$i") } } } })
经过两层 for 循环能够打印,这也是没办法的事,由于在 map 里面只能拿到 Course 集合。使用 flatMap 的状况是这样的:
Observable.just(stu1, stu2) .flatMap(object : Function<Student, ObservableSource<Course>> { override fun apply(t: Student): ObservableSource<Course> { return Observable.fromIterable(t.courses) } }) .flatMap(object : Function<Course, ObservableSource<Int>> { override fun apply(t: Course): ObservableSource<Int> { return Observable.fromIterable(t.scores) } }) .subscribe(object : Consumer<Int> { override fun accept(t: Int?) { Log.e("abc", "---> $t") } }) // log 打印 ---> 80 ---> 81 ---> 82 ---> 63 ---> 62 ---> 60 ---> 74 ---> 77 ---> 79 ---> 90 ---> 90 ---> 91
用了两次 flatMap,链式调用比缩进式更清晰。这里面的 flatMap 返回值类型的是 ObservableSource 并非咱们在前面提到的 Observable,查看 Observable 源码能够看到,它继承了 ObservableSource,因此这种多态用法是能够的。
另外在 apply 中返回的Observable.fromIterable(t.courses)
这一句不就是咱们建立 Observable 的方式吗?
简单的说,map 是把 Observable 发射的数据变换一下类型,flatMap 是把数据中集合/数组中的每一个元素再次经过 Observable 发射。
示意图:faltMap 把一系列圆的经过一系列的 Observable 变成了一系列方的。
图虽然画的丑,可是我想意思比较明白了。
filter
是过滤的意思,经过判断是否符合咱们想要的逻辑,来决定是否发射事件,只有返回 true 的事件才被发射,其余的被抛弃。还以上面的例子为例,假如咱们只想看 80 分以上的成绩能够这样过滤:
Observable.just(stu1, stu2) .flatMap(object : Function<Student, ObservableSource<Course>> { override fun apply(t: Student): ObservableSource<Course> { return Observable.fromIterable(t.courses) } }) .flatMap(object : Function<Course, ObservableSource<Int>> { override fun apply(t: Course): ObservableSource<Int> { return Observable.fromIterable(t.scores) } }) .filter(object :Predicate<Int>{ override fun test(t: Int): Boolean { return t > 80 } }) .subscribe(object : Consumer<Int> { override fun accept(t: Int?) { Log.e("abc", "---> $t") } }) // log 打印 ---> 81 ---> 82 ---> 90 ---> 90 ---> 91
注意,filter 里面不是用 Function 了,而是 Predicate,这个单词是“基于...”的意思,基于 t > 80,也就是选择大于 80 分的成绩。
前面 3 小节讲了不少,都是为了讲清楚 RxJava 的整个工做流程,还没涉及到线程切换。现实开发中更多的时候 Observable 是经过 Retrofit 返回给咱们的。Retrofit 是一个网络请求框架,它基于 OkHttp3,作了更好的封装,结合 RxJava 用惯了的话能够大大提到开发效率。仍是同样,咱们只看怎么用,不涉及源码解读。
implementation 'com.squareup.retrofit2:retrofit:2.6.2' implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
先引入依赖,而后咱们请求一个知乎日报的新闻数据(点击查看数据:https://news-at.zhihu.com/api...:
// ZhEntity class ZhEntity { var date: String? = null var stories: MutableList<StoriesBean>? = null var top_stories: MutableList<TopStoriesBean>? = null class StoriesBean { var image_hue: String? = null var title: String? = null var url: String? = null var hint: String? = null var ga_prefix: String? = null var type: Int = 0 var id: Int = 0 var images: MutableList<String>? = null } class TopStoriesBean { var image_hue: String? = null var hint: String? = null var url: String? = null var image: String? = null var title: String? = null var ga_prefix: String? = null var type: Int = 0 var id: Int = 0 } }
// ApiService import retrofit2.Call import retrofit2.http.GET import retrofit2.http.Url interface ApiService { @GET("news/latest") fun getLatestNews(): Call<ZhEntity> }
// 调用 val retrofit = Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .baseUrl("https://news-at.zhihu.com/api/4/") .build() val service: ApiService = retrofit.create(ApiService::class.java) val call: Call<ZhEntity> = service.getLatestNews() call.enqueue(object : Callback<ZhEntity> { override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) { Log.e("abc", "--> $t") } override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) { Log.e("abc", "-->${Gson().toJson(response?.body())}") } })
代码有点多,分别解释一下,ZhEntity 是实体类,ApiService 是一个接口,里面用注解的方式定义了一个方法 getLatestNews,@GET
表示 Get 请求,由此能够想象确定有@POST
,@GET
里面还有参数,这是请求地址 BaseUrl 后面的子文件夹。
getLatestNews 函数返回类型是 Call,这个是 Retrofit 定义用来请求网络的。
第三段代码,现实建立了一个 Retrofit 对象,addConverterFactory(GsonConverterFactory.create())
是把接口返回的 json 类型的数据转换成实体类的类型,这个东西在implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
时被引入。
而后是一系列的 Call 调用 qnqueue 操做什么的,看得出,没有用 Rxjava 同样能够完成网络请求,并且代码不复杂,好了,本文到此结束。
好吧,我在扯淡。继续讲,有人说不喜欢 url 被截成两段,能够这样修改,效果彻底相同:
// ApiService import retrofit2.Call import retrofit2.http.GET import retrofit2.http.Url interface ApiService { @GET fun getLatestNews(@Url url:String): Call<ZhEntity> }
// 调用 val retrofit = Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .baseUrl("https://www.baidu.com") .build() val service: ApiService = retrofit.create(ApiService::class.java) val call: Call<ZhEntity> = service.getLatestNews("https://news-at.zhihu.com/api/4/news/latest") call.enqueue(object : Callback<ZhEntity> { override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) { Log.e("abc", "--> $t") } override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) { Log.e("abc", "-->${Gson().toJson(response?.body())}") } })
baseUrl 仍是要的,不过设置成其余值无所谓了,由于不会被请求。
啰嗦了这么多,才讲到这里。抱歉水平有限,没办法用简单的语言说清复杂的问题。
首先,引入依赖时多加一句对 RxJava 的支持:
implementation 'com.squareup.retrofit2:retrofit:2.6.2' implementation 'com.squareup.retrofit2:converter-gson:2.6.2' implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'
而后,咱们的 getLatestNews 就能够直接返回一个 Observable 了!
import io.reactivex.Observable import retrofit2.http.GET interface ApiService { @GET("news/latest") fun getLatestNews(): Observable<ZhEntity> }
放心写,不会报错,有了 Observable,就好办了,轻车熟路:
val retrofit = Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .baseUrl("https://news-at.zhihu.com/api/4/") .build() val service: ApiService = retrofit.create(ApiService::class.java) val observable = service.getLatestNews() observable.subscribeOn(Schedulers.newThread()) .subscribe(object : Observer<ZhEntity> { override fun onComplete() { } override fun onSubscribe(d: Disposable) { } override fun onNext(t: ZhEntity) { Log.e("abc","-->${Gson().toJson(t)}") } override fun onError(e: Throwable) { Log.e("abc","-->$e") } })
除了 Observable 来源变了,其余与本文最先讲的 RxJava 没什么不一样。非要说不一样,有一点,多了一句subscribeOn(Schedulers.newThread())
,下面讲讲这个。
subscribeOn
:定义 Observable 发射事件所处的线程observeOn
:定义转换/响应事件所处的线程(map、flatMap、Observer 等),可屡次切换线程切换比较常见,好比 子线程请求网络数据主线程更新 UI,subscribeOn
和observeOn
有哪些线程能够选择?它们又是怎样使用的?咱们先看一个例子:
Thread(object : Runnable { override fun run() { Log.e("abc","Thread当前线程:${Thread.currentThread().name}") observable.subscribeOn(Schedulers.newThread()) .doOnNext(object :Consumer<ZhEntity>{ override fun accept(t: ZhEntity?) { Log.e("abc","doOnNext当前线程:${Thread.currentThread().name}") } }) .observeOn(Schedulers.io()) .flatMap(object :Function<ZhEntity,ObservableSource<ZhEntity.StoriesBean>>{ override fun apply(t: ZhEntity): ObservableSource<ZhEntity.StoriesBean> { Log.e("abc","flatMap当前线程:${Thread.currentThread().name}") return Observable.fromIterable(t.stories) } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer<ZhEntity.StoriesBean> { override fun onComplete() { } override fun onSubscribe(d: Disposable) { Log.e("abc","onSubscribe当前线程:${Thread.currentThread().name}") } override fun onNext(t: ZhEntity.StoriesBean) { Log.e("abc","Observer当前线程:${Thread.currentThread().name}") Log.e("abc", "-->${Gson().toJson(t)}") } override fun onError(e: Throwable) { Log.e("abc", "-->$e") } }) } }).start() // log 打印 Thread当前线程:Thread-4 onSubscribe当前线程:Thread-4 doOnNext当前线程:RxNewThreadScheduler-1 flatMap当前线程:RxCachedThreadScheduler-1 Observer当前线程:main Observer当前线程:main Observer当前线程:main
这里面只有doOnNext
没讲过,如今说说:每发送 onNext() 以前都会先回调这个方法,因此 doOnNext 和 Observable 的 subscribe(发射事件的方法)处于同一个线程。
从这个例子能够看出:
- Observable 和 Observer 创建订阅关系是在当前线程中(Thread-4)
subscribeOn
决定 Observable 发射事件所处的线程(即 Retrofit 请求网络所在线程)- 第一次
observeOn
决定 flatMap 所在的线程(RxCachedThreadScheduler-1)- 再次
observeOn
决定 Observer 所在线程(Android 主线程 main)
因此每次调用observeOn
就会切换线程,而且决定的是接下来的变换/响应的线程。多说一句,屡次设置 subscribeOn,只有第一次生效。
线程可选值:
线 程 名 称 | 说明 | |
---|---|---|
Schedulers.immediate() | 默认的 Scheduler,直接在当前线程运行,至关于不指定线程 | |
Schedulers.newThread() | 启用新线程,并在新线程执行操做 | |
Schedulers.io() | I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。不要把计算工做放在 io() 中,能够避免建立没必要要的线程 | |
Schedulers.computation() | 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU | |
AndroidSchedulers.mainThread() | Android 主线程 |
最后介绍一下这两个类,Disposable
前文出现过,在 Observer 的 onSubscribe 函数中,有一个 Disposable 类型的参数:override fun onSubscribe(d: Disposable) {}
,经过前面介绍咱们知道,Observable 和 Observer 创建订阅关系时会调用 onSubscribe 方法,可是没有说这个参数的做用。
Disposable 的 dispose() 函数能够用来解除订阅,这样就不会收到 Observable 发射的事件:
var dis ?= null val observable = Observable.fromIterable(mutableListOf("Hello", "RxJava", "GoodBye")) val observer = object : Observer<String> { override fun onComplete() { } override fun onSubscribe(d: Disposable) { dis=d Log.e("abc", "-----onSubscribe-----$d") } override fun onNext(t: String) { if (t=="Hello") dis.dispose() Log.e("abc", "-----onNext-----$t") } override fun onError(e: Throwable) { } } observable.subscribe(observer) // log 打印 -----onNext-----Hello
能够看到,调用dis.dispose()
后,就不在打印上游发射的"RxJava"和"GoodBye"了。
CompositeDisposable 能够用来管理多个 Disposable,经过add()
方法添加 Disposable 对象,而后在 onDestroy 方法里面调用clear()
或者dispose()
来清除全部的 Disposable,这样能够防止内存泄漏。
val cDis = CompositeDisposable() // ...代码省略 override fun onSubscribe(d: Disposable) { cDis.add(d) } // ...代码省略 override fun onDestroy() { super.onDestroy() cDis.clear() }
多说一句,经过查看RxJava2CallAdapterFactory.create()
源码可知,dispose()
方法能主动断开 Observable 和 Observer 之间的链接,还能取消 Retrofit 的网络请求,因此放心的用吧。