RxJava2源码分析——FlatMap和ConcatMap及其相关并发编程分析

本文章主要是对RxJava2FlatMapConcatMap这两个操做符进行源码分析,在阅读以前,能够先阅读如下文章:java

RxJava2源码分析——订阅react

RxJava2源码分析——线程切换android

RxJava2源码分析——Map操做符git

本文章用的RxJavaRxAndroid版本以下:github

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
复制代码

FlatMap

FlatMap操做符能够将一个发射数据的Observable转变为多个Observables,而后将这些发射的数据合并进一个单独的Observable,发射的数据不保证有序数组

咱们先写段示例代码,为了方便理解,在调用FlatMap方法的时候,我就不用上Lambda链式调用了,代码以下:缓存

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
复制代码

Log以下:多线程

FlatMapLog.png

源码分析

咱们看下flatMap方法,分析可知,会依次调用如下方法,代码以下:app

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    // 注意:参数delayErrors传入的是false
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    // 注意:参数maxConcurrency传入的是Integer.MAX_VALUE
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    // bufferSize是指数据缓冲区的大小,与背压(Backpressure)有关
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // 这里有个判断,判断this是否是ScalarCallable的实现类,详细解释请看下面
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // 若是不是ScalarCallable的实现类就会调用如下方法
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

bufferSize()是数据缓冲区的大小,默认是128,可从如下代码得知:框架

// Observable.java
public static int bufferSize() {
    return Flowable.bufferSize();
}

// Flowable.java
public static int bufferSize() {
    return BUFFER_SIZE;
}

// Flowable.java
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
复制代码

ScalarCallable是一个接口,它的实现类有6个FlowableEmptyFlowableJustMaybeEmptyMaybeJustObservableEmptyObservableJust,分别对应这6个方法:Flowable.empty()Flowable.just(T item)Maybe.empty()Maybe.just(T item)Observable.empty()Observable.just(T item)

根据前几篇文章的经验可知,咱们只要看ObservableFlatMap这个类就好了,代码以下:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

咱们也像前几篇文章那样,先看下subscribeActual方法,这里会先调用ObservableScalarXMap.tryScalarXMapSubscribe方法,若是是true的话就return,这个方法中会判断source是否是Callable的实现类,若是是的话就会委托ObservableScalarXMap来发射事件,而后返回true,不然返回false,上面说的ScalarCallable接口就是继承Callable接口,因此咱们主要是看下面的逻辑,调用了sourcesubscribe方法,而且传入new出来的MergeObserver,咱们来看下MergeObserver这个类,要注意的点我都写上注释了,代码以下:

