RxJava是一个开源的Rx框架ReactiveX的java版本。
ReactiveX的主要目的是经过一系列Observable组合异步或事件代码。其中使用的是观察者模式。
能够吧Reactive想象成不断向订阅者推送对象的机制,这个推送过程能够是同步的也能够是异步的。java
下面看一个简单的例子
首先引入rxjava的依赖react
//gradle compile group: 'io.reactivex', name: 'rxjava', version: '1.2.1'
@Test public void helloworld() { Observable obs=Observable.from(new String[]{"WORLD","WORLD2"}); obs.subscribe(value-> System.out.println("Hello " + value + "!")); } -------输出------ Hello WORLD! Hello WORLD2!
首先建立了Observable对象,而后在上面订阅了一个subscriber输出observable的内容。框架
上面helloworld使用固定的值做为observable,可使用Observable的create方法进行建立内容异步
@Test public void create() { Observable.create(subscriber -> { IntStream.range(1, 5).forEach(i -> { if(!subscriber.isUnsubscribed()){ subscriber.onNext("hello" + i); } }); if(!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } }).subscribe(System.out::println); } -------输出------ hello1 hello2 hello3 hello4
上面的例子建立了4个(1-4)对象,Observable的create方法接受一个方法,在方法内部向subscriber推送信息。
subsribler输出获得的值。
上面的代码都是在主线程中运行的,若是想要不阻塞主线程能够在create中新建线程或使用线程池gradle
@Test public void asyn() throws InterruptedException { Observable.create(subscriber -> { new Thread(() -> { IntStream.range(1, 5).forEach(i -> { if (!subscriber.isUnsubscribed()) { subscriber.onNext("hello" + i); } }); if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } }).start(); }).subscribe(obj -> { System.out.println(obj); System.out.println(Thread.currentThread()); }); Thread.sleep(1000); } -------输出------ hello1 Thread[Thread-0,5,main] hello2 Thread[Thread-0,5,main] hello3 Thread[Thread-0,5,main] hello4 Thread[Thread-0,5,main]
Observable同时也提供了诸多如skip/take/map/zip等流式的转换操做。线程