No1:php
RxJava使用html
dependencies{
compile 'io.reactivex:rxjava:1.2.0'
compile 'io.reactivex:rxandroid:1.2.1'
}
1)建立Observer(观察者)java
Subscriber subscriber = new Subscriber<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,"onError"); } @Override public void onNext(String s){ Log.d(TAG,"onNext"+s); } @Override public void onStart(){ Log.d(TAG,"onStart"); } }
或者react
Observer<String> observer = new Observer<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,"onError"); } @Override public void onNext(String s){ Log.d(TAG,"onNext"+s); } }
2)建立Observable(被观察者)android
Observable observable = Observable.create(new Observable.OnSubscribe<String>){ @Override public void call(Subscriber<? super String> subscriber){ subscriber.onNext("杨影枫"); subscriber.onNext("月媚儿"); subscriber.onCompleted(); } } //或者 Observable observable = Observable.just("杨影枫","月媚儿"); //或者 String[] words = {"杨影枫","月媚儿"}; Observable observable = Observable.from(words);
3)Subscribe(订阅)网络
observable.subscribe(subscriber);
No2:并发
RxJava的Subjectide
能够理解为Subject=Observal+Observer函数
1)PublishSubject:PublishSubject只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者post
2)BehaviorSubject:当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据
3)ReplaySubject:无论Observer什么时候订阅ReplaySubject,ReplaySubject均会发射全部来自原始Observable的数据给Observer
4)AsyncSubject:当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据
No3:
建立操做符:
1)interval:建立一个按固定时间间隔发射整数序列的Observable,至关于定时器
2)range:建立发射指定范围的整数序列的Observable,能够拿来替代for循环,发射一个范围内的有序整数序列
3)repeat:建立一个N次重复发射特定数据的Observable
变换操做符:
1)map:经过指定一个Func对象,将Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable处理
2)flatmap:将Observable发射的数据集合变换为Observable集合,而后将这些Observable发射的数据平坦化地放进一个单独的Observable
3)cast:cast操做符的做用是强制将Observable发射的全部数据转换为指定类型
4)concatMap:和flatMap使用方法相似
5)flatMapIterable:能够将数据包装成Iterable,在Iterable中咱们就能够对数据进行处理了
6)buffer:将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射
7)groupBy:用于分组元素,将源Observable变换成一个发射Observable的新Observable(分组后的)
过滤操做符:
1)filter:对源Observable产生的结果自定义规则进行过滤,只有知足条件的结果才会提交给订阅者
2)elementAt:用来返回指定位置的数据
3)distinct:用来去重,其只容许尚未发射过的数据项经过
4)skip:将源Observable发射的数据过滤掉前n项
5)take:只取前n项
6)ignoreElements:忽略全部源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者
7)throttleFirst:会按期发射这个时间段里源Observable发射的第一个数据,throttleFirst操做符默认在computation调度器上执行
8)throttleWithTimeOut:经过时间来限流。源Observable每次发射出来的一个数据后就会进行计时。若是在设定好的时间结束前源Observable有新的数据发射出来,这个数据就会被丢弃,同时throttleWithTimeOut从新开始计时。
组合操做符:
1)startWith:会在源Observable发射的数据前面插上一些数据
2)merge:将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错
3)concat:将多个Observable发射的数据进行合并发射。concat严格按照顺序发射数据,前一个Observable没发射完成时不会发射后一个Observable的数据的
4)zip:合并两个或者多个Observable发射出的数据项,根据指定的函数变换它们,并发射一个新值
5)combineLastest:当两个Observable中的任何一个发射了数据时,使用一个函数结合每一个Observable发射的最近数据项,而且基于这个函数的结果发射数据
辅助操做符:
1)delay:让原始Observable在发射每项数据以前都暂停一段指定的时间段
2)Do系列操做符:包括doOnEach、doOnNext、doOnSubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo
3)subscribeOn:用于指定Observable自身在哪一个线程上运行
4)observeOn:用来指定Observer所运行的线程,也就是发射出的数据在哪一个线程上使用。通常会指定在主线程中运行,这样就能够修改UI
5)timeout:若是原始Observable过了指定的一段时长没有发射任何数据,timeout操做符会以一个onError通知终止这个Observable,或者继续执行一个备用的Observable。
错误处理操做符:
1)catch:拦截原始Observable的onError通知,将它替换为其余数据项或者数据序列,让产生的Observable可以正常终止或者根本不终止。包括onErrorReturn、onErrorResumeNext、onExceptionResumeNext
2)retry:不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成其数据序列。
布尔操做符:
1)all:根据一个函数对源Observable发射的全部数据进行判断,最终返回的结果就是这个判断结果。
2)contains:用来判断Observable所发射的数据是否包含某一个数据
3)isEmpty:用来判断源Observable是否发射过数据
条件操做符:
1)amb:对于给定两个或多个Observable,它只发射首先发射数据或通知的那个Observable的全部数据
2)defaultIfEmpty:发射来自原始Observable的数据,若是原始Observable没有发射数据,就发射一个默认数据
转换操做符:
1)toList:将发射多项数据且为每一项数据调用onNext方法的Observable发射的多项数据组合成一个List
2)toSortedList:相似于toList操做符:不一样的是,它会对产生的列表排序,默认是天然升序
3)toMap:收集原始Observable发射的全部数据项到一个Map(默认是HashMap),而后发射这个Map
No4:
RxJava线程控制:想切换线程,须要使用Scheduler
1)Scheduler.immediate():直接在当前线程运行,它是timeout、timeInterval和timeStamp操做符的默认调度器
2)Schedulers.newThread():老是启用新线程,并在新线程执行操做
3)Schedulers.io():I/O操做所使用的Scheduler
4)Schedulers.computation():计算所使用的Scheduler
5)Schedulers.trampoline():当咱们想在当前线程执行一个任务时,并非当即时,能够用.trampoline()将它入列
6)AndroidSchedulers.mainThread():RxAndroid库中提供的Scheduler,它指定的操做在主线程中运行
No5:
RxJava结合OkHttp访问网络
private Observable<String> getObservable(final String ip){ Observable observable = Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(final Subscriber<? super String> subscriber){ mOkHttpClient = new OkHttpClient(); RequestBody formBody = new FormBody.Builder().add("ip",ip).build(); Request request = new Request.Builder().url("http://ip.taobao.com/service/getIpInfo.php").post(formBody).build(); Call call = mOkHttpClient.newCall(request); call.enqueue(new Callback(){ @Override public void onFailure(Call call,IOException e){ subscriber.onError(new Exception("error")); } @Override public void onResponse(Call call,Response response) throws IOException{ String str = response.body().string(); subscriber.onNext(str); subscriber.onCompleted(); } }); } }); return observable; }
private void postAsynHttp(String size){ getObservable(size).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,e.getMessage()); } @Override public void onNext(String s){ Log.d(TAG,s); Toast.makeText(getApplicationContext(),"请求成功",Toast.LENGTH_SHORT).show(); } }); }
No6:
No7: