rxjava2源码解析(四)--变换

引言

先贴上前面几篇的连接:
rxjava2源码解析(一)基本流程分析
rxjava2源码解析(二)线程切换分析
rxjava2源码解析(三)线程池原理分析java

上一篇说了rxjava2的线程池原理,这篇咱们来讲说rxjava的变换。缓存

变换和线程切换算是rxjava最关键的两个功能。常见的变换有map(),flatMap()。咱们先从map方法提及吧。bash

map

咱们先举一个简单的例子,来看看map能作什么:并发

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Function<Student, String>() {
        @Override
        public String apply(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
复制代码

上面的例子是一个功能,打印一个班级里students的名字。很简单,经过from方法对student进行遍历,一个map方法将student变换成name,而后下游打印就完事了。咱们知道rxjava2里面是有不少泛型设定的,若是类型错误是会直接标红。from方法返回的下游数据类型是student,而subscriber中接收的数据类型必须是String。很显然,这里map就将下游的数据类型进行了变换。
具体在源码中是如何实现的呢?咱们先看map的源码:app

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
复制代码

仍是老样子,抛开判空代码和hock机制,直接看ObservableMap类。不过在此以前,先看看map方法里面设定的泛型。T是Observable里设定的上游数据类型,map方法会返回一个Observable,这里就将整个链条的数据类型进行了变换。异步

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
复制代码

看过前面的几篇就知道,这里仍是老套路,仍是装饰器模式,仍是建立一个内部处理器MapObserver。内部处理器MapObserver负责与上游绑定,因此它的处理数据类型仍为T。ObservableMap与下游进行绑定订阅,因此ObservableMap中数据的类型为R。咱们在看MapObserver以前,先看看Function是什么。ide

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
复制代码

OK,Function是一个接口,只有一个接口方法applyFunction规定了两个泛型:T、R。其中T是apply的参数类型,R是返回值类型。咱们在使用过程当中,重写apply方法进行数据类型变换,而后再用map方法插入到整条流水线中,就达到了变换的目的。oop

下面看看MapObserver中具体是怎么实现的:post

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
复制代码

很简单,MapObserveronNext负责处理上游下来的数据,在onNext方法中调用Functionapply方法,将T变换为下游须要的U(也就是前面的R),而后再将数据传递下去,达到变换的目的。ui

map的使用和源码都很简单,咱们来看看flatMap的。

flatMap

仍是先用一个简单的例子来看flatMap的用途:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Function<Student, Observable<Course>>() {
        @Override
        public Observable<Course> apply(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
复制代码

产品说功能要改一改,不是打印每一个student的名字,而是要打印每一个sutdent全部课程名称。正常状况下,咱们在subscriber中获取到每一个student,而后用个for循环进行遍历打印就行,可是flatMap能够直接一步搞定。

细心的已经发现,这里的Function比较奇怪,它的返回值类型居然是Observable。具体怎么回事,咱们看看源码:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        //这里的delayErrors,maxConcurrency,bufferSize都是默认值。
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
复制代码

先解释一下,delayErrorsmaxConcurrency,bufferSize这几个参数的意义:

  • delayErrors表示异常是否须要延迟到全部内部数据都传输完毕后抛出。默认值是false
  • maxConcurrency 表示最大并发数,默认值为Integer.MAX_VALUE
  • bufferSize 缓存的内部被观察者事件总数大小,默认值为128.

老样子,咱们直接看ObservableFlatMap

public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
复制代码

仍是原来的配方,仍是原来的味道。咱们来看看MergeObserver的源码一探究竟:

@Override
        public void onNext(T t) {
            //调用apply方法,获取到转换的Observable
            ObservableSource<? extends U> p;
            try {
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                upstream.dispose();
                onError(e);
                return;
            }
            //隐藏了一些判断代码
            subscribeInner(p);
        }

        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                //这里会走到else
                if (p instanceof Callable) {
                    ...
                } else {
                //这里新建一个InnerObserver,调用addInner添加到队列中,而后用apply中生成的Observable与之订阅。
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }
复制代码

如注释中所示,这里根据上游每个数据,生成一个Observable,而后新建一个InnerObserver,将这个InnerObserver添加到内部处理器队列中,并将Observable与这个InnerObserver进行订阅。
咱们以Observable.from()为例,看看这中间的流程是什么样的。

//from 方法返回一个ObservableFromArray装饰器
    public static <T> Observable<T> fromArray(T... items) {
       //省略部分判空代码
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
    
//ObservableFromArray源码
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        //订阅后,建立一个FromArrayDisposable内部类对象
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        //这个方法很关键,咱们待会能够看看InnerObserver的onSubscribe方法。
        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }
    
    //FromArrayDisposable不是一个处理器,他只是一个带简单队列的Disposable
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        final Observer<? super T> downstream;
        final T[] array;
        int index;
        boolean fusionMode;
        volatile boolean disposed;
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }
        // 这里显然是返回同步
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        //poll方法会逐个返回队列中的数据
        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
        //在run方法中,开始向下游传递数据。不过这时候已经不重要了,由于在InnerObserver的onSubscribe方法中,已经经过poll方法将队列中的数据都传递出去了。固然这仅仅是在这个示例中是这样
        void run() {
            T[] a = array;
            int n = a.length;
            //开始向下游传递数据
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}

复制代码

如上面注释所示,from方法返回一个简单的ObservableFromArrayObservableFromArraysubscribe中,调用下游处理器的onSubscribe方法,而后调用自身的run方法。咱们看看InnerObserver中是怎么处理的:

static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done;
        volatile SimpleQueue<U> queue;

        int fusionMode;
        //这里会用一个独特的ID来给每一个InnerObserver作标记
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                //FromArrayDisposable知足这个条件
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<U> qd = (QueueDisposable<U>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    //由上面FromArrayDisposable的源码可知这里返回SYNC
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        queue = qd;
                        //这里直接将done设置为true,是由于下面的parent.drain()会直接取出全部数据并传递给下游
                        done = true;
                        //数据在这其中进行下发和传递
                        parent.drain();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
                    }
                }
            }
        }

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                //当上游执行到这里时,数据已经被传递完毕了。这里单指此次示例
                parent.drain();
            }
        }

        ....
    }
