RxJava(二):基础知识

博客主页java

1. Observable

RxJava 使用一般须要三步:数据库

  1. 建立 Observable

Observable 字面意思是被观察者,使用 RxJava 须要建立一个被观察者,它会决定何时触发事件以及触发怎样的事件。有点相似上游发送命令,能够在这里决定异步操做模块的顺序和异步操做模块的次数。segmentfault

  1. 建立 Observer

Observer 即观察者,它能够在不一样的线程中执行任务。这种模式能够极大地简化并发操做,由于它建立了一个处于待命状态的观察者哨兵,能够在将来某个时刻响应 Observable 的通知,而不须要阻塞等待 Observable 发射数据。缓存

  1. 使用 subscribe() 进行订阅

建立了 Observable 巳和 Observer 以后,咱们还须要使用 subscribe() 方法将它们链接起来,这样整个上下游就能衔接起来实现链式调用。安全

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "accept: " + s);
    }
});

just() 是 RxJava 的建立操做符,用于建立一个 Observable,Consumer 是消费者,用于接收单个值。网络

subscribe 有多个重载的方法。
并发

一个重载方法的版本,subscribe(onNext, onError, onComplete)框架

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
Next-> Hello World!
Complete.

onComplete 是一个 Action 它与 Consumer 的区别以下:异步

  1. Action 无参数类型。
  2. Consumer :单一参数类型。

再来看一个重载方法的版本, subscribe(onNext, onError, onComplete, onSubscribe)ide

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
}, new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "subscribe");
    }
});

// 执行结果
subscribe
Next-> Hello World!
Complete.

从打印结果可知:先执行了onSubscribe 再执行了 onNext和onComplete

在RxJava 2 中, Observable 再也不支持订阅 Subscriber ,而是须要使用 Observer 做为观察者

Observable.just("Hello World!").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "Next-> " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error-> " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
subscribe
Next-> Hello World!
Complete.

在RxJava 中, 被观察者、观察者、subscribe()方法三者缺一不可。只有使用了 subscribe(),被观察者才会开始发送数据.

RxJava 2 的5种观察者模式:

5种观察者模式的描述,只有 Flowable 支持背压,若是有须要背压的状况,则必须使用 Flowable
l7DHSK.png

do 操做符

do 操做符能够给 Observable 生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段时,这些回调就会被触发。在 RxJava 中包含了不少的 doXXX 操做符。

Observable.just("Hello World!")
        .doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "doOnNext-> " + s);
            }
        })
        .doAfterNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "doAfterNext-> " + s);
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnComplete.");
            }
        })
        // 订阅以后回调方法
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnSubscribe");
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doAfterTerminate");
            }
        })
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doFinally");
            }
        })
        // Observable 每发射一个数据就会触发这个回调,不只包括onNext,还包括onComplete和onError
        .doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> stringNotification) throws Exception {
                Log.d(TAG, "doOnEach-> "
                        + (stringNotification.isOnNext() ? "onNext"
                        : stringNotification.isOnComplete() ? "onComplete"
                        : "onError"));
            }
        })
        // 订阅后能够取消订阅
        .doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnLifecycle::onSubscribe-> " + disposable.isDisposed());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnLifecycle::onDispose");
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "subscribe-> " + s);
            }
        });

// 执行结果
doOnSubscribe
doOnLifecycle::onSubscribe-> false
doOnNext-> Hello World!
doOnEach-> onNext
subscribe-> Hello World!
doAfterNext-> Hello World!
doOnComplete.
doOnEach-> onComplete
doFinally
doAfterTerminate


2. Hot Observable Cold Observable

Observable 的分类

在 RxJava 中, Observable 有 Hot 和 Cold 之分。

Hot Observable 不管有没有观察者进行订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时(多个观察者进行订阅时) , Hot Observable 与订阅者们的关系是一对多的关系,能够与多个订阅者共享信息。

Cold Observable 是只有观察者订阅了,才开始执行发射数据流的代码。井且 Cold Observable 和 Observer 只能是一对一的关系 。当有多个不一样的订阅者时,消息是从新完整发送的。也就是说,对 Cold Observable ,有多个 Observer 的时候,它们各自的事件是独立的。

Cold Hot 区别:

  1. 把Hot Observable 想象成一个广播电台,全部在此刻收听的昕众都会听到同一首歌
  2. Cold Observable 一张音乐 CD ,人们能够独立购买并听取它。