// ObservableFlatMap.java
// MergeObserver继承AtomicInteger
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {

    private static final long serialVersionUID = -2117620485640801370L;

    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    volatile SimplePlainQueue<U> queue;

    volatile boolean done;

    final AtomicThrowable errors = new AtomicThrowable();

    volatile boolean cancelled;

    // 存放InnerObserver的数组
    final AtomicReference<InnerObserver<?, ?>[]> observers;

    static final InnerObserver<?, ?>[] EMPTY = new InnerObserver<?, ?>[0];

    static final InnerObserver<?, ?>[] CANCELLED = new InnerObserver<?, ?>[0];

    Disposable upstream;

    long uniqueId;
    long lastId;
    int lastIndex;

    Queue<ObservableSource<? extends U>> sources;

    int wip;

    MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        this.downstream = actual;
        // mapper是Function接口的实现类
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
        // 根据上面的代码可知,传入的Integer.MAX_VALUE,因此这段逻辑不会
        if (maxConcurrency != Integer.MAX_VALUE) {
            sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
        }
        // 建立一个InnerObserver数组的原子引用
        this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            // 调用mapper的apply方法
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }

        // 在上面也分析过了,传入的Integer.MAX_VALUE,因此这段逻辑不会执行
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }

        subscribeInner(p);
    }

    @SuppressWarnings("unchecked")
    void subscribeInner(ObservableSource<? extends U> p) {
        // 一个死循环
        for (;;) {
            // 判断p是否是Callable接口的实现类,上面分析过,这里再也不赘述
            if (p instanceof Callable) {
                if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                    boolean empty = false;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            empty = true;
                        }
                    }
                    if (empty) {
                        drain();
                        break;
                    }
                } else {
                    break;
                }
            } else {
                // 若是p不是Callable接口的实现类,建立InnerObserver
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                // 调用addInner方法,将InnerObserver存放到observers数组中,下面会解析
                if (addInner(inner)) {
                    // 对每次建立的InnerObserver进行订阅
                    p.subscribe(inner);
                }
                break;
            }
        }
    }

    boolean addInner(InnerObserver<T, U> inner) {
        // 又是一个死循环
        for (;;) {
            // 从observers数组取出InnerObserver
            InnerObserver<?, ?>[] a = observers.get();
            if (a == CANCELLED) {
                // 若是是CANCELLED状态的就取消订阅
                inner.dispose();
                return false;
            }
            int n = a.length;
            // 建立新的InnerObserver数组,大小为a数组大小加1
            InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
            // 将a数组数据复制到b数组
            System.arraycopy(a, 0, b, 0, n);
            // 将新建的InnerObserver放到b数组最后的位置
            b[n] = inner;
            // 将b数组数据原子性地更新到a数组中
            if (observers.compareAndSet(a, b)) {
                // 若是成功就返回true
                return true;
            }
        }
    }

    // 移除InnerObserver的方法
    void removeInner(InnerObserver<T, U> inner) {
        for (;;) {
            InnerObserver<?, ?>[] a = observers.get();
            int n = a.length;
            if (n == 0) {
                return;
            }
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == inner) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            InnerObserver<?, ?>[] b;
            if (n == 1) {
                b = EMPTY;
            } else {
                b = new InnerObserver<?, ?>[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (observers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    boolean tryEmitScalar(Callable<? extends U> value) {
        U u;
        try {
            u = value.call();
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            errors.addThrowable(ex);
            drain();
            return true;
        }

        if (u == null) {
            return true;
        }

        if (get() == 0 && compareAndSet(0, 1)) {
            downstream.onNext(u);
            if (decrementAndGet() == 0) {
                return true;
            }
        } else {
            SimplePlainQueue<U> q = queue;
            if (q == null) {
                if (maxConcurrency == Integer.MAX_VALUE) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                } else {
                    q = new SpscArrayQueue<U>(maxConcurrency);
                }
                queue = q;
            }

            if (!q.offer(u)) {
                onError(new IllegalStateException("Scalar queue full?!"));
                return true;
            }
            if (getAndIncrement() != 0) {
                return false;
            }
        }
        drainLoop();
        return true;
    }

    void tryEmit(U value, InnerObserver<T, U> inner) {
        // 判断get()是否是等于0,若是等于0就将值设为1
        if (get() == 0 && compareAndSet(0, 1)) {
            // 调用下游的onNext方法
            downstream.onNext(value);
            // 发射完数据后,判断自减1后的值是否是等于0,若是等于0,证实全部数据发射完成,方法结束
            if (decrementAndGet() == 0) {
                return;
            }
        } else {
            SimpleQueue<U> q = inner.queue;
            if (q == null) {
                // 建立SpscLinkedArrayQueue队列,它是一个单生产、单消费的数组队列,它能够在消费者变慢的状况下分配新的数组
                q = new SpscLinkedArrayQueue<U>(bufferSize);
                inner.queue = q;
            }
            // 将接收的上游数据缓存到队列中
            q.offer(value);
            // 判断值是否是不等于0后自增1,若是不等于0就结束方法
            if (getAndIncrement() != 0) {
                return;
            }
        }
        // 调用drainLoop方法
        drainLoop();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        if (errors.addThrowable(t)) {
            done = true;
            drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            if (disposeAll()) {
                Throwable ex = errors.terminate();
                if (ex != null && ex != ExceptionHelper.TERMINATED) {
                    RxJavaPlugins.onError(ex);
                }
            }
        }
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void drain() {
        if (getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (;;) {
            // 检查订阅是否是被终止,若是是,方法结束
            if (checkTerminate()) {
                return;
            }
            // 将MergeObserver内的变量queue复制给svq,queue是一个队列
            SimplePlainQueue<U> svq = queue;

            if (svq != null) {
                for (;;) {
                    // 再次检查订阅是否是被终止,若是是,方法结束
                    if (checkTerminate()) {
                        return;
                    }

                    // 从队列中取出数据
                    U o = svq.poll();

                    // 若是是null的话,跳出该循环
                    if (o == null) {
                        break;
                    }

                    // 调用下游Observer的onNext方法,发射数据
                    child.onNext(o);
                }
            }

            boolean d = done;
            svq = queue;
            InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length;

            int nSources = 0;
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    nSources = sources.size();
                }
            }

            if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                Throwable ex = errors.terminate();
                if (ex != ExceptionHelper.TERMINATED) {
                    // 判断Throwable是否是null
                    if (ex == null) {
                        // 调用下游Observer的onComplete方法
                        child.onComplete();
                    } else {
                        // 调用下游Observer的onError方法
                        child.onError(ex);
                    }
                }
                return;
            }

            // 处理数组数据
            int innerCompleted = 0;
            if (n != 0) {
                long startId = lastId;
                int index = lastIndex;

                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }

                int j = index;
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }

                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                    SimpleQueue<U> q = is.queue;
                    if (q != null) {
                        // 处理InnerObserver数组中的每个InnerObserver对象
                        for (;;) {
                            U o;
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted++;
                                j++;
                                if (j == n) {
                                    j = 0;
                                }
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }

                            // 调用onNext方法,发射InnerObserver的数据
                            child.onNext(o);

                            // 检查订阅是否是被终止,若是是,方法结束
                            if (checkTerminate()) {
                                return;
                            }
                        }
                    }

                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    // 检查队列里的数据是否处理完毕
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        // 若是是,将对应的InnerObserver从数组中移除
                        removeInner(is);
                        // 检查订阅是否是被终止,若是是,方法结束
                        if (checkTerminate()) {
                            return;
                        }
                        // innerCompleted自增
                        innerCompleted++;
                    }

                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }

            // 判断innerCompleted是否是不等于0,也就是判断当前InnerObserver是否处理完毕
            if (innerCompleted != 0) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    while (innerCompleted-- != 0) {
                        ObservableSource<? extends U> p;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                continue;
                            }
                        }
                        subscribeInner(p);
                    }
                }
                // 结束当前当前循环,进入下一个循环,继续处理下一个InnerObserver
                continue;
            }
            // 数据发射完毕后,将值自减
            missed = addAndGet(-missed);
            // 若是missed等于0,证实队列中的全部数据所有发射完毕,跳出循环,方法结束
            if (missed == 0) {
                break;
            }
        }
    }

    // 检查订阅是否是被终止的方法
    boolean checkTerminate() {
        if (cancelled) {
            return true;
        }
        Throwable e = errors.get();
        if (!delayErrors && (e != null)) {
            disposeAll();
            e = errors.terminate();
            if (e != ExceptionHelper.TERMINATED) {
                downstream.onError(e);
            }
            return true;
        }
        return false;
    }

    boolean disposeAll() {
        upstream.dispose();
        InnerObserver<?, ?>[] a = observers.get();
        if (a != CANCELLED) {
            a = observers.getAndSet(CANCELLED);
            if (a != CANCELLED) {
                for (InnerObserver<?, ?> inner : a) {
                    inner.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

// InnerObserver继承AtomicReference
static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) d;

                // requestFusion和背压(Backpressure)有关,由于咱们这里没用到相关的类,因此fusionMode的值为0
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                // 订阅类型是同步
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // 发射数据
                    parent.drain();
                    return;
                }
                // 订阅类型是异步
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }

    @Override
    public void onNext(U t) {
        // 根据上面分析可知,fusionMode的值为0,因此等于QueueDisposable.NONE
        if (fusionMode == QueueDisposable.NONE) {
            // 调用tryEmit方法
            parent.tryEmit(t, this);
        } else {
            parent.drain();
        }
    }

    @Override
    public void onError(Throwable t) {
        if (parent.errors.addThrowable(t)) {
            if (!parent.delayErrors) {
                parent.disposeAll();
            }
            done = true;
            parent.drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }

    public void dispose() {
        DisposableHelper.dispose(this);
    }
}
复制代码

ConcatMap

ConcatMap操做符能够将一个发射数据的Observable转变为多个Observables,而后将这些发射的数据合并进一个单独的Observable,发射的数据保证有序

咱们先写段示例代码,为了方便理解,在调用ConcatMap方法的时候,我就不用上Lambda链式调用了,代码以下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
复制代码

Log以下:

ConcatMapLog.png

源码分析

咱们看下ConcatMap方法,分析可知,会依次调用如下方法,代码以下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    // 判断this是否是ScalarCallable,上面分析过了,这里再也不赘述
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // 最后一个参数delayErrors传入的是ErrorMode.IMMEDIATE
    return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}
