Rxjava代码拆解

1 Rxjava2最简单使用方式拆解

Observable p=Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("hello world");
                e.onComplete();
            }
        });
复制代码
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

调用create方法以后实际上返回了一个ObservableCreate对象.继承了Observable,是一个被观察者对象.java

p.subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
复制代码

咱们看下subscribe方法.bash

public final void subscribe(Observer<? super T> observer) {
            ...
            subscribeActual(observer);
            ...
    }

复制代码

其余代码都删掉了,剩下最核心的 subscribeActual(observer),这个observer就是咱们建立的匿名内部类对象.subscribeActual()方法是个抽象方法,咱们看下ObservableCreate中是怎么实现的.app

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

CreateEmitter发射器,在这里咱们调用了 observer.onSubscribe(parent)也就是咱们建立的匿名observer类的onSubscribe方法.ide

***source.subscribe(parent)***最重要的方法可能没有之一,观察者和被观察者顺利会师,事件开始执行,oop

@Override
            public void subscribe(ObservableEmitter e) throws Exception {//这里的ObservableEmitter就是parent,也就是CreateEmitter发射器对象
                e.onNext("hello world");
                e.onComplete();
            }
复制代码

接下来看看CreateEmitter的onNext和onComplete方法.ui

@Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
复制代码

咱们看到在发射器的onNext方法中,啥也没作,就是当了个二传手,调用了咱们观察者的onNext方法.this

@Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
复制代码

onComplete方法中也就是调用了观察者的onComplete方法. 咱们来缕缕这个过程 1 create方法传返回了一个对象是ObservableCreate,ObservableCreate的构造方法中有一个ObservableOnSubscribe对象,也就是咱们使用create时候建立的匿名内部类对象. 2 p.subscribe(o)实际上调用了ObservableCreate的subscribeActual方法 3 subscribeActual中首先调用了 observer的onSubscribe方法,紧接着调用了source.subscribe(parent)也就是ObservableOnSubscribe的subscribe方法,事件开始执行 4 subscribe方法中调用CreateEmitter的onNext方法,这个方法调用了observer的onNext方法,观察者对事件进行反应. 5 subscribe方法中调用CreateEmitter的onComplete方法,这个方法调用了observer的onComplete方法,整个流程结束.spa

2 MAP操做符是怎么工做的

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
复制代码

map操做符把咱们的observable对象变化成了具体的ObservableMap,参数是咱们以前建立好的observable和mapper functioncode

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
复制代码

注意注意:这里造成了一个新的订阅关系 这里的source是咱们create建立的observable,要否则会懵,建立ObservableMap时候咱们传进来的this是咱们生成的observable. 到这里咱们会从新调用onSubscribe() subscribeActual(),这里就回到了咱们最简单模式时候的调用步骤.不一样的是咱们真正的调用observer的方法实在MapObserver对应的方法中. 具体流程是***发射器调用onNext方法-->MapObserver的onNext方法-->再到咱们定义的observer的onNext方法***server

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
            //调用mapper改变数据
                **v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");**
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //actual咱们定义的observer
            actual.onNext(v);
        }
复制代码

3 进阶flatMap

@SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
     ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
复制代码

看看ObservableFlatMap代码

public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
复制代码

是否是和MAP超级像,咱们这几看MergeObserver onNext作了什么

@Override
        public void onNext(T t) {
             ...
               p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

            ...

            subscribeInner(p);
        }
         @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                  
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

复制代码

省略了不少代码,咱们看主要逻辑,获取到flatMap生成的observableSource,而后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext作了什么

//这里的onNext事件由 p.subscribe(inner)触发
  @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
        
        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }
复制代码

在这里咱们终于看到咱们定义的observer接收到了onNext事件

4 总结

Observable ObservableSource要分清楚,他们都有一个方法叫subscribe() Observer Emitter分清楚,他们有共同的方法onNext() onError() onComplete() 不然话很容易晕头转向.

文章若有表述有错误,请指出,谢谢.

相关文章
相关标签/搜索