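The most basic usage:
The roles involved are Observable (the thing being observed), Observer (the observer), and Subscriber (a subscriber, which can also be regarded as an observer).
The Observer interface defines three methods: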
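    public interface Observer<T> {

        // The Observable tells the Observer that data delivery has finished normally.
        // The Observable calls this method after its last call to onNext.
        void onCompleted();

        // The Observable tells the Observer that it hit an error. Once the Observable
        // has called this method, it will call neither onCompleted nor onNext again.
        void onError(Throwable e);

        // The Observable calls this method to notify the Observer when an event the
        // Observer subscribed to occurs. It may be called many times, but never again
        // once the Observable has called onCompleted or onError.
        void onNext(T t);
    }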
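Subscriber is an abstract class that implements Observer. On top of the Observer methods it lets you add subscriptions, unsubscribe, and check the current subscription state.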
    package rx;

    import rx.internal.util.SubscriptionList;

    public abstract class Subscriber<T> implements Observer<T>, Subscription {

        private static final long NOT_SET = Long.MIN_VALUE;

        private final SubscriptionList subscriptions;
        private final Subscriber<?> subscriber;
        private Producer producer;
        private long requested = NOT_SET; // default to not set

        protected Subscriber() {
            this(null, false);
        }

        protected Subscriber(Subscriber<?> subscriber) {
            this(subscriber, true);
        }

        protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
            this.subscriber = subscriber;
            this.subscriptions = shareSubscriptions && subscriber != null
                    ? subscriber.subscriptions
                    : new SubscriptionList();
        }

        /**
         * As long as the list has not been marked as unsubscribed (SubscriptionList keeps
         * a flag for this), the given Subscription is added to it. If the list is already
         * marked as unsubscribed, this method unsubscribes the given Subscription as well.
         */
        public final void add(Subscription s) {
            subscriptions.add(s);
        }

        @Override
        public final void unsubscribe() {
            subscriptions.unsubscribe();
        }

        /**
         * Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
         */
        @Override
        public final boolean isUnsubscribed() {
            return subscriptions.isUnsubscribed();
        }
    }
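The rule behind add and unsubscribe is easier to see in isolation. The following is a minimal sketch using only the JDK; `MiniSubscriptionList` and `FlagSubscription` are hypothetical stand-ins, not RxJava's real `SubscriptionList`, and the locking is deliberately cruder than in the library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the add/unsubscribe rule; not RxJava source.
class SubscriptionListSketch {

    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Trivial Subscription that only remembers whether it was cancelled. */
    static final class FlagSubscription implements Subscription {
        private volatile boolean unsubscribed;

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    static final class MiniSubscriptionList implements Subscription {
        private final List<Subscription> children = new ArrayList<>();
        private boolean unsubscribed;

        /** Stores s while the list is live; cancels s right away otherwise. */
        synchronized void add(Subscription s) {
            if (unsubscribed) {
                s.unsubscribe();
            } else {
                children.add(s);
            }
        }

        /** Marks the list as cancelled and cancels every stored child once. */
        @Override public synchronized void unsubscribe() {
            if (unsubscribed) {
                return;
            }
            unsubscribed = true;
            for (Subscription s : children) {
                s.unsubscribe();
            }
            children.clear();
        }

        @Override public synchronized boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    public static void main(String[] args) {
        MiniSubscriptionList list = new MiniSubscriptionList();
        FlagSubscription early = new FlagSubscription();
        FlagSubscription late = new FlagSubscription();

        list.add(early);                             // list is live: early is stored
        System.out.println(early.isUnsubscribed());  // false
        list.unsubscribe();                          // cancels every stored child
        System.out.println(early.isUnsubscribed());  // true
        list.add(late);                              // list is already cancelled
        System.out.println(late.isUnsubscribed());   // true
    }
}
```

The point of the "cancel on late add" rule is that a resource registered after cancellation cannot leak: whoever calls add never has to check the state first.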
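An Observable holds an OnSubscribe object. When you call the observable's subscribe(Subscriber) method, the call method of that OnSubscribe is executed with the Subscriber passed in as its argument. That is a big jump in the explanation, so for now read it this way: calling subscribe means the observed side (Observable) fires its notifications, which run whatever the observer (Subscriber) wants done. How execution actually reaches call is analyzed further down.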
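To make the relationship concrete before looking at the real classes, here is a stripped-down model written against the JDK only. `MiniObservable`, `MiniOnSubscribe` and `MiniObserver` are hypothetical names invented for this sketch; the real RxJava types do considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: an observable is little more than a stored callback.
class SubscribeSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Plays the role of Observable.OnSubscribe: the code that emits the events. */
    interface MiniOnSubscribe<T> {
        void call(MiniObserver<? super T> observer);
    }

    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Only stores the callback; nothing is emitted yet. */
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        /** Subscribing is what triggers the stored callback. */
        void subscribe(MiniObserver<? super T> observer) {
            onSubscribe.call(observer);
        }
    }

    /** Records the order in which things happen. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.create(observer -> {
            log.add("call");
            observer.onNext("first");
            observer.onCompleted();
        });
        log.add("created");
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [created, call, onNext:first, onCompleted]
        System.out.println(demo());
    }
}
```

Note that "created" is logged before "call": building the observable emits nothing, and each call to subscribe would run the callback again.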
A simple usage example:
    import rx.Observable;
    import rx.Subscriber;

    public class TestRxj {

        public static void main(String[] args) {
            // The observer side: reacts to data, errors and completion.
            Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("Yeah!Complete!");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("No!Error!");
                }

                @Override
                public void onNext(String t) {
                    processData(t);
                }

                private void processData(String data) {
                    System.out.println("Hello,I'm received something,i'm going to process it " + data);
                }
            };

            // The emitting side: runs once for every subscription.
            Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> t) {
                    if (!t.isUnsubscribed()) {
                        t.onNext("this is the first data");
                        t.onCompleted();
                    }
                }
            };

            Observable<String> observable = Observable.create(onSubscribe);
            observable.subscribe(subscriber);
        }
    }

    /*
     * Output:
     * Hello,I'm received something,i'm going to process it this is the first data
     * Yeah!Complete!
     */
What Observable's subscribe does:
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }

        subscriber.onStart();

        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        try {
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException(
                            "Error occurred attempting to subscribe [" + e.getMessage()
                                    + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
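Two ideas in that method are worth isolating: the subscriber is wrapped so that nothing gets through after a terminal event, and an exception thrown by call is routed to onError instead of escaping. The sketch below models just those two ideas with the JDK alone. `Guard` is a hypothetical, much-reduced stand-in for `SafeSubscriber`, and the hooks, fatal-error checks and unsubscription handling of the real code are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "wrap, call, route failures to onError"; not RxJava source.
class SafeSubscribeSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniObserver<? super T> observer);
    }

    /** Forwards events until the first terminal event, then drops everything else. */
    static final class Guard<T> implements MiniObserver<T> {
        private final MiniObserver<? super T> actual;
        private boolean done;

        Guard(MiniObserver<? super T> actual) {
            this.actual = actual;
        }

        @Override public void onNext(T value) {
            if (!done) {
                actual.onNext(value);
            }
        }

        @Override public void onError(Throwable e) {
            if (!done) {
                done = true;
                actual.onError(e);
            }
        }

        @Override public void onCompleted() {
            if (!done) {
                done = true;
                actual.onCompleted();
            }
        }
    }

    static <T> void subscribe(MiniOnSubscribe<T> source, MiniObserver<? super T> observer) {
        if (observer == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        Guard<T> guarded = new Guard<>(observer);
        try {
            source.call(guarded);
        } catch (RuntimeException e) {
            // Simplification: the real method catches Throwable and rethrows fatal errors.
            guarded.onError(e);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> recorder = new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable e) { log.add("onError:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        };

        // A misbehaving source that keeps emitting after completion.
        SafeSubscribeSketch.<String>subscribe(o -> {
            o.onNext("a");
            o.onCompleted();
            o.onNext("ignored");
        }, recorder);

        // A source that throws halfway through.
        SafeSubscribeSketch.<String>subscribe(o -> {
            o.onNext("b");
            throw new IllegalStateException("boom");
        }, recorder);

        return log;
    }

    public static void main(String[] args) {
        // prints [onNext:a, onCompleted, onNext:b, onError:boom]
        System.out.println(demo());
    }
}
```

The value "ignored" never reaches the recorder, and the exception from the second source surfaces as an onError event rather than propagating to the caller of subscribe.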
A snippet from Hystrix:
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }
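What this snippet relies on is laziness: run() is not executed when getExecutionObservable() returns, only when something subscribes, and the doOnSubscribe action fires at subscription time, before run() does. A failure in run() becomes an onError event. The sketch below imitates that ordering with plain JDK types; `MiniSource`, `deferred` and `doOnSubscribe` here are hypothetical helpers written for illustration, not the RxJava operators themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical model of the defer + doOnSubscribe ordering; not RxJava or Hystrix source.
class DeferSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    /** A cold source: nothing happens until subscribe is called. */
    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    /** Runs work once per subscription and turns its outcome into events. */
    static <T> MiniSource<T> deferred(Callable<T> work) {
        return observer -> {
            T value;
            try {
                value = work.call();
            } catch (Throwable ex) {
                observer.onError(ex);   // failure becomes an event, not a thrown exception
                return;
            }
            observer.onNext(value);
            observer.onCompleted();
        };
    }

    /** Runs hook at subscription time, before the upstream source is subscribed. */
    static <T> MiniSource<T> doOnSubscribe(MiniSource<T> upstream, Runnable hook) {
        return observer -> {
            hook.run();
            upstream.subscribe(observer);
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> recorder = new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable e) { log.add("onError:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        };

        Callable<String> work = () -> {
            log.add("run");
            return "result";
        };
        MiniSource<String> source = doOnSubscribe(deferred(work), () -> log.add("hook"));
        log.add("assembled");           // the pipeline exists, but work has not run yet
        source.subscribe(recorder);

        MiniSource<String> failing = DeferSketch.<String>deferred(() -> {
            throw new IllegalStateException("boom");
        });
        failing.subscribe(recorder);

        return log;
    }

    public static void main(String[] args) {
        // prints [assembled, hook, run, onNext:result, onCompleted, onError:boom]
        System.out.println(demo());
    }
}
```

In the Hystrix snippet the hook records the subscribing thread; the ordering shown here is why that thread reference is already in place by the time run() starts.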