Cold Observable

Observable 的 just、creat、range、fromXXX 等操做符都能生成 Cold Observable

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};


Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread());

observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

// 执行结果
    subscriber2-> 0
 subscriber1-> 0
 subscriber1-> 1
    subscriber2-> 1
 subscriber1-> 2
    subscriber2-> 2
    subscriber2-> 3
 subscriber1-> 3
 subscriber1-> 4
    subscriber2-> 4
 subscriber1-> 5
    subscriber2-> 5
    subscriber2-> 6
 subscriber1-> 6
 subscriber1-> 7
    subscriber2-> 7
    subscriber2-> 8
 subscriber1-> 8
 subscriber1-> 9
    subscriber2-> 9

subscriber1 和 subscriber2 结果并不必定是相同的,它们两者是彻底独立的。create 操做符建立的Observable 是 Cold Observable

Cold Observable 如何转换成 Hot Observable

使用 publish ,生 ConnectableObservable

使用 publish 操做符 ,可让 Cold Observable 转换成 Hot Observable,它将原先 Observable
转换 ConnectableObservable

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};


ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

// 须要调用connect()才能真正执行
observable.connect();

// 订阅
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}
observable.subscribe(subscriber3);

// 执行结果
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1
       subscriber3-> 1
 subscriber1-> 2
    subscriber2-> 2
       subscriber3-> 2
 subscriber1-> 3
    subscriber2-> 3
       subscriber3-> 3
 subscriber1-> 4
    subscriber2-> 4
       subscriber3-> 4
 subscriber1-> 5
    subscriber2-> 5
       subscriber3-> 5
 subscriber1-> 6
    subscriber2-> 6
       subscriber3-> 6
 subscriber1-> 7
    subscriber2-> 7
       subscriber3-> 7
 subscriber1-> 8
    subscriber2-> 8
       subscriber3-> 8
 subscriber1-> 9
    subscriber2-> 9
       subscriber3-> 9
 subscriber1-> 10
    subscriber2-> 10
       subscriber3-> 10
 subscriber1-> 11
    subscriber2-> 11
       subscriber3-> 11

多个订阅的 subscriber (或者说观察者)共享同一事件。在这里,ConnectableObservable 是线程安全的。

使用 Subject/Processor

Subject 和 Processor 的做用相同。Processor 是 RxJava 2.x 新增的类,继承自 Flowable, 支持背压控制( Back Presure ),而 Subject 则不支持背压控制。

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};


Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread());

PublishSubject<Long> subject = PublishSubject.create();
observable.subscribe(subject);


observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}
subject.subscribe(subscriber3);

// 执行结果与上面使用 publish 操做符相同。

Subject 既是 Observable ,又是 Observer (Subscriber)。能够从 Subject 源码上看到 ,继承自 Observable ,实现 Observer

// Subject源码
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
  // ...省略
}

Subject 做为观察者,能够订阅目标 Cold Observable ,使对方开始发送事件。同时它又做为 Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable

Subject 井不是线程安全的,若是想要其线程安全,须要调用 toSerialized() 方法(在RxJava 1.x 中还能够用 SerializedSubject 代替 Subject ,可是在 RxJava 2.x 以后 SerializedSubject 再也不是 public class)

然而,不少基于 EventBus 改造的 RxBus 并无这么作。这样的作法是很是危险的,会遇到并发的状况。

Hot Observable 如何转换成 Cold Observable

ConnectableObservable 的 refCount 操做符

在 ReactiveX 官网的解释是:make a Connectable Observable behave like an ordinary Observable

RefCount 操做符把一个可链接的 Observable 链接和断开的过程自动化了。它操做一个可链接的Observable 返回一个普通的 Observable 。当第一个订阅者/观察者订阅这个 Observable 时,RefCount 链接到下层的可链接 Observable。 RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成,才断开与下层可链接 Observable 的链接。

若是全部的订阅者/观察者都取消订阅了,则数据流中止:若是从新订阅,则从新开始数据流。

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

connectableObservable.connect();

Observable<Long> observable = connectableObservable.refCount();

Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

disposable1.dispose();
disposable2.dispose();

Log.d(TAG, "从新开始数据流");

disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);

