RxJava中操做符到底作了什么?

    RxJava今年完全火了一把,其中最牛逼之处就是操做符了,之前只知道怎么用,这几天看了看源码,大体的弄清楚了操做符的工做过程,今天分享给你们。若是有什么不对地方,请你们多多指教。java

    今天咱们已filter为例,看代码:app

Integer[] datas={1,2,3,4,5,6,7,8,9,10};
Observable.from(datas)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer>=5;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                mText.append(integer.toString()+",");
            }
        });

     一个很简单的小例子,用过滤操做符 filter 找出大于等于5的数字。咱们点进去看看源码中filter作了什么ide

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
            return create(new OnSubscribeFilter<T>(this, predicate));
        }

    调用了create()方法,等等咱们何时是否是也用过create() 方法,咱们在建立Observable时候也用过create()方法,原来建立了一个新的Observable返回出去了,那岂不是说咱们的订阅者其实订阅的是这个新的Observable,咱们继续往下看create方法,create方法须要的参数是一个OnSubscribe对象,那咱们能够肯定
OnSubscribeFilter是OnSubscribe的一个实现类,咱们点进去看看。学习

public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
    
        final Observable<T> source;
    
        final Func1<? super T, Boolean> predicate;
    
        public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
            this.source = source;
            this.predicate = predicate;
        }

    果真不出咱们所料,OnSubscribeFilter是OnSubscribe的实现类,咱们看他的构造方法,传递了两个参数,第一个参数Observable对象,一个Func1,其中第一个参数就是咱们咱们本身建立的那个Observable,第二个参数使咱们在外面写的Func1,而后保存了起来。咱们都知道在subscribe()订阅的时候,OnSubscribe的call()方法。咱们看看OnSubscribeFilter的call()方法都干了些什么测试

@Override
        public void call(final Subscriber<? super T> child) {
            FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
            child.add(parent);
            source.unsafeSubscribe(parent);
        }
&ensp;&ensp;&ensp;&ensp;出现了一个FilterSubscriber,什么鬼玩意儿,咱们看看他是什么鬼
static final class FilterSubscriber<T> extends Subscriber<T> {

        final Subscriber<? super T> actual;

        final Func1<? super T, Boolean> predicate;

        boolean done;

        public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
            this.actual = actual;
            this.predicate = predicate;
            request(0);
        }

        @Override
        public void onNext(T t) {
            boolean result;

            try {
                result = predicate.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            if (result) {
                actual.onNext(t);
            } else {
                request(1);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }
    }
}

    一个Subscriber的子类,咱们看他的构造方法,两个参数,一个Subscriber一个Func1,咱们在建立对象时候Subscriber对象是咱们真正的从外界传过来的观察者,Func1呢使咱们建立OnSubscribeFilter时候传递进来的对象,也就是咱们在外界定义的Func1。
    回过头来咱们继续看OnSubscribeFilter的call方法。咱们看到source.unsafeSubscribe(parent),source是咱们原来外界的Observable,他订阅了FilterSubscriber对象。咱们在他的onNext方法中看到他根据func1.call(t)的返回值来判断是否让咱们外界的真正的观察者调用onNext方法。
    看到这里有没有恍然大悟,啥?我都不知道你在说啥,额,那咱们总体的屡屡。this

    咱们外界的代码,在subscribe()时候,Subscriber并非订阅了咱们本身写的Observable,Subscriber订阅的是filter方法返回的那个新的Observable对象,因此订阅时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter才是咱们订阅的被观察者的onSubscribe对象,在OnSubscribeFilter的call()方法中,咱们让咱们包装的FilterSubscriber订阅咱们原来的被观察者,也就是咱们在外界生成的那个Observable。咱们在外界的Observable的onSubscribe对象的call方法中获得的观察者是FilterSubscriber对象,咱们调用的onNext会回调到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中咱们根据咱们传递的Func1来判断是否要回调真正的Subscriber的onNext方法,在为true的时候咱们才回调咱们外界的观察者的onNext方法,也就起到了过滤的做用。这就是Filter的整个的流程。spa

    咱们来测试下咱们的小结论:code

Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
                    subscriber.onNext(5);
                }
            }).filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer > 0;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    
                }
            });

结果

不知道你们看明白没有,很是愿意和你们一块儿讨论,一块儿学习,欢迎留言对象

相关文章
相关标签/搜索