复制代码

根据前几篇文章的经验可知,咱们只要看ObservableConcatMap这个类就好了,代码以下:

public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize, ErrorMode delayErrors) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.bufferSize = Math.max(8, bufferSize);
}

@Override
public void subscribeActual(Observer<? super U> observer) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
        return;
    }

    // 这里delayErrors传入的是ErrorMode.IMMEDIATE
    if (delayErrors == ErrorMode.IMMEDIATE) {
        // 对observer进行序列化
        SerializedObserver<U> serial = new SerializedObserver<U>(observer);
        // 调用订阅方法,而且传入new出来的SourceObserver
        source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
    } else {
        source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
    }
}
复制代码

咱们看下SourceObserver这个类,有些源码的逻辑和FlatMap比较类似,这里就再也不赘述了,代码以下:

// ObservableConcatMap.java
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8828587559905699186L;
    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final InnerObserver<U> inner;
    final int bufferSize;

    SimpleQueue<T> queue;

    Disposable upstream;

    volatile boolean active;

    volatile boolean disposed;

    volatile boolean done;

    int fusionMode;

    SourceObserver(Observer<? super U> actual,
                            Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
        this.downstream = actual;
        this.mapper = mapper;
        this.bufferSize = bufferSize;
        this.inner = new InnerObserver<U>(actual, this);
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) d;

                // requestFusion和背压(Backpressure)有关,由于咱们这里没用到相关的类,因此fusionMode的值为0
                int m = qd.requestFusion(QueueDisposable.ANY);
                // 订阅关系是同步
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;

                    downstream.onSubscribe(this);

                    drain();
                    return;
                }

                // 订阅关系是异步
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;

                    downstream.onSubscribe(this);

                    return;
                }
            }

            // 建立一个大小为数据缓冲区大小的队列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);

            // 调用下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        // 根据上面分析可知,fusionMode的值为0,因此等于QueueDisposable.NONE
        if (fusionMode == QueueDisposable.NONE) {
            // 将接收的上游数据缓存到队列中
            queue.offer(t);
        }
        // 调用drain方法
        drain();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        done = true;
        dispose();
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    void innerComplete() {
        active = false;
        drain();
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }

    @Override
    public void dispose() {
        disposed = true;
        inner.dispose();
        upstream.dispose();

        if (getAndIncrement() == 0) {
            queue.clear();
        }
    }

    void drain() {
        // 判断值是否是不等于0后自增1,若是不等于0就结束方法
        if (getAndIncrement() != 0) {
            return;
        }

        for (;;) {
            // 判断是否是结束订阅
            if (disposed) {
                // 若是是,就清空队列,结束方法
                queue.clear();
                return;
            }
            // active是用volatile修饰,active是用来判断当前是否还有InnerObserver在发射,因此能保证发射InnerObserver是有序的,这点和FlatMap不同
            if (!active) {

                boolean d = done;

                T t;

                try {
                    // 从队列取出数据
                    t = queue.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    dispose();
                    queue.clear();
                    downstream.onError(ex);
                    return;
                }

                boolean empty = t == null;

                // 判断是否发射完毕,同时队列是否还有数据
                if (d && empty) {
                    // 若是发射完毕,同时队列是没有数据的话,结束订阅,调用下游Observer的onComplete方法
                    disposed = true;
                    downstream.onComplete();
                    return;
                }

                // 再次判断队列是否还有数据
                if (!empty) {
                    // 若是队列还有数据,执行如下逻辑
                    ObservableSource<? extends U> o;

                    try {
                        // 调用mapper的apply方法
                        o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        downstream.onError(ex);
                        return;
                    }

                    // active设为true,表示当前还在发射数据,其余任务就进入不了上面所说的判断了
                    active = true;
                    // 调用下游Observer的订阅方法
                    o.subscribe(inner);
                }
            }

            // 发射完数据后,判断自减1后的值是否是等于0,若是等于0,证实全部数据发射完成,方法结束
            if (decrementAndGet() == 0) {
                break;
            }
        }
    }

    // InnerObserver继承AtomicReference
    static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {

        private static final long serialVersionUID = -7449079488798789337L;

        final Observer<? super U> downstream;
        final SourceObserver<?, ?> parent;

        InnerObserver(Observer<? super U> actual, SourceObserver<?, ?> parent) {
            this.downstream = actual;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onNext(U t) {
            // 调用下游Observer的onNext方法
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            parent.dispose();
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            parent.innerComplete();
        }

        void dispose() {
            DisposableHelper.dispose(this);
        }
    }
}
复制代码