// 执行结果
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1
 从新开始数据流
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1

若是不是全部的订阅者/观察者都取消了订阅 ,而只是部分取消,则部分的订阅者/观察者从新开始订阅时,不会从头开始数据流

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};

ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

connectableObservable.connect();

Observable<Long> observable = connectableObservable.refCount();

Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
observable.subscribe(subscriber3);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

disposable1.dispose();
disposable2.dispose();

Log.d(TAG, "subscriber1 和 subscriber2 从新订阅");

disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);

// 执行结果
 subscriber1-> 0
    subscriber2-> 0
       subscriber3-> 0
 subscriber1-> 1
    subscriber2-> 1
       subscriber3-> 1
 subscriber1 和 subscriber2 从新订阅
       subscriber3-> 2
 subscriber1-> 2
    subscriber2-> 2
       subscriber3-> 3
 subscriber1-> 3
    subscriber2-> 3
       subscriber3-> 4
 subscriber1-> 4
    subscriber2-> 4

subscriber1 和 subscriber2 先取消了订阅,subscriber3 井没有取消订阅。以后,subscriber1 和 subscriber2 又从新订阅。最终 subscriber一、subscriber二、subscriber3 的值保持一致

Observable share 操做符

share 操做符封装了 publish().refCount()调用,能够看其源码

public final Observable<T> share() {
    return publish().refCount();
}

3. Flowable

在 RxJava 2.x 中,Observable 再也不支持背压(Back Pressure),而改由 Flowable 来支持非阻塞式的背压。 Flowable 是 RxJava 2.x 新增的被观察者, Flowable 能够当作 Observable 新的实现,它支持背压,同时实现 Reactive Streams 的 Publisher 接口。 Flowable 全部的操做符强制支持背压,不过 Flowable 中的操做符大多与 Observable 相似。

(1) 使用 Observable 较好的场景

  • 通常处理最大不超过 1000 条数据,而且几乎不会出现内存溢出
  • GUI 鼠标事件,基本不会背压(能够结合 sampling/debouncing 操做)
  • 处理同步流

(2) 使用 Flowable 较好的场景

  • 处理以某种方式产生超过 lOKB 的元素;
  • 文件读取与分析
  • 读取数据库记录,也是一个阻塞的和基于拉取模式
  • 网络 I/O 流
  • 建立一个响应式非阻塞接口

4. Single、Completable 和 Maybe

一般状况下 ,若是想要使用 RxJava, 首先会想到的是使用 Observable ,若是考虑到背压的状况,则在 RxJava 2.x 下会使用 Flowable 。除 Observable 和 Flowable 外,在 RxJava 2.x 中还有3种类型的被观察者 Single、Completable 和 Maybe

Single

从 SingleEmitter 的源码能够看出,Single 只有 onSuccess 和 onError 事件

public interface SingleEmitter<T> {

    void onSuccess(@NonNull T t);

    void onError(@NonNull Throwable t);

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    boolean tryOnError(@NonNull Throwable t);
}

其中, onSuccess 用于发射数据(在 Observable/Flowable 中使用 onNext() 来发射数据),并且只能发射一个数据,后面即便再发射数据也不会作任何处理。

Single 的 SingleObserver 中只有 onSuccess 和 onError ,并无 onComplete 。这也是Single 与其余4种被观察者之间的最大区别。

Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> emitter) throws Exception {
        emitter.onSuccess("test");
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "onSuccess-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "onError-> " + throwable.getMessage());
    }
});

// 执行结果
onSuccess-> test

上面 Observer 中有两个 Consumer, 还能够进一步简化:

Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> emitter) throws Exception {
        emitter.onSuccess("test");
    }
}).subscribe(new BiConsumer<String, Throwable>() {
    @Override
    public void accept(String s, Throwable throwable) throws Exception {
        Log.d(TAG, "onSuccess-> " + s);
    }
});

Single 能够经过 toXXX 方法转换成 Observable、Flowable、Completable、Maybe

Completable

Completable 在建立后,不会发射任何数据。从 CompletableEmitter 的源码中能够看到。

public interface CompletableEmitter {

    void onComplete();

    void onError(@NonNull Throwable t);

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    boolean tryOnError(@NonNull Throwable t);
}

Completable 只有 onComplete 和 onError 事件,同时 Completable 井没有 map、flatMap 等操做符,它的操做符比起 Observable/Flowable 要少得多

