RxJava

The most basic usage:

The roles involved are Observable (the party being observed), Observer (the observer), and Subscriber (the subscriber, which can also be regarded as an observer).

The Observer interface defines three methods:

public interface Observer<T> {
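    // The Observable notifies the Observer that it has finished emitting data normally; the Observable calls this method after its last call to onNext
    void onCompleted();
    // The Observable notifies the Observer that it hit an error; once the Observable has called this method, it will call neither onCompleted nor onNext again
    void onError(Throwable e);
    // The Observable calls this method to notify the Observer when an event it subscribed to occurs; it may be called many times, but never again once the Observable has called onCompleted or onError
    void onNext(T t);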
}
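The ordering rule in those comments (any number of onNext calls, then at most one terminal call) can be illustrated with a small guard. This is a plain-Java sketch, not RxJava code: `SimpleObserver`, `GuardedObserver`, and `demo` are hypothetical names introduced here, and the guard is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverContractSketch {

    /** Local copy of the three-method contract; NOT the real rx.Observer. */
    interface SimpleObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Hypothetical wrapper that drops every call arriving after a terminal event. */
    static final class GuardedObserver<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> actual;
        private boolean done; // set once onCompleted or onError has been delivered

        GuardedObserver(SimpleObserver<T> actual) {
            this.actual = actual;
        }

        @Override public void onNext(T t) {
            if (!done) {
                actual.onNext(t);
            }
        }

        @Override public void onError(Throwable e) {
            if (!done) {
                done = true;
                actual.onError(e);
            }
        }

        @Override public void onCompleted() {
            if (!done) {
                done = true;
                actual.onCompleted();
            }
        }
    }

    /** Sends a deliberately misbehaving call sequence through the guard and records what gets through. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        SimpleObserver<String> recorder = new SimpleObserver<String>() {
            @Override public void onNext(String t) { log.add("next:" + t); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        };
        SimpleObserver<String> guarded = new GuardedObserver<>(recorder);
        guarded.onNext("a");
        guarded.onCompleted();
        guarded.onNext("b");                     // dropped: arrives after the terminal event
        guarded.onError(new RuntimeException()); // dropped: only one terminal event is delivered
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:a, completed]
    }
}
```

The SafeSubscriber wrapper that appears later in subscribe plays a comparable protective role in RxJava itself, although its real implementation does more than this guard.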

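Subscriber is an abstract class that implements Observer. It adds the ability to register subscriptions, unsubscribe, and check the current subscription state.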

package rx;

import rx.internal.util.SubscriptionList;


public abstract class Subscriber<T> implements Observer<T>, Subscription {
    private static final long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    private Producer producer;
    private long requested = NOT_SET; // default to not set

    protected Subscriber() {
        this(null, false);
    }

    
    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    /**
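     * Adds the given Subscription to the list as long as the list has not been marked as unsubscribed (SubscriptionList keeps a flag for this). If the list has already been marked as unsubscribed, this method unsubscribes the given Subscription as well.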
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**
     * Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
     */
    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }    
}
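The behaviour described in the comment on add can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the source of rx.internal.util.SubscriptionList: `SimpleSubscription`, `FlagSubscription`, `SimpleSubscriptionList`, and `demo` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriptionListSketch {

    /** Local stand-in for a subscription handle; NOT the real rx.Subscription. */
    interface SimpleSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Trivial subscription that only remembers whether it was cancelled. */
    static final class FlagSubscription implements SimpleSubscription {
        private volatile boolean unsubscribed;
        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Hypothetical composite: cancelling the list cancels every child it holds. */
    static final class SimpleSubscriptionList implements SimpleSubscription {
        private final List<SimpleSubscription> children = new ArrayList<>();
        private boolean unsubscribed; // the "already cancelled" flag

        void add(SimpleSubscription s) {
            synchronized (this) {
                if (!unsubscribed) {
                    children.add(s);
                    return;
                }
            }
            // The list was already cancelled: cancel the newcomer instead of storing it.
            s.unsubscribe();
        }

        @Override public void unsubscribe() {
            List<SimpleSubscription> toCancel;
            synchronized (this) {
                if (unsubscribed) {
                    return;
                }
                unsubscribed = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            // Cancel children outside the lock so their callbacks cannot block other callers.
            for (SimpleSubscription s : toCancel) {
                s.unsubscribe();
            }
        }

        @Override public synchronized boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    /** Returns {early cancelled?, late cancelled?} for one child added before and one added after cancellation. */
    static boolean[] demo() {
        SimpleSubscriptionList list = new SimpleSubscriptionList();
        FlagSubscription early = new FlagSubscription();
        list.add(early);
        list.unsubscribe();
        FlagSubscription late = new FlagSubscription();
        list.add(late);
        return new boolean[] { early.isUnsubscribed(), late.isUnsubscribed() };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints true true
    }
}
```

Cancelling a late arrival immediately means a resource registered after unsubscribe is still released instead of being leaked.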

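An Observable holds an OnSubscribe object. Calling the Observable's subscribe(Subscriber) method runs that OnSubscribe's call method, passing the Subscriber in as its argument. That is a big jump in the description. For now, read it this way: making this call means the observed party (the Observable) fires its notification, and the observer (the Subscriber) performs whatever action it wants to perform. How execution actually reaches call is analyzed later.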

A simple usage example:

import rx.Observable;
import rx.Subscriber;

public class TestRxj {
    public static void main(String[] args) {
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("Yeah!Complete!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("No!Error!");
            }

            @Override
            public void onNext(String t) {
                processData(t);
            }

            private void processData(String data) {
                System.out.println("Hello,I'm received something,i'm going to process it "+data);
            }

        };

        Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> t) {
                if(!t.isUnsubscribed()){
                    t.onNext("this is the first data");
                    t.onCompleted();
                }
            }
        };
        Observable<String> observable = Observable.create(onSubscribe);

        observable.subscribe(subscriber);
    }
}


/** Output
*Hello,I'm received something,i'm going to process it this is the first data
*Yeah!Complete!
*/

What Observable's subscribe does:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }

    // Let the subscriber initialize itself before any event is emitted
    subscriber.onStart();

    // Wrap the subscriber in a SafeSubscriber unless it already is one
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // Let the hook intercept or decorate onSubscribe, then invoke its call method
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // Rethrow fatal errors instead of handling them here
        Exceptions.throwIfFatal(e);
        if (subscriber.isUnsubscribed()) {
            // The subscriber can no longer receive the error, so hand it to the global hook
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // onError itself failed, so the error cannot be delivered normally; throw instead
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }
}
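Stripped of hooks and the SafeSubscriber wrapper, the control flow of that method comes down to three steps: call onStart, run the stored OnSubscribe with the subscriber, and route anything it throws to onError. The following is a minimal plain-Java model of that flow, not RxJava's implementation. Every type in it (`SimpleSubscriber`, `SimpleOnSubscribe`, `SimpleObservable`) is a hypothetical stand-in, and it catches only RuntimeException, whereas the real method catches Throwable and rethrows fatal errors.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscribeFlowSketch {

    /** Hypothetical stand-in for rx.Subscriber, reduced to the callbacks used here. */
    interface SimpleSubscriber<T> {
        void onStart();
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Hypothetical stand-in for Observable.OnSubscribe: the work to run on subscription. */
    interface SimpleOnSubscribe<T> {
        void call(SimpleSubscriber<? super T> subscriber);
    }

    /** Hypothetical stand-in for Observable: it only stores the OnSubscribe. */
    static final class SimpleObservable<T> {
        private final SimpleOnSubscribe<T> onSubscribe;

        private SimpleObservable(SimpleOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> SimpleObservable<T> create(SimpleOnSubscribe<T> onSubscribe) {
            return new SimpleObservable<>(onSubscribe);
        }

        void subscribe(SimpleSubscriber<? super T> subscriber) {
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber can not be null");
            }
            subscriber.onStart();             // step 1: let the subscriber initialize
            try {
                onSubscribe.call(subscriber); // step 2: run the producer logic
            } catch (RuntimeException e) {
                subscriber.onError(e);        // step 3: route failures to onError
            }
        }
    }

    /** Subscribes to a source that emits one item and then throws; returns the recorded callbacks. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        SimpleObservable<String> source = SimpleObservable.create(new SimpleOnSubscribe<String>() {
            @Override public void call(SimpleSubscriber<? super String> s) {
                s.onNext("x");
                throw new IllegalStateException("boom");
            }
        });
        source.subscribe(new SimpleSubscriber<String>() {
            @Override public void onStart() { log.add("start"); }
            @Override public void onNext(String t) { log.add("next:" + t); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [start, next:x, error:boom]
    }
}
```

Nothing is emitted until subscribe is called, because the producer logic lives entirely inside the stored OnSubscribe.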

A snippet from Hystrix:

final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
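The point of wrapping run() in defer is that nothing executes when the Observable is built. The work happens once per subscription, and an exception from run() becomes an error signal instead of escaping to the caller. The sketch below models only that idea in plain Java. `Deferred`, `Outcome`, and `demo` are hypothetical names, and a `Callable` stands in for the command's run() method, so this is an illustration of the behaviour, not Hystrix or RxJava code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferSketch {

    /** Result of one execution: either a value or the error that the work threw. */
    static final class Outcome<R> {
        final R value;
        final Throwable error;

        Outcome(R value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    /** Hypothetical deferred source: the work runs only when subscribe() is called. */
    static final class Deferred<R> {
        private final Callable<R> run;

        Deferred(Callable<R> run) {
            this.run = run; // stored, not executed
        }

        Outcome<R> subscribe() {
            try {
                return new Outcome<>(run.call(), null); // analogous to Observable.just(run())
            } catch (Throwable ex) {
                return new Outcome<>(null, ex);         // analogous to Observable.error(ex)
            }
        }
    }

    /** Returns {executions before any subscribe, executions after two subscribes}. */
    static int[] demo() {
        final AtomicInteger calls = new AtomicInteger();
        Deferred<String> deferred = new Deferred<>(new Callable<String>() {
            @Override public String call() {
                calls.incrementAndGet();
                return "ok";
            }
        });
        int before = calls.get(); // still 0: building the source ran nothing
        deferred.subscribe();
        deferred.subscribe();     // each subscription runs the work again
        return new int[] { before, calls.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints 0 2
    }
}
```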
 
 