Kotlin Coroutines Flow 系列(四) 线程操做

photo-of-woman-wearing-denim-jacket-2419423.jpg

七. Flow 线程操做

7.1 更为简化的线程切换

相对于 RxJava 多线程的学习曲线,Flow 对线程的切换友好地多。java

在以前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾经介绍过 Flow 的切换线程,以及 flowOn 操做符。多线程

Flow 只需使用 flowOn 操做符,而没必要像 RxJava 须要去深刻理解 observeOn、subscribeOn 之间的区别。并发

7.2 flowOn VS RxJava 的 observeOn

RxJava 的 observeOn 操做符,接收一个 Scheduler 参数,用来指定下游操做运行在特定的线程调度器 Scheduler 上。app

Flow 的 flowOn 操做符,接收一个 CoroutineContext 参数,影响的是上游的操做。ide

例如:函数

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
复制代码

flow builder 和 map 操做符都会受到flowOn的影响,并使用 Dispatchers.io 线程池。post

再例如:学习

val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .map {
            it+1
        }
        .flowOn(customerDispatcher)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
复制代码

flow builder 和两个 map 操做符都会受到两个flowOn的影响,其中 flow builder 和第一个 map 操做符跟上面的例子同样,第二个 map 操做符会切换到指定的 customerDispatcher 线程池。ui

7.3 buffer 实现并发操做

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介绍 buffer 操做符对应 RxJava Backpressure 中的 BUFFER 策略。spa

事实上 buffer 操做符也能够并发地执行任务,它是除了使用 flowOn 操做符以外的另外一种方式,只是不能显示地指定 Dispatchers。

例如:

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .buffer()
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}
复制代码

执行结果:

1
2
3
4
5
Collected in 1676 ms
复制代码

在上述例子中,全部的 delay 所花费的时间是2000ms。然而经过 buffer 操做符并发地执行 emit,再顺序地执行 collect 函数后,所花费的时间在 1700ms 左右。

若是去掉 buffer 操做符。

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}
复制代码

执行结果:

1
2
3
4
5
Collected in 2039 ms
复制代码

所花费的时间比刚才多了300多ms。

7.4 并行操做

在讲解并行操做以前,先来了解一下并发和并行的区别。

并发(concurrency):是指一个处理器同时处理多个任务。 并行(parallelism):是多个处理器或者是多核的处理器同时处理多个不一样的任务。并行是同时发生的多个并发事件,具备并发的含义,而并发则不必定是并行。

RxJava 能够借助 flatMap 操做符实现并行,亦能够使用 ParallelFlowable 类实现并行操做。

下面,以 flatMap 操做符为例实现 RxJava 的并行:

Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });
复制代码

Flow 也有相应的操做符 flatMapMerge 能够实现并行。

fun main() = runBlocking {

    val result = arrayListOf<Int>()
    for (index in 1..100){
        result.add(index)
    }

    result.asFlow()
        .flatMapMerge {
            flow {
                emit(it)
            }
            .flowOn(Dispatchers.IO)
        }
        .collect { println("$it") }
}
复制代码

整体而言,Flow 相比于 RxJava 更加简洁一些。

该系列的相关文章:

Kotlin Coroutines Flow 系列(一) Flow 基本使用

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

Kotlin Coroutines Flow 系列(三) 异常处理

相关文章
相关标签/搜索