能够经过企fromXXX 操做符来建立一个 Completable

Completable.fromAction(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Hello World!");
    }
}).subscribe();

// 执行结果
Hello World!

Completable 常常结合 andThen 操做符使用。

Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(CompletableEmitter emitter) throws Exception {
        try {
            Log.d(TAG, "--------------");
            TimeUnit.SECONDS.sleep(1);
            emitter.onComplete();
        } catch (InterruptedException e) {
            emitter.onError(e);
        }
    }
}).andThen(Observable.range(1, 5))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        });

// 执行结果
18:19:37.943  --------------
18:19:38.947  Next-> 1
18:19:38.947  Next-> 2
18:19:38.947  Next-> 3
18:19:38.948  Next-> 4
18:19:38.948  Next-> 5

emitter.onComplete() 执行完成以后,代表 Completable 经执行完毕,接下来执行 andThen 里的操做。

在 Completable 中, andThen 有多个重载的方法,正好对应了5种被观察者的类型。

Completable andThen(CompletableSource next)

<T> Observable<T> andThen(ObservableSource<T> next)

<T> Maybe<T> andThen(MaybeSource<T> next)

<T> Flowable<T> andThen(Publisher<T> next)

<T> Single<T> andThen(SingleSource<T> next)

Completable 也能够经过 toXXX 方法转换成 Observable、Flowable、Single 以及 Maybe

Maybe

Maybe 是 RxJava 2.x 以后才有的新类型,能够当作是 Single 和 Completable 的结合。

Maybe 建立以后, MaybeEmitter 和 SingleEmitter 样,并无 onNext() 方法,一样须要 onSuccess() 方法来发射数据。

Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> emitter) throws Exception {
        emitter.onSuccess("Hello World!");
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Success-> " + s);
    }
});

// 执行结果
Success-> Hello World!

Maybe 也只能发射 0 或者 1 个数据,即便发射多个数据, 后面发射的数据也不会处理

emitter.onSuccess("Hello World!");
 emitter.onSuccess("Hello World!2");

// 执行结果仍然是
Success-> Hello World!

