转载请注明出处:http://www.wangxinarhat.com/2016/05/01/2016-05-01-rxjava-android-operate2/java
最近比较忙,也没想好这个文章该怎么写下去。可能会比较水,不过作事不能有始无终,因此继续吧。android
使用场景:制做缓存git
效果图github
代码:缓存
缓存管理类网络
public class DataCache { /** * 读取磁盘缓存数据 */ public List<ImageInfoBean> readData() { ... } /** * 写缓存 */ public void writeData(List<ImageInfoBean> list) { ... } /** * 删除缓存 */ public boolean deleteCache() { ... } }
数据管理类并发
public class Data { private static Data instance; private static final int DATA_SOURCE_MEMORY = 1;//内存 private static final int DATA_SOURCE_DISK = 2;//硬盘 private static final int DATA_SOURCE_NETWORK = 3;//网络 BehaviorSubject<List<ImageInfoBean>> cache; private int dataSource; private Data() { } public static Data newInstance() { if (instance == null) { instance = new Data(); } return instance; } private void setDataSource(@DataSource int dataSource) { this.dataSource = dataSource; } public String getDataSourceText() { int dataSourceTextRes; switch (dataSource) { case DATA_SOURCE_MEMORY: dataSourceTextRes = R.string.data_source_memory; break; case DATA_SOURCE_DISK: dataSourceTextRes = R.string.data_source_disk; break; case DATA_SOURCE_NETWORK: dataSourceTextRes = R.string.data_source_network; break; default: dataSourceTextRes = R.string.data_source_network; } return BaseApplication.getApplication().getString(dataSourceTextRes); } /** * 请求网络数据 */ public void loadData() { Network.getGankApi() .getBeauties(80, 1) .map(BeautyResult2Beautise.newInstance()) .doOnNext(new Action1<List<ImageInfoBean>>() { @Override public void call(List<ImageInfoBean> list) { DataCache.newInstance().writeData(list); } }) .subscribe(new Action1<List<ImageInfoBean>>() { @Override public void call(List<ImageInfoBean> list) { cache.onNext(list); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } }); } /** * 获取数据 * @param observer * @return */ public Subscription subscribeData(@Nullable Observer<List<ImageInfoBean>> observer) { if (null == cache) { cache = BehaviorSubject.create(); Observable.create(new Observable.OnSubscribe<List<ImageInfoBean>>() { @Override public void call(Subscriber<? super List<ImageInfoBean>> subscriber) { //从缓存获取数据 List<ImageInfoBean> list = DataCache.newInstance().readData(); if (null == list) { setDataSource(DATA_SOURCE_NETWORK); //请求网络数据 loadData(); } else { setDataSource(DATA_SOURCE_DISK); subscriber.onNext(list); } } }) .subscribeOn(Schedulers.io()).subscribe(cache); } else { //内存中获取的数据 setDataSource(DATA_SOURCE_MEMORY); } return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer); } /** * 清空内存 */ public void clearMemoryCache() { cache = null; } /** * 清空内存和硬盘数据 */ public void clearMemoryAndDiskCache() { clearMemoryCache(); DataCache.newInstance().deleteCache(); } }
获取数据dom
@OnClick(R.id.load) public void onClick() { startTime = System.currentTimeMillis(); swipeRefreshLayout.setRefreshing(true); unsubscribe(); subscription = Data.newInstance() .subscribeData(getObserver()); }
在观察者中进行获取数据结果的处理ide
private Observer<List<ImageInfoBean>> getObserver() { if (null == observer) { observer = new Observer<List<ImageInfoBean>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } @Override public void onNext(List<ImageInfoBean> list) { swipeRefreshLayout.setRefreshing(false); int loadingTime = (int) (System.currentTimeMillis() - startTime); dataSituation.setText(getString(R.string.loading_time_and_source, loadingTime, Data.newInstance().getDataSourceText())); adapter.setImages(list); } }; } return observer; }
详解 函数
Subject能够当作是一个桥梁或者代理,在RxJava中同时充当了Observer和Observable的角色。由于它是一个Observer,它能够订阅一个或多个Observable;又由于它是一个Observable,它能够转发它收到(Observe)的数据,也能够发射新的数据。
使用场景:有的 token 并不是一次性的,而是能够屡次使用,直到它超时或被销毁(多数 token 都是这样的)。
这样的 token 处理起来比较麻烦:须要把它保存起来,而且在发现它失效的时候要可以自动从新获取新的 token >并继续访问以前因为 token 失效而失败的请求。
若是项目中有多处的接口请求都须要这样的自动修复机制,使用传统的 Callback 形式须要写出很是复杂的代码。
而使用 RxJava ,能够用 retryWhen() 来轻松地处理这样的问题。
效果图
Token API准备
因为找不到足够简单的用于示例的 token API,如下API是代码伪造的
/** * Created by wangxinarhat on 16-4-5. * TokenApi */ public class TokenApi { /** * 获取Observable * @param auth * @return */ public static Observable<Token> getToken(@NonNull String auth) { return Observable.just(auth).map(new Func1<String, Token>() { @Override public Token call(String s) { try { Thread.sleep(new Random().nextInt(600) + 600); } catch (InterruptedException e) { e.printStackTrace(); } Token token = new Token(); token.token = createToken(); return token; } }); } /** * 随机生成token * @return */ private static String createToken() { return "token_wangxinarhat_" + System.currentTimeMillis() % 1000; } /** * 根据Token获取用户数据 * @param token * @return */ public static Observable<DataInfo> getData(@NonNull Token token) { return Observable.just(token).map(new Func1<Token, DataInfo>() { @Override public DataInfo call(Token token) { try { Thread.sleep(new Random().nextInt(600) + 600); } catch (InterruptedException e) { e.printStackTrace(); } if (token.isInvalid) { throw new IllegalArgumentException("Token is invalid"); } DataInfo dataInfo = new DataInfo(); dataInfo.id = (int) (System.currentTimeMillis() % 1000); dataInfo.name = "USER_" + dataInfo.id; return dataInfo; } }); } }
Token
/** *Token类 */ public class Token { public String token; public boolean isInvalid;//token是否失效 public Token(boolean isInvalid) { this.isInvalid = isInvalid; } public Token() { } }
用户数据
/** * Created by wangxinarhat on 16-4-5. * 用户数据 */ public class DataInfo { public int id; public String name; }
操做符的使用
根据token请求数据
@OnClick(R.id.requestBt) void request() { tokenUpdated = false; swipeRefreshLayout.setRefreshing(true); unsubscribe(); final TokenApi tokenApi = new TokenApi(); subscription = Observable.just(null).flatMap(new Func1<Object, rx.Observable<DataInfo>>() { @Override public Observable<DataInfo> call(Object o) { return null == cachedFakeToken.token ? Observable.<DataInfo>error(new NullPointerException("token id null")) : tokenApi.getData(cachedFakeToken); } }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable.flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable throwable) { if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) { return tokenApi.getToken("flat_map") .doOnNext(new Action1<Token>() { @Override public void call(Token token) { tokenUpdated = true; cachedFakeToken.token = token.token; cachedFakeToken.isInvalid = token.isInvalid; } }); } return Observable.just(throwable); } }); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<DataInfo>() { @Override public void call(DataInfo dataInfo) { swipeRefreshLayout.setRefreshing(false); String token = cachedFakeToken.token; if (tokenUpdated) { token += "(" + getString(R.string.updated) + ")"; } tokenTv.setText(String.format(getString(R.string.got_token_and_data), token, dataInfo.id, dataInfo.name)); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { swipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } }); }
销毁token
@OnClick(R.id.invalidateTokenBt) void incalidate() { cachedFakeToken.isInvalid = true; Toast.makeText(getActivity(), R.string.token_expired, Toast.LENGTH_SHORT).show(); }
详解
若是原始Observable遇到错误,从新订阅它指望它能正常终止。
retryWhen操做符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。
retryWhen老是传递onNext通知给观察者,因为从新订阅,可能会形成数据项重复。
不管收到多少次onError通知,无参数版本的retryWhen都会继续订阅并发射原始Observable。
接受单个count参数的retryWhen会最多从新订阅指定的次数,若是次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。
还有一个版本的retryWhen接受一个谓词函数做为参数,这个函数的两个参数是:重试次数和致使发射onError通知的Throwable。这个函数返回一个布尔值,若是返回true,retryWhen应该再次订阅和镜像原始的Observable,若是返回false,retryWhen会将最新的一个onError通知传递给它的观察者。
retryWhen操做符默认在trampoline调度器上执行。
使用场景
实时搜索,若是在EditText中监听到字符改变就发起请求数据,明显不合适。
有了Debounce操做符,仅在过了指定的一段时间还没发射数据时才发射一个数据,Debounce操做符会过滤掉发射速率过快的数据项,优化网络请求
效果图
代码
配合jakewharton大神的rxbinding使用,获取可观察对象
@Override public void onActivityCreated(@Nullable Bundle savedInstanceState) { super.onActivityCreated(savedInstanceState); setLogger(); //使用rxbing给EditText注册字符改变事件 subscription = RxTextView.textChangeEvents(input) .debounce(500, TimeUnit.MILLISECONDS)//设置发射时间间隔 .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
在观察者中进行结果处理
/** * 获取观察者 * @return */ private Observer<? super TextViewTextChangeEvent> getObserver() { return new Observer<TextViewTextChangeEvent>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) { //获得搜索关键字,进行网络请求 log(String.format("搜索关键字 : %s", textViewTextChangeEvent.text().toString())); } }; }
更新adapter数据集
/** * 更新adapter数据集 * * @param logMsg */ private void log(String logMsg) { if (isCurrentlyOnMainThread()) { mLogs.add(0, logMsg + " (main thread) "); mAdapter.notifyDataSetChanged(); } else { mLogs.add(0, logMsg + " (NOT main thread) "); new Handler(Looper.getMainLooper()).post(new Runnable() { @Override public void run() { mAdapter.notifyDataSetChanged(); } }); } }
详解
Debounce仅在过了一段指定的时间还没发射数据时才发射一个数据,会根据设置的时间间隔过滤掉发射速率过快的数据项。
按期收集Observable的数据放进一个数据包裹,而后发射这些数据包裹,而不是一次发射一个值。
这个操做符,我暂时尚未比较好的使用场景,不过既然是能够按期收集数据,那么应该能够作指定时间内点击次数等之类的统计。
效果图
代码
仍是使用jakewharton大神的rxbinding,注册点击事件获取可观察对象
@Override public void onActivityCreated(@Nullable Bundle savedInstanceState) { super.onActivityCreated(savedInstanceState); setLogger(); subscription = RxView.clicks(btn) .map(new Func1<Void, Integer>() { @Override public Integer call(Void aVoid) { log("点击一次"); return 1; } }) .buffer(3, TimeUnit.SECONDS)//设置收集数据时间间隔为3s .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
详解
Buffer操做符将一个Observable变换为另外一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。
还有就是:若是原来的Observable发射了一个onError通知,Buffer会当即传递这个通知,而不是首先发射缓存的数据,即便在这以前缓存中包含了原始Observable发射的数据。
由于我也才尝试使用rx,这篇终于挤出来了,好难。。代码在这里。
若是又学到新的使用场景,仍是会再写。
我是从国内rx大神扔物线,还有github上star数最多的那位哥们儿(kaushikgopal)学习的。由于他们都没有很详细的说明操做符的使用,因此才想写这个文章。
如想深刻学习,请看大神代码。