在RxJava 1.x 系列中,讲解了RxJava的大体用法,由于如今都用RxJava 2了,因此Rxjava 1就不细讲,主要来学习RxJava 2。html
/** * rajava2 的基本使用 */ private void rxJava2BaseUser() { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); //throw new Exception("发生了错误"); } }) .subscribe(new Observer<String>() { Disposable disposable; // 新增该方法 @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); disposable = d; } @Override public void onNext(@NonNull String s) { Log.d(TAG, "Item: " + s); if (s.equals("4")) disposable.dispose(); // 在RxJava 2.x 中,新增的Disposable能够作到切断的操做,让Observer观察者再也不接收上游事件 } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { emitter.onNext("----- 01 -----"); emitter.onNext("----- 02 -----"); emitter.onNext("----- 03 -----"); } }) // Consumer 和 RxJava 1 中的 Action1 相似 .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Item: " + s); } }); }
基本使用和RxJava 1没有什么区别。java
1.新增了onSubscribe方法,onSubscribe方法会在事件开始的时候,触发。react
2.新增的Disposable能够作到切断的操做,让Observer观察者再也不接收上游事件。git
3.Action1 --> Consumer 只接收onNext方法。github
Disposable
该怎么办呢, RxJava中已经内置了一个容器
CompositeDisposable
, 每当咱们获得一个
Disposable
时就调用
CompositeDisposable.add()
将它添加到容器中, 在退出的时候, 调用
CompositeDisposable.clear()
便可切断全部的水管.
/** * rajava2 线程 */ private void rxJava2Thread() { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "事件处理线程:" + Thread.currentThread().getName()); emitter.onNext("---- 1 ----"); emitter.onNext("---- 2 ----"); emitter.onNext("---- 3 ----"); } }) .subscribeOn(Schedulers.newThread()) // 指明被观察者处理的线程 .observeOn(AndroidSchedulers.mainThread()) // 指明观察者线程 .subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe:" + Thread.currentThread().getName()); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "Item: " + s + " :" + Thread.currentThread().getName()); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError:" + e.getMessage() + " :" + Thread.currentThread().getName()); } @Override public void onComplete() { Log.d(TAG, "onComplete:" + Thread.currentThread().getName()); } }); }
结果:网络
02-10 10:02:31.007 25414-25414/pers.bolin.rxjava2demo D/MainActivity: onSubscribe:main 02-10 10:02:31.009 25414-25970/pers.bolin.rxjava2demo D/MainActivity: 事件处理线程:RxNewThreadScheduler-1 02-10 10:02:31.047 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 1 ---- :main 02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 2 ---- :main 02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 3 ---- :main
能够看出事件已经被分到不一样的线程去处理了。ide
.subscribeOn(Schedulers.newThread()) // 指明被观察者处理的线程 .observeOn(AndroidSchedulers.mainThread()) // 指明观察者线程
须要注意的是subscribeOn 只在第一次切换有效,observeOn每次切换都是有效的post
看下线程的参数有哪些:学习
just;from ;map ;flatMap 和RxJava使用一致:RxJava 1.x 理解-3url
更多的操做符使用:
官方:http://reactivex.io/documentation/operators.html