若是 MaybeEmitter 先调用了 onComplete(),即便后面再调用 onSuccess(),也不会发射任何数据。

Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> emitter) throws Exception {
        emitter.onComplete();
        emitter.onSuccess("Hello World!");
        emitter.onSuccess("Hello World!2");

    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Success-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
Complete.

Maybe 在没有数据发射时,subscribe 会调用 MaybeObserver 的 onComplete()。若是 Maybe 有数据发射或者调用了 onError(),则不会执行 MaybeObserver 的 onComplete()。

也能够将 Maybe 转换成 Observable、Flowable、Single,只需相应地调用 toObservable()、toFlowable()、 toSingle()便可。

5. Subject 和 Processor

Subject 是一种特殊的存在

Subject 既是 Observable ,又是 Observer。官网称能够将 Subject 看做一个桥梁或者代理

Subject 的分类

Subject 包含 4 种类型,分别是 AsyncSubject、BehaviorSubject、ReplaySubject、PublishSubject

AsyncSubject

Observer 会接 AsyncSubject 的 onComplete() 以前的最后一个数据

AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("AsyncSubject#1");
subject.onNext("AsyncSubject#2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // 出现异常才会输出
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("AsyncSubject#3");
subject.onNext("AsyncSubject#4");

// 执行结果
Next-> AsyncSubject#2
Complete.

修改一下上面代码,将subject.onComplete();放到最后,执行结果

Next-> AsyncSubject#4
 Complete.

subject.onComplete() 必需要调用才会开始发送数据,不然观察者将不接收任何数据。

BehaviorSubject

Observer 会先接收到 BehaviorSubject 被订阅以前的最后一个数据,再接收订阅以后发射过来的数据。若是 BehaviorSubject 被订阅以前没有发送任何数据,则会发送一个默认数据。

BehaviorSubject<String> subject = BehaviorSubject.createDefault("BehaviorSubject#1");
// 订阅
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("BehaviorSubject#2");
subject.onNext("BehaviorSubject#3");

// 执行结果
 Next-> BehaviorSubject#1
 Next-> BehaviorSubject#2
 Next-> BehaviorSubject#3

BehaviorSubject#1 是默认值。

修改一下上面的代码,在订阅以前发送数据 subject.onNext("BehaviorSubject#4"); 执行结果以下:

Next-> BehaviorSubject#4
 Next-> BehaviorSubject#2
 Next-> BehaviorSubject#3

丢弃了默认值,而发射了 BehaviorSubject#4。由于 BehaviorSubject 每次只会发射调用
subscribe() 方法以前的最后一个事件和调用 subscribe() 方法以后的事件.

BehaviorSubject 还能够缓存最近一次发出信息的数据.

ReplaySubject

ReplaySubject 会发射全部来自原始 Observable 的数据给观察者,不管它们是什么时候订阅的.

ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("ReplaySubject#1");
subject.onNext("ReplaySubject#2");
// 订阅
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("ReplaySubject#3");
subject.onNext("ReplaySubject#4");

// 执行结果
 Next-> ReplaySubject#1
 Next-> ReplaySubject#2
 Next-> ReplaySubject#3
 Next-> ReplaySubject#4

若是将create() 修改成 createWithSize(1),表示只缓存订阅前最后发送的一条数据。执行结果以下:

Next-> ReplaySubject#2
 Next-> ReplaySubject#3
 Next-> ReplaySubject#4

这个执行结果与 BehaviorSubject 的相同。可是从并发的角度来看, ReplaySubject 在处理并发subscribe() 和 onNext() 时会更加复杂。

ReplaySubject 除了能够限制缓存数据的数量,还能限制缓存的时间,使用 createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) 便可。

PublishSubject

Observer 只接收 PublishSubject 被订阅以后发送的数据

PublishSubject<String> subject = PublishSubject.create();
subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

// 订阅
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("PublishSubject#3");
subject.onNext("PublishSubject#4");

// 执行结果
 Complete.

由于 subject 在订阅以前己经执行了 onComplete() 方法,因此没法发射数据。修改一下,将 onComplete() 方法放在最后。执行结果以下:

Next-> PublishSubject#3
 Next-> PublishSubject#4
 Complete.

4 个 Subject 的特性总结:

可能错过的事件

Subject 做为一个 Observable 时,能够不停地调用 onNext() 来发送事件,直至遇到 onComplete() 才会结束。

PublishSubject<String> subject = PublishSubject.create();
// 订阅
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

// 执行结果
 Next-> PublishSubject#1
 Next-> PublishSubject#2
 Complete.

若是使用 subscribeOn 操做符将 subject 切换到 I/O 线程,则可使用 Thread.sleep(2000) 让主线程休眠 2s

PublishSubject<String> subject = PublishSubject.create();
// 订阅
subject.subscribeOn(Schedulers.io()) // 切换到I/O线程
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next-> " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 执行结果
 Complete.

没有打印 PublishSubject#1 和 PublishSubject#2 ???

由于 subject 发射元素的线程被指派到了 I/O 线程,此时 I/O 线程正在初始化还没起来,subject 发射前,PublishSubject#1 和 PublishSubject#2 这两个元素还在主线程中,而在从主线程往 I/O 线程转发的过程当中,因为 I/O 线程尚未起来,因此就被丢弃了。此时,不管 Thread 睡了多少秒, PublishSubject#1 和 PublishSubject#2 都不会被打印出来。

其实,解决方法很简单,使用 Observable.create() 来替代 subject ,它容许为每一个订阅者精确控制事件的发迭,这样就不会少打印 PublishSubject#1 和 PublishSubject#2

使用 PublishSubject 实现简化的 RxBus

下面的代码实现了一个简化版本的 Android EventBus ,在这里使用了 PublishSubject 。由于事件总线是基于发布/订阅模式实现的,若是某一事件在多个 Activity/Fragment 中被订阅,则在 App 的任意地方一旦发布该事件,多个订阅的地方均能收到这一事件(在这里,订阅事件的 Activity/Fragment 不能被损坏,一旦被损坏就不能收到事件),这很符合 Hot Observable 的特性。因此,咱们使用 PublishSubject ,考虑到多钱程的状况,还须要使用 Subject 的 toSerialized() 方法。

public class RxBus {
    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    private Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public <T> Observable<T> toObservable(Class<T> clazz) {
        return mBus.ofType(clazz);
    }

    public void post(Object object) {
        mBus.onNext(object);
    }
}

在这里, Subject 的 toSerialized(), 使用 SerializedSubject 包装了原先的 Subject

public final Subject<T> toSerialized() {
    if (this instanceof SerializedSubject) {
        return this;
    }
    return new SerializedSubject<T>(this);
}

这个版本的 EventBus 较简单,井没有考虑背压的状况,由于在 RxJava 2.x 中, SuSubject 已经再也不支持背压了。若是要增长背压的处理,可使用 Processor,须要将 PublishSubject 改为 PublishProcessor,对应的 Observable 须要改为 Flowable

使用 BehaviorSubject 实现预加载

预加载能够很好地提升程序的用户体验。每当用户处于弱网络时,打开 App 极可能会出现一片空白或者一直在 loading,此时用户必定很烦躁。而若是可以预先加载一些数据,例如上一次打开 App 时保存的数据,那么必定会有效提高 App 的用户体验

下面是借助 BehaviorSubject 的特性来实现一个简单的预加载类 RxPreLoader

public class RxPreLoader<T> {

    // 可以缓存订阅以前的最新数据
    private BehaviorSubject<T> dataSubject;
    private Disposable disposable;

    public RxPreLoader(T defaultValue) {
        dataSubject = BehaviorSubject.createDefault(defaultValue);
    }

    // 发送事件
    public void publish(T data) {
        dataSubject.onNext(data);
    }

    // 订阅事件
    public Disposable subscribe(Consumer<? super T> onNext) {
        disposable = dataSubject.subscribe(onNext);
        return disposable;
    }

    // 取消订阅
    public void disposable() {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            disposable = null;
        }
    }

    // 获取缓存数据的Subject
    public BehaviorSubject<T> getCacheDataSubject() {
        return dataSubject;
    }

    // 直接获取最近的一条数据
    public T getLastCacheData() {
        return dataSubject.getValue();
    }

}

