RxJava = Observable + Operator + Scheduler + Observer?html
Observable
.fromArray(1, 2, 3, 4)
.map { it * 5 }
.filter { it -> it > 10 }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { println(it) }
复制代码
GOTO 2016 • Exploring RxJava 2 for Android • Jake Whartonjava
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onComplete();
}
})
复制代码
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello";
}
});
复制代码
同时在Android使用场景中:react
ReactiveX的每种特定语言实现都实现了一组操做符。大体分为如下分类:
数据库
指定订阅过程运行的线程,同时控制了Observable,Observer。
咱们先看一段代码运行结果。windows
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
//.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
复制代码
加上subscribeOn
以后,发现subscribeOn的上游和下游运行线程都发生了变化。markdown
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码
指定观察者所在线程。
咱们先看一段代码运行结果。网络
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
//.observeOn(Schedulers.newThread())
.subscribe { println("int next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
复制代码
加上observeOn
以后,发现observeOn下游运行线程都发生了变化。app
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码
@POST("/content/user/info")
Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);
复制代码
这里有一段咱们使用操做符的代码,app初始化时同步服务端数据
//app初始化时同步服务端数据,
fun checkRequiredObservable(): Observable<PersonRequiredInfo> {
//后续须要的我的信息
var personRequiredInfo = PersonRequiredInfo()
val allObservables = arrayListOf(
...
//一系列独立的请求
appTabSetting(),//底部tab配置
userInfoObservable(),//用户信息
queryBindConfig(),//用户绑定帐号信息
syncBabyInfoObservable(userId) //同步宝宝信息
...
)
var zipObservable = Observable.zip(allObservables) {
return@zip personRequiredInfo
}
return apolloAppConfig()//先获取apollo配置信息,同时保存配置
.flatMap {zipObservable } //并行上面一系列请求
.onErrorReturn { personRequiredInfo }
.doOnNext {registerUserSuperProperties()}
.compose(RxHelper.io2MainThread())
}
private fun syncBabyInfoObservable(): Observable<Any> {
return getBabyInfoObservable() //先获取服务端宝宝信息
.flatMap { updateBabyInfoObservable() } //本地与服务端比较,若是须要上传则上传
.flatMap { getBabyInfoObservable().map { Any() } }//再次获取同步后的宝宝信息
.onErrorReturn { Any()}
}
复制代码
现有接口情况的状况下,很难想象,若是没有RxJava我该如何组合这些请求。
debounce
Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.
throttleFirst
Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.
RxView.clicks(container)
.throttleFirst(800,TimeUnit.MILLISECONDS)
.subscribe { onclick() }
复制代码
RxTextView.textChanges(etSearch)
.debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { goSearch(chars) }
复制代码
class RefreshDebounce {
private var rxEmitter: ObservableEmitter<Long>? = null
private var observable = Observable
.create<Long> { rxEmitter = it }
.debounce(1000L, TimeUnit.MILLISECONDS)
constructor(consumer: (Long) -> Unit? ){
observable.subscribe { consumer.invoke(it) }
}
fun callRefresh() = rxEmitter?.onNext(System.currentTimeMillis())
}
//使用
var refreshDebounce = RefreshDebounce { println("refresh:$it") }
refreshDebounce.callRefresh()
复制代码
private void startCountDown15Min() {
countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
subscribe(aLong -> Log.i(TAG, "倒计时时间到"));
}
private void cancelCountDown15Min() {
if (countDown15MinDisposable != null && !countDown15MinDisposable.isDisposed()) {
countDown15MinDisposable.dispose();
}
}
复制代码
new RxPermissions(activity)
.request(Manifest.permission.READ_PHONE_STATE)
.subscribe(aBoolean -> Log.d(TAG,"result:"+aBoolean)
复制代码