package qianxingzhe.rxjava_learning; import org.junit.Test; import java.util.ArrayList; import java.util.List; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscriber; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; /** * Created by lunyi.yly on 16/8/6. */ public class RxJavaText { @Test public void hello_world_01() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } }); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable e) { System.out.println("onError()"); } @Override public void onNext(String s) { System.out.println("onNext()"); System.out.println(s); } }; observable.subscribe(subscriber); } @Test public void hello_world_02() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable e) { System.out.println("onError()"); } @Override public void onNext(String s) { System.out.println("onNext()"); System.out.println("s"); } }; observable.subscribe(observer); } @Test public void just_01() { Observable<String> observable = Observable.just("hello world"); Action1<String> action1 = new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }; observable.subscribe(action1); } /** * just 用来建立只发出一个事件就结束的Observable对象 */ @Test public void just_02() { Observable.just("hello world").subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * map 把一个事件转换为另外一个事件 */ @Test public void map() { Observable.just("hello world") .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.hashCode(); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); } /** * from 接收一个集合做为输入,而后每次输出一个元素给subscriber */ @Test public void from() { Observable.from(new String[]{"hello", "world"}) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * flatMap 接收一个Observable的输出做为输入,同时输出另一个Observable */ @Test public void flatmap() { List<List<String>> lists = new ArrayList<>(); Observable.from(lists) .flatMap(new Func1<List<String>, Observable<String>>() { @Override public Observable<String> call(List<String> strings) { return Observable.from(strings); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * filter 输出和输入相同的元素,而且会过滤掉那些不知足检查条件的 */ @Test public void filter() { Observable.from(new String[]{"hello", "world"}) .flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just(s); } }) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.equals("hello"); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * take 输出最多指定数量的结果 */ @Test public void take() { Observable.from(new Integer[]{1, 2, 3, 4, 5,}) .take(3) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); } /** * doOnNext 容许咱们在每次输出一个元素以前作一些额外的事情,好比这里的保存标题。 */ @Test public void doOnNext() { Observable.just("hello world") .doOnNext(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * subscribeOn 指定被观察者代码运行的线程 * ObserverOn 指定观察者运行的线程 */ @Test public void subscribeOn_ObserverOn() { Observable.just("http://www.baidu.com") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("在UI线程执行"); } }); } /** * unsubscribe 在他当前执行的地方终止 */ @Test public void unsubscribe() { Subscription subscribe = Observable.just("hello world") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); subscribe.unsubscribe(); System.out.println(subscribe.isUnsubscribed()); } }