FlatMap和ConcatMap对比

在作对比以前,我改下上面的两段示例代码,都调用delay方法,延迟1s发射,代码以下:

FlatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
复制代码

Log以下:

FlatMapDelayLog.png

ConcatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
复制代码

Log以下:

ConcatMapLog.png

我这里发射了3组数据,须要注意的是,咱们发现FlatMap3组数据都是不按顺序的,可是每组数据里发射的数据都是按顺序的ConcatMap3组数据都是按顺序的,并且每组数据里发射的数据也是按顺序的,那为何这样呢?其实上面阅读源码的时候也稍微说起了下,这里再详细解释下,由于FlatMap对应的MergeObserverConcatMap对应的SourceObserver都继承了AtomicInteger,在解释这个类前,先说下几个概念。

volatile

volatile的语义:

  1. volatile修饰的变量的操做不保证是原子性的。
  2. Java内存模型不会对volatile指令进行重排序优化,能够保证对volatile变量的操做是按照指令的顺序执行。
  3. volatile修饰的变量能保证对全部线程的可见性,每次修改值都会马上同步回主内存,每次读取值都会从主内存中从新读取。

指令重排序

处理器经过缓存可以从数量级上下降内存延迟的成本,由于对主存的一次访问须要花费硬件屡次的时钟周期,而这些缓存为了性能会从新排列待定内存操做的顺序,也就是重排序,这里有个前提,**Java内存模型(Java Memory Model)经过先行发生原则(happen-before)**保证顺序执行语义,对一个volatile变量的写操做先行发生于后面对这个变量的读操做,这里的前后指的是时间上的顺序,在这里举个例子:

