在正式铺展开本文内容以前,咱们先思考一个问题:java
你认为 RxJava 真的好用吗,它好用在哪?android
CallbackHell,中文翻译为 回调地狱,在以往没有依赖RxJava
+ Retrofit
进行网络请求的代码中,这种代码并很多见(好比AsyncTask
),我曾有幸见识并维护了各类3层4层AsyncTask
回调嵌套的项目——后来我一直拒绝阅读AsyncTask
的源码,我想这应该是一个很重要的缘由。git
很感谢 @prototypez 的 《RxJava 沉思录》 系列的文章,我我的认为它是 目前国内关于RxJava讲解最好的系列 ,做者列举了国内大多数文章中,关于RxJava好处的最多见的一些呼声:github
不能否认,这些的确都是RxJava优秀的闪光点,但我认为这不是核心,正如 这篇文章 所说的,其更重要的意义在于:编程
RxJava 给咱们的事件驱动型编程带来了新的思路,
RxJava
的Observable
一会儿把咱们的维度拓展到了时间和空间两个维度。api
事件驱动型编程这个词很准确,如今我从新组织个人语言,”不要打破链式调用!“,这句话更应该说,不要破坏RxJava事件驱动型的编程思想。网络
如今让咱们回到文章的标题上,Android开发中,网络请求的错误处理一直是一个没法回避的需求,有了随着RxJava
+ Retrofit
的普及,不免会遇到这个问题:app
这是我17年年初总结的一篇博客,那时我对于RxJava
的理解比较有限,我阅读了网上不少前辈的博客,并总结了文中的这种方案,就是把全局的error处理放在onError()
中,并将Subscriber
包装成MySubscriber
:异步
public abstract class MySubscriber<T> extends Subscriber<T> {
 // ...
@Override
public void onError(Throwable e) {
onError(ExceptionHandle.handleException(e)); // ExceptionHandle中就是全局处理的逻辑,详情参考上方文章
}
public abstract void onError(ExceptionHandle.ResponeThrowable responeThrowable);
}
api.requestHttp() //网络请求
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber<Model>(context) { // 包装了全局error处理逻辑的MySubscriber
@Override
public void onNext(Model model) { // ... }
@Override
public void onError(ExceptionHandle.ResponeThrowable throwable) {
// .......
}
});
复制代码
这种解决方案于我当时看来没有问题,我认为这应该就是 完美的解决方案 了吧。
很快我就意识到了另一个问题,就是这种方案成功地驱动我写出了 RxJava版本的Callback Hell。
我不想大家笑话个人代码,所以我决定先不把它们抛出来,来看一个常见的需求:
请求一个API,若是发生异常,弹出一个Dialog,询问用户是否重试,若是重试,从新请求这个API。
让咱们看看可能不少开发者 第一直觉 会写出的代码(为了保证代码不那么啰嗦,这里我使用了Kotlin
):
api.requestHttp()
.subscribe(
onNext = {
// ...
},
onError = {
AlertDialog.Builder(context) // 弹出一个dialog,提示用户是否重试
.xxxx
.setPositiveButton("重试") { _, _ -> // 点击重试按钮,从新请求
api.requestHttp()
.subscribe(
onNext = { ... },
onError = { ... }
)
}
.setNegativeButton("取消") { _, _ -> // 啥都不作 }
.show()
}
)
复制代码
瞧!咱们写出了什么!
如今你也许明白了我当时的处境,onError()
和onComplete()
意味着此次订阅事件的终止,若是全局的异常处理都放在onError()
中,接下来若是还有其余的需求(好比网络请求),就意味着你要在这个回调方法中再添加一层回调。
在一边高呼RxJava
链式调用简洁好用和 避免了CallbackHell 时,咱们将 响应式编程 扔到了一旁,而后继续 按照平常的思惟 写着 一模一样的代码。
若是你以为这种操做彻底能够接受,咱们能够将需求升级一下:
若是发生异常,弹出dialog提示用户重试,这种dialog最多可弹出3次。
好的,若是说,最多重试一次,让代码额外增长了1层回调的嵌套(其实是2层,Dialog的点击事件自己也是一层回调),那么最多重试3次,就是.....4层回调:
api.requestHttp()
.subscribe(
onNext = {
// ...
},
onError = {
api.requestHttp()
.subscribe(
onNext = {
// ...
},
onError = {
api.requestHttp()
.subscribe(
onNext = { ... },
onError = { ... } // 还有一层
)
}
)
}
)
复制代码
你能够说,我把这个请求封装成一个函数,而后每次只调用函数就好了,话虽如此,你依然不可否认这种 CallbackHell 并不优雅。
若有可能,我但愿它能作到的是:
轻量级意味着 较低的依赖成本,若是一个工具库,它又要依赖若干个三方库,首先apk体积的急速膨胀就使人没法接受。
灵活意味着 更低的迁移成本,我不但愿,添加 或者 移除 这个工具令个人整个项目发生巨大的改动,甚至是重构。
若有可能,不要在已有的业务逻辑代码上进行修改。
低的学习成本 可让开发者更快的上手这个工具。
若有可能,请让这个工具库可以随心所欲。
这样看来,上文中经过继承的方式对全局error的处理方案,存在着必定的局限性,抛开使人瞠目结舌的回调地狱以外,不能用lambda表达式 就已经让我难以忍受。
我花了一些时间开源了这个工具:
RxWeaver: A lightweight and flexible error handler tools for RxJava2.
Weaver 翻译过来叫作 织布鸟,我最初的目的也正是让这个工具可以对逻辑代码正确地组织,达到实现RxJava全局Error处理的需求。
为了代码的足够简洁,我选择使用Kotlin做为示范代码,我保证你能够看懂并理解它们——若是你的项目中适用的开发语言是
Java
,也请不用担忧, RxWeaver 一样提供了Java
版本的依赖和示例代码,你能够在这里找到它。
RxWeaver的配置很是简单,你只须要配置好对应的GlobalErrorTransformer
类,而后在须要处理error的网络请求代码中,经过compose()
操做符,将GlobalErrorTransformer
交给RxJava, 请注意,仅仅须要一行代码:
private fun requestHttp() {
serviceManager.requestHttp() // 网络请求
.compose(RxUtils.handleGlobalError<UserInfo>(this)) // 加上这行代码
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe( // ....)
}
复制代码
RxUtils.handleGlobalError<UserInfo>(this)
相似Java
中的静态工具方法,它会返回一个对应GlobalErrorTransformer
的一个实例——里面存储的是对应的error处理逻辑,这个类并非 RxWeaver 的一部分,而是根据不一样项目的不一样业务,本身实现的一个类:
object RxUtils {
fun handleGlobalError(activity: FragmentActivity): GlobalErrorTransformer {
// ....
}
}
复制代码
如今咱们须要知道的是,这样一行代码,能够作到什么样的程度。
让咱们从3个不一样梯度的需求看看这个工具的韧性:
这是最多见的一种需求,当出现某种特殊异常(本案例以JSONException为例),咱们会经过Toast提示这样的消息给用户:
全局异常捕获-Json解析异常!
fun test() {
Observable.error(JSONException("JSONException"))
.compose(RxUtils.handleGlobalError<UserInfo>(this))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// ...
}
}
复制代码
毫无疑问,当没有加compose(RxUtils.handleGlobalError<UserInfo>(this))
这行代码时,此次订阅的结果必然是弹出一个 “onError:xxxx”的 toast。
如今咱们加上了compose的这行代码,让咱们拭目以待:
看起来成功了,即便咱们在onError()
里面针对Exception
作出了单独的处理,可是这个JSONException依然被全局捕获了,并弹出了一个额外的toast :“全局异常捕获-Json解析异常!” 。
这彷佛是一个很简单的需求,咱们提高一点难度:
此次需求是:
若接收到一个
ConnectException
(链接异常),咱们让弹出一个dialog,这个dialog只会弹一次,若用户选择重试,从新请求API
又回到了上文中这个可能会引起 Callback Hell 的需求,咱们疑问,如何保证 Dialog和重试逻辑正确执行的同时,不打破Observable流的连续性(链式调用)?
fun test2() {
Observable.error(ConnectException()) // 此次咱们把异常换成了`ConnectException`
.compose(RxUtils.handleGlobalError<UserInfo>(this))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// ...
}
}
复制代码
依然是熟悉的代码,此次咱们把异常换成了ConnectException
,咱们直接看结果:
由于咱们数据源是一个固定的ConnectException
,所以咱们不管怎么重试,必然都只会接收到ConnectException
,这不重要,你发现没有,即便是一个复杂的需求(弹出dialog,用户选择后,决定是否从新请求这个流),RxWeaver 依然能够胜任。
最后一个案例,让咱们再来一个更复杂的。
详细需求是:
当接收到Token失效的Error时,跳转login界面,用户从新登陆成功后,返回初始界面,并从新请求API;若是用户登陆失败或取消登陆,弹出错误信息。
显然这个逻辑有点复杂了, 对于实现这个需求来说,彷佛不太现实,此次是否会一筹莫展呢?
fun test3() {
Observable.error(TokenExpiredException())
.compose(RxUtils.handleGlobalError<UserInfo>(this))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe {
// ...
}
}
复制代码
此次咱们把异常换成了TokenExpiredException
(由于直接实例化一个HttpException
过于复杂,因此咱们自定义一个异常模拟代替它),咱们直接看结果:
固然,不管怎么重试,数据源始终只会发射TokenExpiredException
,可是咱们成功实现了这个看似复杂的需求。
我认为RxWeaver达到了我心目中的设计要求:
你不须要担忧 RxWeaver 的体积,它足够的轻量,轻量到全部类加起来只有不到200行代码,同时,除了RxJava
和RxAndroid
,它 没有任何其它的依赖 ,体积大小只有3kb。
RxWeaver 的配置不须要 修改 或者 删除 任意一行已经存在的业务代码——它是彻底可插拔的。
它的原理也是很是 简单 的,只要熟悉了onErrorResumeNext
、retryWhen
、doOnError
这几个关键的操做符,你就能够立刻上手对应的配置。
能够经过接口实现任意复杂的需求实现。
这彷佛本末倒置了,对于一个工具来讲,熟练使用API 每每比 阅读源码并了解原理 优先级更高一些。可是个人想法是,若是你先了解了原理,这个工具的使用你会更加驾轻就熟。
实际上,RxWeaver的源码很是简单,简单到组件内部 没有任何Error处理逻辑,全部的逻辑都交给用户进行配置,它只是一个 中间件。
它的原理也是很是 简单 的,只要熟悉了onErrorResumeNext
、retryWhen
、doOnError
这几个关键的操做符,你就能够立刻上手对应的配置。
对于全局异常的处理,我只须要在既有代码的 链式调用 加上一行代码,配置一个 GlobalErrorTransformer<T>
交给 compose()
操做符————这个操做符是 RxJava
给咱们提供的能够面向 响应式数据类型 (Observable/Flowable/Single等等)进行 AOP 的接口, 能够对响应式数据类型 加工 、修饰 ,甚至 替换。
这意味着,在既有的代码上,使用compose()
操做符,我能够将一段特殊处理的逻辑代码插入到这个Observable
中,这实在太方便了。
对compose操做符不了解的同窗,请参考 【译】避免打断链式结构:使用.compose()操做符 @by小鄧子
compose()
操做符须要我传入一个对应 响应式类型 (Observable/Flowable/Single等等)的Transformer
接口,可是问题是不一样的 响应式类型 对应不一样的 Transformer
接口,不一样的因而咱们实现了一个通用的 GlobalErrorTransformer<T>
接口以 兼容不一样响应式类型的事件流 :
class GlobalErrorTransformer<T> constructor(
private val globalOnNextRetryInterceptor: (T) -> Observable<T> = { Observable.just(it) },
private val globalOnErrorResume: (Throwable) -> Observable<T> = { Observable.error(it) },
private val retryConfigProvider: (Throwable) -> RetryConfig = { RetryConfig() },
private val globalDoOnErrorConsumer: (Throwable) -> Unit = { },
private val upStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() },
private val downStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() }
) : ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>, MaybeTransformer<T, T>, CompletableTransformer {
// ...
}
复制代码
如今咱们思考一下,若是咱们想把error处理的逻辑放在GlobalErrorTransformer
里面,把这个GlobalErrorTransformer
交给compose()
操做符,就等于把error处理的逻辑所有 插入 到既有的Observable
事件流中了:
fun test() {
observable
.compose(RxUtils.handleGlobalError<UserInfo>(this)) // 插入异常处理逻辑
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe {
// ...
}
}
复制代码
同理,若是某个API不须要追加全局异常处理的逻辑,只须要把这行代码删掉便可,不会影响其余的业务代码。
这是一个不错的思路,接下来,咱们须要思考的是,如何将不一样的异常处理逻辑加进GlobalErrorTransformer
中?
这个操做符的做用实在很是明显了,就是当咱们接收到某个 Throwable 时,想要作的逻辑:
这实在很适合大部分简单的错误处理需求,就像上文的需求1同样,当咱们接收到某种指定的异常,弹出对应的message提示用户,逻辑代码以下:
when (error) {
is JSONException -> {
Toast.makeText(activity, "全局异常捕获-Json解析异常!", Toast.LENGTH_SHORT).show()
}
else -> {
}
}
复制代码
这种错误的处理方式, 不会对既有的Observable进行变换 ,也就是说,JSONException
依然会最终传递到subscribe的 onError()
的回调中——你依然须要实现 onError()
的回调,哪怕什么都不作,若有必要,再进行特殊的处理,不然会发生崩溃。
这种方式很简单,可是涉及复杂的需求就无能为力了,这时候咱们就须要借助onErrorResumeNext
操做符了。
以上文的需求2为例,若接收到一个指定的异常,咱们需展现一个Dialog,提示用户是否重试—— 这种状况下,doOnError
操做符明显无能为力,由于它不具备 对Observable进行变换的能力。
这时就须要 onErrorResumeNext
操做符上场了,它的做用是:当流的事件传递过程当中发生了错误,咱们能够将一个新的流交个 onErrorResumeNext
操做符,以保证事件流的继续传递。
这是一个被严重低估的操做符,这个操做符意味着,只要你给一个Observable<T>
的,就能继续往下传递事件,那么,这和需求中的 展现一个Dialog供用户选择 有关系吗?
固然有关系,咱们只须要把Dialog的事件转换成对应的Observable
便可:
object RxDialog {
/** * 简单的示例,弹出一个dialog提示用户,将用户的操做转换为一个流并返回 */
fun showErrorDialog(context: Context, message: String): Single<Boolean> {
return Single.create<Boolean> { emitter ->
AlertDialog.Builder(context)
.setTitle("错误")
.setMessage("您收到了一个异常:$message,是否重试本次请求?")
.setCancelable(false)
.setPositiveButton("重试") { _, _ -> emitter.onSuccess(true) }
.setNegativeButton("取消") { _, _ -> emitter.onSuccess(false) }
.show()
}
}
}
复制代码
RxDialog的 showErrorDialog()
函数将会展现一个Dialog,返回值为一个 Single<Boolean>
的流,当用户点击 肯定 按钮,订阅者会接收到一个 true
事件,反之,点击 取消 按钮,则会收到一个 false
事件。
RxJava还能这么用?
固然,RxJava
所表明的是一种响应式的编程范式,在刚接触RxJava的时候,咱们都见过这样一种说法:RxJava 很是强大的一点即是 异步。
如今咱们回过头来,网络请求的数据流 表明的是一种异步,难道 弹出一个dialog,等待的用户选择结果 难道不也是一种异步吗?
换句话说,网络请求 的流中事件意味着 网络请求的结果,那么上文中的 Single<Boolean>
表明着流中的事件是 ** Dialog的点击事件**。
其实RxJava发展的这些年来,Github上的RxJava扩展库层出不穷,好比RxPermission
,RxBinding
等等等等,前者是将 权限请求 的结果做为事件,交给了Observable
进行传递;后者则是将 **View对应的事件 ** (好比点击事件,长按事件等等)交给了Observable
。
回过头来,咱们如今经过RxDialog
建立了一个 响应式的Dialog,并获取到了用户的选择结果Single<Boolean>
,接下来咱们须要作的就只是根据Single<Boolean>
中事件的值来判断 是否从新请求网络数据 了。
RxJava提供了 retryWhen()
操做符,交给咱们去处理是否从新执行流的订阅(本文中就是指从新进行网络请求):
篇幅所限,我不会针对这个操做符进行太多的讲解,关于 retryWhen()
操做符,请参考:
【译】对RxJava中.repeatWhen()和.retryWhen()操做符的思考 by 小鄧子
继续上文的思路,咱们到了Dialog对应的Single<Boolean>
流,当用户选择后,实例化一个RetryConfig 对象,并把选择的结果Single<Boolean>
交给了 condition
属性:
RetryConfig(condition = RxDialog.showErrorDialog(params))
data class RetryConfig(
val maxRetries: Int = DEFAULT_RETRY_TIMES, // 最大重试次数,默认1
val delay: Int = DEFAULT_DELAY_DURATION, // 重试延迟,默认1000ms
val condition: () -> Single<Boolean> = { Single.just(false) } // 是否重试
)
复制代码
如今让咱们来从新整理一下思路:
1.当用户接收到一个指定的异常时,弹出一个Dialog,其选择结果为Single<Boolean>
;
2.RetryConfig
内部存储了一个Single<Boolean>
的属性,这是一个决定了是否重试的函数;
3.当用户选择了确认按钮,将Single(true)
交给并实例化一个RetryConfig
,这意味着会重试,若是选择了取消,则为Single(false)
,意味着不会重试。
看来,仅仅须要这几个操做符,Error处理复杂的需求咱们已经可以实现了?
的确如此,实际上,GlobalErrorTransformer
内部的处理,也正是调用这几个操做符:
class GlobalErrorTransformer<T> constructor(
private val globalOnNextRetryInterceptor: (T) -> Observable<T> = { Observable.just(it) },
private val globalOnErrorResume: (Throwable) -> Observable<T> = { Observable.error(it) },
private val retryConfigProvider: (Throwable) -> RetryConfig = { RetryConfig() },
private val globalDoOnErrorConsumer: (Throwable) -> Unit = { },
private val upStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() },
private val downStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() }
) : ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer {
override fun apply(upstream: Observable<T>): Observable<T> =
upstream
.flatMap {
globalOnNextRetryInterceptor(it)
}
.onErrorResumeNext { throwable: Throwable ->
globalOnErrorResume(throwable)
}
.observeOn(upStreamSchedulerProvider())
.retryWhen(ObservableRetryDelay(retryConfigProvider))
.doOnError(globalDoOnErrorConsumer)
.observeOn(downStreamSchedulerProvider())
// 其余响应式类型同理...
}
复制代码
这也正是 RxWeaver 这个工具为何如此 轻量 的缘由,即便是 核心类 GlobalErrorTransformer
也并无更复杂的逻辑,仅仅是对几个操做符的组合使用而已。
此外的几个类,也无非是对重试逻辑接口的封装罢了。
看到这里,有的小伙伴可能已经有这样一个疑问了:
需求2中的,Dialog的逻辑我可以理解,那么,需求3中,Token失效,跳转login并返回重试是如何实现的?
实际上,不管是 网络请求 , 仍是 弹出Dialog , 亦或者 跳转Login,其终究只是一个 事件的流 而已,前者能经过接口返回一个 Observble<T>
或者 Single<T>
, 跳转Login 固然也能够:
class NavigatorFragment : Fragment() {
fun startLoginForResult(activity: FragmentActivity): Single<Boolean> {
// ....
}
}
复制代码
篇幅所限,本文不进行实现代码的展现,源码请参考这里。
其原理和 RxPermissions 、RxLifecycle 还有笔者的 RxImagePicker 彻底同样,依靠一个不可见的Fragment
对数据进行传递。
在本文的开始,我简单介绍了 RxWeaver 的几个优势,其中一个是 极低的学习成本。
本文发布以前,我把个人工具介绍给了一些刚接触 RxJava 的开发者,他们接触以后,反馈居然出奇的统一:
你这个东西太难了!
对于这个结果,我很诧异,由于这毕竟只是一个加起来还不到200行的工具库,后来我仔细的思考,我终于得出了一个结论,那就是:
本文的内容理解起来很 简单 ,但首先须要你对RxJava有必定的理解,这比较 困难。
RxJava的学习曲线很是陡峭!正如 @prototypez 在他的 这篇文章 中所说的同样:
RxJava 是一个 “夹带了私货” 的框架,它自己最重要的贡献是提高了咱们思考事件驱动型编程的维度,可是它与此同时又逼迫咱们去接受了函数式编程。
正如本文一开始所说的,咱们已经习惯了 过程式编程 的思惟,所以文中的一些 抽象的操做符 会让咱们陷入必定的迷茫,可是这也正是 RxJava 的魔力所在——它让我不断想要将新的需求 从更高层级进行抽象,尝试写出更简洁的代码(至少在我看来)。
我很是喜欢 RxWeaver , 有朋友说说它代码有点少,但我却认为 轻量 是它最大的优势,它的本质目的也正是帮助开发者 对业务逻辑进行组织,使其可以写出更 Reactive 和 Functional 的代码 。
--------------------------广告分割线------------------------------
Hello,我是却把清梅嗅,若是您以为文章对您有价值,欢迎 ❤️,也欢迎关注个人博客或者Github。
若是您以为文章还差了那么点东西,也请经过关注督促我写出更好的文章——万一哪天我进步了呢?