复制代码

具体的信息都写在上面的注释中,咱们直接来看MergeObserverdrain()方法。

void drain() {
            //这里进行判断,确保drainLoop还在执行时不会被再次调用
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        void drainLoop() {
            //获取到下游Observer
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {
                //判断是否有error
                if (checkTerminate()) {
                    return;
                }
                ...
                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;
                int nSources = 0;
                ...
                int innerCompleted = 0;
                if (n != 0) {
                //初始lastId lastIndex都为0
                    long startId = lastId;
                    int index = lastIndex;
                    ...
                    int j = index;
                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        
                        //获取到当前InnerObserver
                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        //q就是FromArrayDisposable。
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    //在这里循环调取FromArrayDisposable队列中数据,而后传递到下游
                                    o = q.poll();
                                } catch (Throwable ex) {
                                    ....
                                }
                                if (o == null) {
                                    break;
                                }
                                child.onNext(o);
                                ...
                            }
                        }
                        //前面标记过,在onSubscribe中已经将done设置为true.
                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        //因为上面已经将数据处理完毕,这里innerQueue.isEmpty()返回为trueif (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            //将该InnerObserver从队列中移除
                            removeInner(is);
                            if (checkTerminate()) {
                                return;
                            }
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                    lastId = inner[j].id;
                }
                ...
                //这里与开头getAndIncrement()相呼应,确保drainLoop在执行时不会被再次调用
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
复制代码

OK,整个流程就清晰了,划重点:

  • flatMap()是基础装饰器Observable的一个方法,参数是一个Function,只不过这个Functionapply()方法返回类型为一个Observable
  • flatMap()返回一个ObservableFlatMap装饰器对象。ObservableFlatMap被订阅后会调用subscribeActual()方法,在此方法中,会建立一个内部类MergeObserver对象,并将上游装饰器与之订阅。
  • MergeObserver在接收到上游数据后,会调用Functionapply()方法,将数据转换为一个Observable,并建立一个内部InnerObserver,将这个InnerObserver放入队列中,而后将生成的Observable与之订阅。
  • 在同步的状态下,InnerObserveronSubscribe()方法会直接调用MergeObserverdrain()方法,将数据所有都直接传递给下游。从而完成整个流程。

观察代码会发现,同步仅仅是flatMap的一个简单状况,更复杂的状况在于异步。具体的你们能够去源码里研究一下,毕竟这篇的篇幅已经够长了。下一篇预告一下,咱们来看看背压。

相关文章
相关标签/搜索