Object object = new Object();
复制代码

这条语句会转成多条汇编指令,大体作了如下三件事情

  1. Object类实例分配内存空间。
  2. 初始化Object对象
  3. object变量指向刚分配的内存,这时候object变量就不是null了。

由于Java编译器容许指令重排序对其优化,上面这3个步骤可能1->2->3或者是1->3->2,可是步骤1确定是第一个执行的,由于作指令重排序有个前提,就是必须遵循先行发生原则,保证最后是正确的执行结果,执行步骤2步骤3的前提是步骤1,必须为实例分配内存空间才能去初始化对象或者将变量指向分配的内存。

单线程这样的优化是没有问题的,可是在多线程就会有问题了,这里我举个例子,咱们会使用双重检查锁定(Double Check Locking,简称DCL)来实现单例,它是懒汉模式,代码以下:

package com.tanjiajun.rxjavademo;

/** * Created by TanJiaJun on 2019-11-14. */
public class Singleton {

    // mInstance用volatile修饰,保证指令执行的顺序
    private static volatile Singleton mInstance;

    // 私有构造函数
    private Singleton() {
        // 防止经过反射调用构造函数形成单例失效
        if (mInstance != null) {
            throw new RuntimeException("Cannot construct a singleton more than once.");
        }
    }

    // 获取单例的方法
    public static Singleton getInstance() {
        // 第一次判断mInstance是否为null,判断是否须要同步,提升性能和效率
        if (mInstance == null) {
            synchronized (Singleton.class) {
                // 第二次判断mInstance是否为null,判断是否已经建立实例
                if (mInstance == null) {
                    mInstance = new Singleton();
                }
            }
        }
        // 返回mInstance
        return mInstance;
    }

}
复制代码

