RxJava练武场是一个rxjava在项目中应用的小系列,包括:java
Rxjava这个库和其余常见库不太同样,通常的库例如Glide,ButterKnife都是为了解决实际问题出现的,必定程度上是刚需。Glide库若是不用他,那么应用本身就要处理图片下载、压缩、内存管理、多级缓存等等复杂的逻辑。这类问题复杂而常见,而像Glide这类的轮子,Api的设计都比较友好,一个简单的api调用就能完成一个本来很复杂的功能,简直不要太爽。编程
Glide.with(context)
.load(url)//图片加载
.crossFade()//动画设置
.placeholder(R.drawable.place_image)//占位图
.error(R.drawable.error_image)//失败占位图
.override(width,height)//图片裁剪
.thumbnail(thumbnailRequest)//配置缩略图
.diskCacheStrategy(DiskCacheStrategy.SOURCE)//缓存策略
.into(imageView);
复制代码
而Rxjava,你刚开始看起来,都不知道他是干什么的。“异步处理”?不是通常都使用观察者模式吗?AsyncTask,Handler也能够,要rxjava干吗?若是你有兴趣研究过一点rxjava,会发现网上的教程都会说:"zip map flatmap debounce等操做符把异步回调变得‘简洁’‘优雅’",而后对比一下原来的代码和使用rxjava后的代码,最后感叹一下rxjava设计的鬼才和功能的强大。我本身在初次接触rxjava时也感受,这些rxjava的优势描述比较空洞,这项技术的意义大于实用。 实际状况是这样么?在具体开发中,异步调用给咱们的最大困扰是:异步回调的时间并不可控。当有多个异步回调时,这些调用相互联系和依赖,搞清楚每一个回调什么时候返回是个重要的问题。在每一个关键时间节点对‘分散的callback’作正确的事,有过相似编程经验的人都知道,是很是痛苦的事,若是还想代码容易看懂,简直是疯了。 api
常常遇到这种需求,接口的请求依赖token信息。一个请求须要先请求token(token若是存在缓存则使用缓存),依赖这个token才能进行正常网络请求。这个token有必定的时效性,在时效性内可使用缓存,过时后须要从新请求token并从新发起一次请求。这个流程能够概括以下图: 缓存
一、网络请求前,对token是否有缓存判断,若是没有先请求token,并把这个请求阻塞且缓存 二、token请求过程当中,若是有新的token请求进来,加入阻塞队列 三、token请求后,通知阻塞的队列(广播等方式),依次进行阻塞的请求 四、对两种次数限制,分别作逻辑判断安全
以上就是传统实现方法,就不贴代码了,这样实现有如下特色: 一、要时刻维护一个阻塞队列 (注意其添加和清空的时机) 二、token请求结束后,有一个回调机制通知阻塞队列,(这个回调须要注册和反注册) 三、两处的次数限制,次数维护的变量,很差维护(通常动态秘钥为了便于使用会作成单例,单例内的变量相似static,维护较复杂) 四、请求重试的逻辑很差实现,bash
咱们能够看到这里涉及到不少静态变量的维护,广播等异步回调的处理,这种状况一多,编程者会变得很被动。并且token的异步请求和真正的网络异步请求杂糅在一块儿,增大了问题的复杂性。网络
一些代码网络请求部分与前一篇博客《基于RxJava Retrofit的网络框架》相关。多线程
public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
return Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
//传入token缓存
return Observable.just(Store.sToken);
}
}).flatMap(new Function<String, ObservableSource<R>>() {
@Override
public ObservableSource<R> apply(String key) throws Exception {
if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
//token没有缓存,须要请求Token
return Observable.<R>error(new KeyNotValidThrowable());
} else {
//Token存在缓存,直接请求
return sendRequestInternal(request,t);
}
}
})
//进入失败重试流程
.retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
private int retryCount = 0;
@Override
public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Throwable throwable) throws Exception {
if (throwable instanceof KeyNotValidThrowable){
//同一Request,有过一次KeyNotValidThrowable,则再也不重试
if (retryCount > 0){
return Observable.error(throwable);
} else {
//token缓存不在,进入TokenLoader请求token
retryCount++;
return TokenLoader.getInstance().getNetTokenLocked();
}
} else if (throwable instanceof ApiException){
//token过时的状况,从新获取token,并重试
ApiException apiException = (ApiException)throwable;
if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
if (retryCount > 0){
return Observable.error(throwable);
} else {
//token缓存失效,进入TokenLoader请求token
retryCount++;
return DynamicKeyLoader.getInstance().getNetTokenLocked();
}
}
}
//其余类型错误,直接抛出,再也不重试
return Observable.error(throwable);
}
});
}
});
}
复制代码
也许你第一次看也挺晕,别怕,你顺着注释捋捋逻辑,是否是感受代码的实现好像画了一个时序图。 除了注释之外,几点说明: 一、defer操做符的做用是在retry时,会从新建立新的Observable,不然会使用上次的Observable,不会从新获取Store.sToken 二、retryWhen操做符,与sendRequestInternal内部统一配置的retryWhen并不冲突,至关于二次retry 三、retryWhen中若是抛出error ,则再也不重试; 四、重试请求,经过返回getNetTokenLocked这个subject实现。(下面详述)并发
总体的流程被压缩到了一个函数中,rxjava自己的retrywhen和subject机制,已经替咱们完成了这么几点: 一、自动重试的注册和反注册,subject被回调完直接失效,再次请求要从新注册。 二、高并发request,维护队列,经过mTokenObservable的回调自动解决了这个问题 三、retry次数的维护,因为每次request的retry都是从新建立的内部类,因此变量的维护变的简单。 四、重试的逻辑被retry操做符自动实现了,只要重写retry的返回值就能够控制重试的策略。app
public class TokenLoader {
public static final String TAG = TokenLoader.class.getSimpleName();
private AtomicBoolean mRefreshing = new AtomicBoolean(false);
private PublishSubject<String> mPublishSubject;
private Observable<String> mTokenObservable;
private TokenLoader() {
final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
mTokenObservable = Observable
.defer(new Callable<ObservableSource<TokenRequest>>() {
@Override
public ObservableSource<TokenRequest> call() throws Exception {
return Observable.just(request);
}
})
.flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
@Override
public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
//Token请求接口
return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
private int retryCount = 0;
@Override
public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
@Override
public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
retryCount++;
if (retryCount == 3){
//失败次数达到阈值,更改请求策略
request.setFlag(0);
return Observable.just(request);
} else if (retryCount > 3){
//失败次数超过阈值,抛出失败,放弃请求
mRefreshing.set(false);
return Observable.error(throwable);
} else {
//再次请求token
return Observable.just(request);
}
}
});
}
})
// .delay(6000, TimeUnit.MILLISECONDS) //模拟token请求延迟
.map(new Function<MapiHttpResponse<Boolean>,String>() {
@Override
public String apply(MapiHttpResponse<Boolean> response) throws Exception {
//成功,保存token缓存
if (response.getContent().booleanValue() == true){
setCacheToken(response.getToken());
} else if (response.getContent().booleanValue() == false){
setCacheToken(UcarK.getSign());
}
//请求完成标识
mRefreshing.set(false);
return getCacheToken();
}
});
}
public static TokenLoader getInstance() {
return Holder.INSTANCE;
}
private static class Holder {
private static final TokenLoader INSTANCE = new TokenLoader();
}
public String getCacheToken() {
return Store.sToken;
}
public void setCacheToken(String key){
Store.sToken = key;
}
/**
*
* @return
*/
public Observable<String> getNetTokenLocked() {
if (mRefreshing.compareAndSet(false, true)) {
Log.d(TAG, "没有请求,发起一次新的Token请求");
startTokenRequest();
} else {
Log.d(TAG, "已经有请求,直接返回等待");
}
return mPublishSubject;
}
private void startTokenRequest() {
mPublishSubject = PublishSubject.create();
mTokenObservable.subscribe(mPublishSubject);
}
}
复制代码
仍是读注释,除了注释之外,几点说明: 一、mRefreshing的做用是在token请求过程当中,再也不容许新的token请求, 变量采用原子类,而非boolean;这样在多线程环境下,原子类的方法是线程安全的。 compareAndSet(boolean expect, boolean update)这个方法两个做用 1)比较expect和mRefresh是否一致 2)将mRefreshing置为update
二、startTokenRequest()方法开启token请求,注意Observable在subscribe时才正式开始
三、这里使用了PublishSubject较为关键,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的观察者,token请求的会由mPublishSubject响应,同时mPublishSubject也做为Observable返回给TokenLoader的调用者做为retryWhen的返回值返回。(因此这里PublishSubject的泛型与send()方法中Observable的泛型应该是一致的)
四、对于mRefreshing是true的状况,直接返回mPublishSubject,这样每一个阻塞的请求retryWhen都会等待mPublishSubject的返回值,回调通知的顺序与加入阻塞的顺序是队列关系(先请求的接口,先回调),知足咱们的需求。
最后: 感受怎么样,是豁然开朗仍是越陷越深,无论那样都没有关系,你须要的是了解还存在另外一种处理异步任务的方法。在你下一次遇到一样让你头疼的问题时,你能够把这篇文章拿起来再看看,也许你的头疼会好一点了。。。