Processor

Processor 和 Subject 的做用相同。 Processor 是 RxJava2.0 新增的功能,它是一个接口,继承自 Subscriber 和 Publisher,可以支持背压(Back Pressure)控制。这是 Processor 和 Subject 最大区别。

其实, Publisher 和 Processor 都是 Reactive Streams 的接口, Reactive Streams 项目提供了一个非阻塞异步流处理的背压标准。 RxJava 2.0 己经基于 Reactive Streams 库进行重写 ,包括 Single、Completable ,都是基于 Reactive Streams 规范重写的, Flowable 实现了 Reactive Streams Publisher 接口等。

Reactive Streams 的目标是管制流数据在跨越异步边界进行流数据交换,能够认为是将元素传递到另外一个线程或线程池,同时确保在接收端不是被迫缓冲任意数量的数据。换句话说,背压是该模型的重要组成部分,经过设置协调线程之间的队列大小进行限制 。当各异步模型之间采用同步通讯时会削弱异步带来的好处,所以必须采起谨慎措施,强制彻底无阻塞反应流能在系统的各个方面都作到异步实施。

Reactive Streams 规范的主要目标:

  • 经过异步边界( Asynchronous Boundary )来解糯系统组件 。解祸的先决条件,分离事件/数据流的发送方和接收方的资源使用
  • 为背压处理定义一种模型。流处理的理想范式是将数据从发布者推送到订阅者,这样发布者就能够快速发布数据,同时经过压力处理来确保速度更快的发布者不会对速度较慢的订阅者形成过载。背压处理经过使用流控制来确保操做的稳定性并能实现优雅降级,从而提供弹性能力。

Reactive Streams JVM接口 由如下四个接口组成:

  • Publisher:消息发布者
  • Subscriber :消息订阅者
  • Subscription: 一个订阅
  • Processor: Publisher + Subscriber 的结合体

RxJava 2.0 中使用 Processor 来处理背压。同时,在新发布的 Java 9 中也已经引入 Reactive
Streams 的思想,具体来讲是构建在 java.util.concurrent.Flow 容器中,包含了四个接口类。

RxJava 的 Subject 是一种特殊的存在,它的灵活性在使用时也会伴随着风险,如果没有用好则可能会错过事件。注意,Subject 不是线程安全的。固然不少开源框架都在使用 Subject,例如大名鼎鼎的 RxLifecycle 就使用了 BehaviorSubject

若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)

相关文章
相关标签/搜索