博客主页java
被观察者( Observable/Flowable/Single/Completable/May )发射的数据流能够经历各类线程切换,可是数据流的各个元素之间不会产生并行执行的效果。井行不是并发,也不是同步,更不是异步。算法
并发( concurrency )是指一个处理器同时处理多个任务。并行( parallelism )是多个处理器
或者是多核的处理器同时处理多个不一样的任务。井行是同时发生的多个并发事件,具备井发的含义,而并发则不必定是并行。segmentfault
在 RxJava 中能够借助 flatMap 操做符来实现服务器
Observable.range(1, 100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(Schedulers.computation()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } });
flatMap 操做符的原理是将这个 Observable 转化为为多个以原 Observable 发射的数据做为源数据 Observable,而后再将这多个 Observable 发射的数据整合发射出来。最后的顺序可能会交错地发射出来。并发
flatMap 会对原始 Observable 发射的每一项数据执行变换操做。在这里,生成的每一个 Observable 使用线程池(指定了 computation 做为 Scheduler )并发地执行。app
还可使用 ExecutorService 来建立一个 Scheduler 对刚才的代码稍微作一些改动。负载均衡
int threadNum = Runtime.getRuntime().availableProcessors() + 1; final ExecutorService executorService = Executors.newFixedThreadPool(threadNum); final Scheduler scheduler = Schedulers.from(executorService); Observable.range(1, 100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(scheduler) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .doFinally(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Finally."); executorService.shutdown(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error."); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } });
当完成全部的操做以后, executorService 须要执行 shutdown() 来关闭 ExecutorService。咱们可使用 doFinally 操做符来执行 shutdown()。异步
doFinally 操做符能够在 onError 或者 onComplete 以后调用指定的操做,或由下游处理。ide
Round-Robin 算法是最简单的一种负载均衡算法。它的原理是把来自用户的请求轮流分配给内部的服务器:从服务器 1 开始,直到服务器 N、,而后从新开始循环,也被称为啥希取模法,是很是经常使用的数据分片方法。 Round-Robin 算法的优势是简洁,它无须记录当前全部链接的状态,因此是一种无状态调度。fetch
经过 Round-Robin 算法把数据按线程数分组,例如分红 5 组,每组个数相同,一块儿发送处理。这样作的目的是能够减小 Observable 的建立 ,从而节省系统资源,可是会增长处理时间。Round-Robin 算法能够当作是对时间和空间的综合考虑。
final AtomicInteger batch = new AtomicInteger(0); Observable.range(1, 100) .groupBy(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return batch.getAndIncrement() % 5; } }) .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception { return integerIntegerGroupedObservable.observeOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } });
RxJava 2.0.5 版本新增了 ParallelFlowable API ,它容许并行地执行一些操做符,例如 map、filter、concatMap、flatMap、collect、reduce 等。
public abstract class ParallelFlowable<T> { }
ParallelFlowable 是并行的 Flowable 版本,并非新增的被观察者类型。在 ParallelFlowable 中,不少典型的操做符( take、skip 等)是不可用的。
在 RxJava 并无 ParallelObservable ,由于在 RxJava 2.x 以后, Observable 再也不支持背压。
然而在并行处理中背压是必不可少的, 不然会淹没在并行操做符的内部队列中。
也不存在 ParallelSingle、ParallelCompletable、ParallelMaybe
在相应的操做符上调用 Flowable 的 parallel() 就会返回 ParallelFlowable
ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel(); parallelFlowable .runOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }) .sequential() .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } });
其中 parallel() 调用了 ParallelFlowable.from
public final ParallelFlowable<T> parallel() { return ParallelFlowable.from(this); } public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) { return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize()); }
Paralle!ParallelFlowable 的 from() 方法是经过 Publisher 并以循环的方式在多个“轨道” (CPU 数)上消费它的。
默认状况下,并行级别被设置为可用 CPU 的数量 ( Runtime.getRuntime().availableProcessors() ),井且顺序源的预取量设置为 Flowable.bufferSize()。二者均可以经过重载 parallel() 方法来指定。
public final ParallelFlowable<T> parallel(int parallelism) { ObjectHelper.verifyPositive(parallelism, "parallelism"); return ParallelFlowable.from(this, parallelism); } public final ParallelFlowable<T> parallel(int parallelism, int prefetch) { ObjectHelper.verifyPositive(parallelism, "parallelism"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return ParallelFlowable.from(this, parallelism, prefetch); }
若是己经使用了必要的井行操做,则能够经过 ParallelFlowable.sequential() 操做符返回到顺序流。
ParallelFlowable 遵循与 Flowable 相同的异步原理,所以 parallel() 自己并不引入顺序源的异
步消耗,只准备并行流,可是能够经过 runOn(Scheduler) 操做符定义异步。这点与 Flowable
很大不一样, Flowable 使用 subscribeOn, observeOn 操做符。
runOn() 能够指定 prefetch 的数量。
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler) { return runOn(scheduler, Flowable.bufferSize()); } public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch)); }
并不是全部的顺序操做在并行世界中都是有意义的。目前 ParallelFlowable 只支持以下操做
map, filter, flatMap, concatMap, reduce, collect, sorted, toSortedList, compose,fromArray, doOnCancel, doOnError, doOnComplete, doOnNext, doAfterNext, doOnSubscribe, doAfterTerminated, doOnRequest
这些 ParallelFlowable 可用的操做符,使用方法与 Flowable 中的用法
Observable.flatMap 来实现并行, Flowable.flatMap 实现井行的原理和 Observable.flatMap 实现并行的原理相同。
那么何时使用 flatMap 进行并行处理比较好,何时使用 ParallelFlowable 比较好呢?
RxJava 本质上是连续的,借助 flatMap 操做符进行分离和加入一个序列可能会变得很复杂,
并引发必定的开销 。是若是使用 ParallelF!owable ,则开销会更小。
然而, parallelFlowable!Flowable 的操做符颇有限,若是有一些特殊的操做须要并行执行,而这些操
做不能用 parallelFlowable 所支持的操做符来表达,那么就应该使用基于 Flowable.flatMap 来实
现井行。
所以,优先推荐使用 parallelFlowable ,对于没法使用 parallelFlowable 的操做符,则可使
flatMap 来实现井行。
若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)