首先要感谢简友 楠柯壹梦 提供的实战案例,这篇文章的例子是基于他提出的须要在token
失效时,刷新token
并从新请求接口的应用场景所想到的解决方案。若是你们有别的案例或者在实际中遇到什么问题也能够私信我,让咱们一块儿完善这系列的文章。html
有时候,咱们的某些接口会依赖于用户的token
信息,像咱们项目当中的资讯评论列表、或者帐户的书签同步都会依赖于用户token
信息,可是token
每每会有必定的有效期,那么咱们在请求这些接口返回token
失效的时候,就须要刷新token
再从新发起一次请求,这个流程图能够概括以下: java
token
失效,可是相比以前的例子,咱们增长了额外的两个需求:
token
,而不是单纯地等待一段时间再重试。token
失效而须要从新刷新token
的状况,那么须要判断当前是否有另外一个请求正在刷新token
,若是有,那么就不要发起刷新token
的请求,而是等待刷新token
的请求返回后,直接进行重试。本文的代码能够经过 RxSample 的第十四章获取。git
首先,咱们须要一个地方来缓存须要的Token
,这里用SharedPreferences
来实现,有想了解其内部实现原理的同窗能够看这篇文章:Android 数据存储知识梳理(3) - SharedPreference 源码解析。github
public class Store {
private static final String SP_RX = "sp_rx";
private static final String TOKEN = "token";
private SharedPreferences mStore;
private Store() {
mStore = Utils.getAppContext().getSharedPreferences(SP_RX, Context.MODE_PRIVATE);
}
public static Store getInstance() {
return Holder.INSTANCE;
}
private static final class Holder {
private static final Store INSTANCE = new Store();
}
public void setToken(String token) {
mStore.edit().putString(TOKEN, token).apply();
}
public String getToken() {
return mStore.getString(TOKEN, "");
}
}
复制代码
这里,咱们用一个简单的getUserObservable
来模拟依赖于token
的接口,token
存储的是获取的时间,为了演示方便,咱们设置若是距离上次获取的时间大于2s
,那么就认为过时,并抛出token
失效的错误,不然调用onNext
方法返回接口给下游。缓存
private Observable<String> getUserObservable (final int index, final String token) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, index + "使用token=" + token + "发起请求");
//模拟根据Token去请求信息的过程。
if (!TextUtils.isEmpty(token) && System.currentTimeMillis() - Long.valueOf(token) < 2000) {
e.onNext(index + ":" + token + "的用户信息");
} else {
e.onError(new Throwable(ERROR_TOKEN));
}
}
});
}
复制代码
下面,咱们来看一下整个完整的请求过程:多线程
private void startRequest(final int index) {
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
String cacheToken = TokenLoader.getInstance().getCacheToken();
Log.d(TAG, index + "获取到缓存Token=" + cacheToken);
return Observable.just(cacheToken);
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String token) throws Exception {
return getUserObservable(index, token);
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount = 0;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Log.d(TAG, index + ":" + "发生错误=" + throwable + ",重试次数=" + mRetryCount);
if (mRetryCount > 0) {
return Observable.error(new Throwable(ERROR_RETRY));
} else if (ERROR_TOKEN.equals(throwable.getMessage())) {
mRetryCount++;
return TokenLoader.getInstance().getNetTokenLocked();
} else {
return Observable.error(throwable);
}
}
});
}
});
DisposableObserver<String> observer = new DisposableObserver<String>() {
@Override
public void onNext(String value) {
Log.d(TAG, index + ":" + "收到信息=" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, index + ":" + "onError=" + e);
}
@Override
public void onComplete() {
Log.d(TAG, index + ":" + "onComplete");
}
};
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
}
复制代码
为了方便你们阅读,我把全部的逻辑都写在了一整个调用链里,整个调用链分为四个部分:app
defer
:读取缓存中的token
信息,这里调用了TokenLoader
中读取缓存的接口,而这里使用defer
操做符,是为了在重订阅时,从新建立一个新的Observable
,以读取最新的缓存token
信息,其原理图以下:
flatMap
:经过token
信息,请求必要的接口。retryWhen
:使用重订阅的方式来处理token
失效时的逻辑,这里分为三种状况:重试次数到达,那么放弃重订阅,直接返回错误;请求token
接口,根据token
请求的结果决定是否重订阅;其它状况直接放弃重订阅。subscribe
:返回接口数据。关键点在于TokenLoader
的实现逻辑,代码以下:ide
public class TokenLoader {
private static final String TAG = TokenLoader.class.getSimpleName();
private AtomicBoolean mRefreshing = new AtomicBoolean(false);
private PublishSubject<String> mPublishSubject;
private Observable<String> mTokenObservable;
private TokenLoader() {
mPublishSubject = PublishSubject.create();
mTokenObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Thread.sleep(1000);
Log.d(TAG, "发送Token");
e.onNext(String.valueOf(System.currentTimeMillis()));
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String token) throws Exception {
Log.d(TAG, "存储Token=" + token);
Store.getInstance().setToken(token);
mRefreshing.set(false);
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
mRefreshing.set(false);
}
}).subscribeOn(Schedulers.io());
}
public static TokenLoader getInstance() {
return Holder.INSTANCE;
}
private static class Holder {
private static final TokenLoader INSTANCE = new TokenLoader();
}
public String getCacheToken() {
return Store.getInstance().getToken();
}
public Observable<String> getNetTokenLocked() {
if (mRefreshing.compareAndSet(false, true)) {
Log.d(TAG, "没有请求,发起一次新的Token请求");
startTokenRequest();
} else {
Log.d(TAG, "已经有请求,直接返回等待");
}
return mPublishSubject;
}
private void startTokenRequest() {
mTokenObservable.subscribe(mPublishSubject);
}
}
复制代码
在retryWhen
中,咱们调用了getNetTokenLocked
来得到一个PublishSubject
,为了实现前面说到的下面这个逻辑: 函数
AtomicBoolean
来标记是否有刷新
Token
的请求正在执行,若是有,那么直接返回一个
PublishSubject
,不然就先发起一次刷新
token
的请求,并将
PublishSubject
做为该请求的订阅者。
这里用到了PublishSubject
的特性,它既是做为Token
请求的订阅者,同时又做为retryWhen
函数所返回Observable
的发送方,由于retryWhen
返回的Observable
所发送的值就决定了是否须要重订阅:spa
Token
请求返回正确,那么就会发送onNext
事件,触发重订阅操做,使得咱们能够再次触发一次重试操做。Token
请求返回错误,那么就会放弃重订阅,使得整个请求的调用链结束。而AtomicBoolean
保证了多线程的状况下,只能有一个刷新Token
的请求,在这个阶段内不会触发重复的刷新token
请求,仅仅是做为观察者而已,而且能够在刷新token
的请求回来以后马上进行重订阅的操做。在doOnNext/doOnError
中,咱们将正在刷新的标志位恢复,同时缓存最新的token
。
为了模拟上面提到的多线程请求刷新token
的状况,咱们在发起一个请求500ms
以后,马上发起另外一个请求,当第二个请求决定是否要重订阅时,第一个请求正在进行刷新token
的操做。
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_token);
mBtnRequest = (Button) findViewById(R.id.bt_request);
mBtnRequest.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startRequest(0);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
startRequest(1);
}
});
}
复制代码
控制台的输出以下,能够看到在第二个请求决定是否要重订阅时,它判断到已经有请求,所以只是等待而已。而在第一个请求致使的token
刷新回调以后,两个请求都进行了重试,并成功地请求到了接口信息。
本文中用到的操做符的官方解释连接以下:
关于retryWhen
的更详细的解释,推荐你们能够看一下以前的 RxJava2 实战知识梳理(6) - 基于错误类型的重试请求,它是这篇文章的基础。