建立实例的这条语句会转成多条汇编指令,大概作了以下3件事情

  1. Singleton类实例分配内存空间。
  2. 初始化Singleton对象
  3. mInstance变量指向刚分配的内存,这时候mInstance变量就不是null了。

若是咱们不用volatile修饰,也不加同步锁的话,假设有两个线程,分别是AB,若是线程A建立实例步骤是1->3->2,当它执行步骤3的时候,这时候mInstance变量已经不是null了,线程B也执行getInstance方法,进入第一个判断,由于mInstance变量已经不是null了,因此就会建立另一个实例了,形成单例失效

CAS操做

CAS(Compare And Swap),翻译过来就是比较和交换,它能够防止共享变量出现脏读脏写问题,保证了原子操做CAS也是乐观锁的一种实现方式,乐观锁是什么呢?乐观锁老是假设最好的状况,因此在数据进行提交更新的时候才会去检查是否有冲突。

AtomicInteger

咱们看下AtomicInteger的源码,代码以下:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 用到sun.misc.Unsafe
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long VALUE;

    static {
        try {
            // VALUE是内存偏移值
            VALUE = U.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    // 用volatile修饰value,保证指令的执行顺序
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final void lazySet(int newValue) {
        U.putOrderedInt(this, VALUE, newValue);
    }

    public final int getAndSet(int newValue) {
        return U.getAndSetInt(this, VALUE, newValue);
    }

    // 主要看这个方法,它就是CAS操做,我会在下面解析
    public final boolean compareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final boolean weakCompareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final int getAndIncrement() {
        return U.getAndAddInt(this, VALUE, 1);
    }

    public final int getAndDecrement() {
        return U.getAndAddInt(this, VALUE, -1);
    }

    public final int getAndAdd(int delta) {
        return U.getAndAddInt(this, VALUE, delta);
    }

    public final int incrementAndGet() {
        return U.getAndAddInt(this, VALUE, 1) + 1;
    }

    public final int decrementAndGet() {
        return U.getAndAddInt(this, VALUE, -1) - 1;
    }

    public final int addAndGet(int delta) {
        return U.getAndAddInt(this, VALUE, delta) + delta;
    }

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public String toString() {
        return Integer.toString(get());
    }

    public int intValue() {
        return get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
复制代码

compareAndSet方法就是CAS操做,它是调用sun.misc.Unsafe里的compareAndSwapInt方法,这个方法是个native方法,其做用是每次从内存中根据内存偏移量(VALUE)取出的值和expect比较,若是数据一致就把内存中的值改成update

结论

由于FlatMap对应的MergeObserverConcatMap对应的SourceObserver都继承了AtomicInteger,根据以前的源码分析,它们两个操做符每组数据里发射数据的操做都是原子操做,所以它们都是按顺序的;不一样的是,在ObservableConcatMap的源码中,咱们能够看到它用volatile修饰的active布尔值判断当前是否还有InnerObserver在发射,可是在ObservableFlatMap的源码中没看到相关的逻辑,因此FlatMap发射的那几组数据是不按顺序的,ConcatMap发射的那几组数据是按顺序的。

个人GitHub:TanJiaJunBeyond

Android通用框架:Android通用框架(Kotlin-MVVM)

个人掘金:谭嘉俊

个人简书:谭嘉俊

相关文章
相关标签/搜索