Koltin Flow 基础详解

Koltin Flow 基础详解

kotlinx-coroutines-core 1.4.2版本出来了,没有了以前的实验方法警告,终于能够能够愉快的玩耍了,如今准备把有关flow的相关资料整理汇总一下java

1.概览

Flow基础详解

2.Kotlin Flow 介绍

A cold asynchronous data stream that sequentially emits values and completes normally or with an exception。
复制代码

意思是:按顺序发出值并正常完成或异常完成的Cold异步数据流。markdown

与rxjava做用相似,可能会在之后的开发中逐步代替rxjava,使整个开发生态更加趋向一体化异步

4.Flow的建立

  • Empty Flow
emptyFlow<String>()
复制代码
  • 经过flowOf函数
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
复制代码
  • Iterable调用asFlow函数
listOf(1, 2, 3).asFlow()
// 1, 2, 3
复制代码
  • 无参可是有返回值的函数**(() -> T)**调用asFlow函数
fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
复制代码
  • 无参可是有返回值的挂起函数**(() -> T)**调用asFlow函数
suspend fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
复制代码
  • Array调用asFlow函数
LongRange(1, 5).asFlow().collect { value -> println(value) }
复制代码

5.Flow操做符

Delay相关的操做符

  • debounceasync

    特性:函数

    1. 若是两个相邻的值生产出来的时间间隔超过了[timeout]毫秒,就忽过滤掉前一个值

    最后一个值不受影响,老是会被释放emit。 [timeout]能够传毫秒,也能够传Durationui

    flow {
            emit(1)
            delay(3000)
            emit(2)
            delay(1000)
            emit(3)
            delay(1000)
            emit(4)
        }.debounce(2000)
    
        // 结果:1 4 
        // 解释:
        // 2和1的间隔大于2000,1被释放
        // 3和2的间隔小于2000, 2被忽略
        // 4和3的间隔小于2000, 3被忽略
        // 4是最后一个值不受timeout值的影响, 4被释放
    
    
    
    flow {
        emit(1)
        delay(3000)
        emit(2)
        delay(1000)
        emit(3)
        delay(1000)
        emit(4)
    }.debounce(2000.milliseconds)
    
    // 结果:1 4 
    
    应用:可用于搜索框的反复输入内容筛选
    
    
    
    
    
    复制代码

Distinct相关的操做符

  • distinctUntilChangedspa

    1.若是生产的值和上个发送的值相同,值就会被过滤掉code

    flow {
          emit(1)
          emit(1)
          emit(2)
          emit(2)
          emit(3)
          emit(4)
      }.distinctUntilChanged()
    
      // 结果:1 2 3 4
      // 解释:
      // 第一个1被释放
      // 第二个1因为和第一个1相同,被过滤掉
      // 第一个2被释放
      // 第二个2因为和第一个2相同,被过滤掉
      // 第一个3被释放
      // 第一个4被释放
    复制代码
    1. 能够传参(old: T, new: T) -> Boolean,进行自定义的比较
    private class Person(val age: Int, val name: String)
    
    flow {
        emit(Person(20, "张三"))
        emit(Person(21, "李四"))
        emit(Person(21, "王五"))
        emit(Person(22, "赵六"))
    }.distinctUntilChanged{old, new -> old.age == new.age }
    .collect{ value -> println(value.name) }
        
    // 结果:张三 李四 赵六
    // 解释:本例子定义若是年龄相同就认为是相同的值,因此王五被过滤掉了
    复制代码
    1. 能够用distinctUntilChangedBy转换成年龄进行对比
    flow {
        emit(Person(20, "张三"))
        emit(Person(21, "李四"))
        emit(Person(21, "王五"))
        emit(Person(22, "赵六"))
    }.distinctUntilChangedBy { person -> person.age }
    
    // 结果:张三 李四 赵六
    复制代码

Emitters相关的操做符

  • transformorm

    对每一个值进行转换协程

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.transform {
        if (it % 2 == 0) {
            emit(it * it)
        }
    }
    
    // 结果:4 16
    // 解释:
    // 1 不是偶数,被忽略
    // 2 是偶数,2的平方4
    // 3 不是偶数,被忽略
    // 4 是偶数,4的平方16
    复制代码
  • onStart

    第一个值被释放以前被执行

    flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }.onStart { emit(1000) }
    
        // 结果:1000 1 2 3 4
        // 解释:
        // 第一个值1被释放的时候调用了emit(10 00), 因此1000在1以前被释放
    复制代码
  • onCompletion

    最后一个值释放完成以后被执行

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.onCompletion { emit(1000) }
    
      // 结果:1 2 3 4 1000
      // 解释:
      // 第一个值4被释放的时候调用了emit(100 0), 因此1000在4以后被释放
    复制代码

Limit相关的操做符

  • drop

    忽略最开始的[count]个值

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.drop(2)
    
      // 结果:3 4
      // 解释:
      // 最开始释放的两个值(1,2)被忽略了
    复制代码
  • dropWhile

    判断第一个值若是知足(T) -> Boolean这个条件就忽略

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.dropWhile {
          it % 2 == 0
      }
    
      // 结果:1 2 3 4
      // 解释:
      // 第一个值不是偶数,因此1被释放
    
      flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.dropWhile {
          it % 2 != 0
      }
    
      // 结果:2 3 4
      // 解释:
      // 第一个值是偶数,因此1被忽略
    复制代码
  • take

    只释放前面[count]个值

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.take(2)
    
      // 结果:1 2
      // 解释:
      // 前面两个值被释放
    复制代码
  • takeWhile

    判断第一个值若是知足(T) -> Boolean这个条件就释放

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.takeWhile { it%2 != 0 }
    
      // 结果:1
      // 解释:
      // 第一个值知足是奇数条件
    
      flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.takeWhile { it%2 == 0 }
    
      // 结果:无
      // 解释:
      // 第一个值不知足是奇数条件
    
    
    复制代码

CoroutineContext相关的操做符

  • flowOn

    能够切换CoroutineContext 说明:flowOn只影响该运算符以前的CoroutineContext,对它以后的CoroutineContext没有任何影响

  • buffer

    将flow的多个任务分配到不一样的协程中去执行,加快执行的速度。

  • conflate

    若是值的生产速度大于值的消耗速度,就忽略掉中间将来得及处理的值,只处理最新的值。

val flow1 = flow {
        delay(2000)
        emit(1)
        delay(2000)
        emit(2)
        delay(2000)
        emit(3)
        delay(2000)
        emit(4)
    }.conflate()

    flow1.collect { value ->
        println(value)
        delay(5000)
    }

    // 结果: 1 3 4
    // 解释:
    // 2000毫秒后生产了1这个值,交由collect 执行,花费了5000毫秒,当1这个值执行co llect完成后已经通过了7000毫秒。
    // 这7000毫秒中,生产了2,可是collect还 没执行完成又生产了3,因此7000毫秒之后 会直接执行3的collect方法,忽略了2这 个值
    // collect执行完3后,还有一个4,继续执 行。
复制代码

Flatten相关的操做符

  • flatMapConcat
将原始的Flow<T>经过[transform]转换成Flow<Flow<T>>,而后将Flow<Flow<T>>释放的Flow<T>其中释放的值一个个释放。
  
  flow {
      delay(1000)
      emit(1)
      delay(1000)
      emit(2)
      delay(1000)
      emit(3)
      delay(1000)
      emit(4)
  }.flatMapConcat {
      flow {
          emit("$it 产生第一个flow值")
          delay(2500)
          emit("$it 产生第二个flow值")
      }
  }.collect { value ->
      println(value)
  }
  
  // 结果
  // I/System.out: 1 产生第一个flow值
  // I/System.out: 1 产生第二个flow值
  // I/System.out: 2 产生第一个flow值
  // I/System.out: 2 产生第二个flow值
  // I/System.out: 3 产生第一个flow值
  // I/System.out: 3 产生第二个flow值
  // I/System.out: 4 产生第一个flow值
  // I/System.out: 4 产生第二个flow值
  
  // 解释:
  // 原始Flow<Int>经过flatMapConcat被转换成Flow<Flow<Int>>
  // 原始Flow<Int>首先释放1,接着Flow<Flow<Int>> 就会释放 1产生第一个flow值 和 1产生第二个flow值 两个值
  // Flow<Int>释放2,...
  // Flow<Int>释放3,...
  // Flow<Int>释放4,...
  
复制代码
  • flattenConcat

    和flatMapConcat相似,只是少了一步Map操做。

    flow {
        delay(1000)
        emit(flow {
            emit("1 产生第一个flow值")
            delay(2000)
            emit("1 产生第二个flow值") })
        delay(1000)
        emit(flow {
            emit("2 产生第一个flow值")
            delay(2000)
            emit("3 产生第二个flow值") })
        delay(1000)
        emit(flow {
            emit("3 产生第一个flow值")
            delay(2000)
            emit("3 产生第二个flow值") })
        delay(1000)
        emit(flow {
            emit("4 产生第一个flow值")
            delay(2500)
            emit("4 产生第二个flow值") })
        }.flattenConcat()
        
    // 结果
    // I/System.out: 1 产生第一个flow值
    // I/System.out: 1 产生第二个flow值
    // I/System.out: 2 产生第一个flow值
    // I/System.out: 2 产生第二个flow值
    // I/System.out: 3 产生第一个flow值
    // I/System.out: 3 产生第二个flow值
    // I/System.out: 4 产生第一个flow值
    // I/System.out: 4 产生第二个flow值
    复制代码
  • flatMapMerge

    将原始的Flow经过[transform]转换成Flow<Flow>,而后将Flow<Flow>释放的Flow其中释放的值一个个释放。 它与flatMapConcat的区别是:Flow<Flow>释放的Flow其中释放的值没有顺序性,谁先产生谁先释放。

flow {
      delay(1000)
      emit(1)
      delay(1000)
      emit(2)
      delay(1000)
      emit(3)
      delay(1000)
      emit(4)
  }.flatMapMerge {
      flow {
          emit("$it 产生第一个flow值")
          delay(2500)
          emit("$it 产生第二个flow值")
      }
  }.collect { value ->
      println(value)
  }
    
复制代码
  • merge

    将Iterable<Flow>合并成一个Flow

val flow1 = listOf(
     flow {
         emit(1)
         delay(500)
         emit(2)
     },
     flow {
         emit(3)
         delay(500)
         emit(4)
     },
     flow {
         emit(5)
         delay(500)
         emit(6)
     }
 )
 flow1.merge().collect { value -> println("$value") }
 
 // 结果: 1 3 5 2 4 6
 // 解释:
 // 按Iterable的顺序和耗时顺序依次释放值
复制代码
  • transformLatest

    原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

  • flatMapLatest

    和transformLatest相似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

    区别:flatMapLatest的transform转换成的是Flow, transformLatest的transform转换成的是Unit

  • mapLatest

    和transformLatest相似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

    区别:mapLatest的transform转换成的是T,flatMapLatest的transform转换成的是Flow,transformLatest的transform转换成的是Unit

Transform相关的操做符

  • filter

    经过predicate进行过滤,知足条件则被释放

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.filter { it % 2 == 0 }
    
    // 结果: 2 4
    // 解释:
    // 2和4知足it % 2 == 0,被释放
    复制代码
  • filterNot

    经过predicate进行过滤,不知足条件则被释放

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.filterNot { it % 2 == 0 }
    
    // 结果: 1 3
    // 解释:
    // 1和3不知足it % 2 == 0,被释放
    复制代码
  • filterIsInstance

    若是是某个数据类型则被释放

    flow {
        emit(1)
        emit("2")
        emit("3")
        emit(4)
    }.filterIsInstance<String>()
    
    // 结果: "2" "3"
    // 解释:
    // "2" "3"是String类型,被释放
    复制代码
  • filterNotNull

    若是数据是非空,则被释放

    flow {
        emit(1)
        emit("2")
        emit("3")
        emit(null)
    }.filterNotNull()
    
    // 结果: 1 "2" "3"
    复制代码
  • map

    将一个值转换成另一个值

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.map { it * it }
    
    // 结果: 1 4 9 16
    // 解释:
    // 将1,2,3,4转换成对应的平方数
    复制代码
  • mapNotNull

    将一个非空值转换成另一个值

  • withIndex

    将值封装成IndexedValue对象

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.withIndex()
    
    // 结果:
    // I/System.out: IndexedValue(index=0, value=1)
    // I/System.out: IndexedValue(index=1, value=2)
    // I/System.out: IndexedValue(index=2, value=3)
    // I/System.out: IndexedValue(index=3, value=4)
    复制代码
  • onEach

    每一个值释放的时候能够执行的一段代码

  • scan

    有一个初始值,而后每一个值都和初始值进行运算,而后这个值做为后一个值的初始值

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.scan(100) { acc, value ->
        acc * value
    }
    
    // 结果: 100 100 200 600 2400
    // 解释:
    // 初始值 100
    // 1 100 * 1 = 100
    // 2 100 * 2 = 200
    // 3 200 * 3 = 600
    // 4 600 * 4 = 2400
    复制代码
  • runningReduce

    和scan相似,可是没有初始值,最开始是它自己

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.runningReduce { acc, value ->
        acc * value
    }
    
    // 结果: 1 2 6 24
    // 解释:
    // 1 1
    // 2 1 * 2 = 2
    // 3 2 * 3 = 6
    // 4 6 * 4 = 24
    复制代码

合并的操做符

  • zip

    将两个Flow在回调函数中进行处理返回一个新的值 R 当两个flow的长度不等时只发送最短长度的事件

    val nums = (1..4).asFlow() 
    val strs = flowOf("one", "two", "three") 
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
    
    // 结果:
    1 -> one
    2 -> two
    3 -> three
    复制代码
  • combine

    任意一个flow释放值且都有释放值后会调用combine后的代码块,且值为每一个flow的最新值。 和zip的区别: 组合两个流,在通过第一次发射之后,任意方有新数据来的时候就能够发射,另外一方有多是已经发射过的数据

    val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }
    
    flow1.combine(flow2) { first, second ->
        "$first$second"
    }.collect { println("$it") }
    
    // 结果:1a 2a 2b 3b 4b 4c 4d
    
    // 解释:
    // 开始 --- flow1 释放 1,flow2 释放 a, 释放1a
    // 10毫秒 --- flow1 释放 2,释放2a
    // 20毫秒 --- flow2 释放 b,此时释放2b
    // 30毫秒 --- flow1 释放 3,此时释放3b
    // 40毫秒 --- flow1 释放 4,此时释放4b
    // 40毫秒 --- flow2 释放 c,此时释放4c
    // 60毫秒 --- flow2 释放 d,此时释放4d
    复制代码

retry相关操做符

  • retry
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重试次数 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> 复制代码
  • retryWhen
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
复制代码

末端操做符

  • Collect相关的末端操做符

    • collect

      接收值

    • launchIn

      scope.launch { flow.collect() }的缩写, 表明在某个协程上下文环境中去接收释放的值

      val flow1 = flow {
          delay(1000)
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(1000)
          emit(4)
      }
      
      flow1.onEach { println("$it") }
          .launchIn(GlobalScope)
      
      // 结果:1 2 3 4
      复制代码
    • collectIndexed

      和withIndex对应的,接收封装的IndexedValue

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.withIndex()
      
      flow1.collectIndexed { index, value ->
          println("index = $index, value = $value")
      }
      
      // 结果:
      // I/System.out: index = 0, value = IndexedValue(index=0, value=1)
      // I/System.out: index = 1, value = IndexedValue(index=1, value=2)
      // I/System.out: index = 2, value = IndexedValue(index=2, value=3)
      // I/System.out: index = 3, value = IndexedValue(index=3, value=4)
      复制代码
    • collectLatest

      collectLatest与collect的区别是,若是有新的值释放,上一个值的操做若是没执行完则将会被取消

      val flow1 = flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      flow1.collectLatest {
          println("正在计算收到的值 $it")
          delay(1500)
          println("收到的值 $it")
      }
      
      // 结果:
      // I/System.out: 正在计算收到的值 1
      // I/System.out: 正在计算收到的值 2
      // I/System.out: 正在计算收到的值 3
      // I/System.out: 收到的值 3
      // I/System.out: 正在计算收到的值 4
      // I/System.out: 收到的值 4
      
      // 解释:
      // 1间隔1000毫秒后释放2,2间隔1000毫秒后释放3,这间隔小于须要接收的时间1500毫秒,因此当2和3 到来后,以前的操做被取消了。
      // 3和4 之间的间隔够长可以等待执行完毕,4是最后一个值也能执行
      复制代码
  • Collection相关的末端操做符

    • toList

      将释放的值转换成List

      flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      println(flow1.toList())
      
      // 结果:[1, 2, 3, 4]
      复制代码
    • toSet

      将释放的值转换成Set

      flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      println(flow1.toSet())
      
      // 结果:[1, 2, 3, 4]
      
      复制代码
  • Count相关的末端操做符

    • count

      1.计算释放值的个数

    val flow1 = flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
            delay(2000)
            emit(4)
        }
        
        println(flow1.count())
                
        // 结果:4
        
        2.计算知足某一条件的释放值的个数
        val flow1 = flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
            delay(2000)
            emit(4)
        }
        
        println(flow1.count { it % 2 == 0 })
                
        // 结果:2
        // 解释:
        // 偶数有2个值 2 4
        ```
        
        
        
        
    
    复制代码
  • Reduce相关的末端操做符

    • reduce

      和runningReduce相似,可是只计算最后的结果。

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }
      println(flow1.reduce { acc, value -> acc * value })
      
      // 结果:24
      // 解释:计算最后的结果,1 * 2 * 3 * 4 = 24
      复制代码
    • fold

      和scan相似,有一个初始值,可是只计算最后的结果。

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }
      println(flow1.fold(100) { acc, value -> acc * value })
      
      // 结果:2400
      // 解释:计算最后的结果,100 * 1 * 2 * 3 * 4 = 2400
      复制代码
    • single

      只接收一个值的Flow 注意:多于1个或者没有值都会报错

      val flow1 = flow {
          emit(1)
      }
      println(flow1.single())
      
      // 结果:1
      复制代码
    • singleOrNull

      接收一个值的Flow或者一个空值的Flow

    • first/firstOrNull

      1. 接收释放的第一个值/接收第一个值或者空值
    val flow1 = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }
        println(flow1.first())
        
        // 结果:1
        ```
    
        2. 接收第一个知足某个条件的值
        
        val flow1 = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }
        println(flow1.first { it % 2 == 0})
        
        // 结果:2
        ```
        
        
    
    复制代码

Flow的错误异常处理

  • 能够经过 try catch 捕获错误异常

    try {
        flow {
           for (i in 1..3) {
            emit(i)
         }
        }.collect {
            println("接收值 $it")
            check(it <= 1) { "$it 大于1"  }
        }
    } catch (e: Throwable) {
        println("收到了异常: $e")
    }
    
    // 结果:
    // I/System.out: 接收值 1
    // I/System.out: 接收值 2
    // I/System.out: 收到了异常: java.lang.IllegalStateException: 2 大于1
    
    // 解释:
    // 收到2的时候就抛出了异常,让后flow被取消,异常被捕获
    复制代码
  • 经过catch函数

    catch函数可以捕获以前产生的异常,以后的异常没法捕获。

    flow {
         for (i in 1..3) {
            emit(i)
         }
        }.map {
            check(it <= 1) { "$it 大于1" }
            it
        }
        .catch { e -> println("Caught $e") }
        .collect()
    
    // 结果:
    // Caught java.lang.IllegalStateException: 2 大于1
    复制代码

Flow的取消

  • CoroutineScope.cancel

    GlobalScope.launch {
        val flow1 = flow {
            for(i in 1..4){
              emit(i)
            }
        }
        flow1.collect { value ->
            println("$value")
            if (value >= 3) {
                cancel()
            }
        }
    }
            
     // 结果:1 2 3 
    复制代码
  • 流取消检测

    在协程处于繁忙循环的状况下,必须明确检测是否取消。 能够添加 .onEach { currentCoroutineContext().ensureActive() }, 可是这里提供了一个现成的 cancellable 操做符来执行此操做:

    (1..5).asFlow().cancellable().collect { value -> 
            if (value == 3) cancel()  
            println(value)
        } 
    复制代码
相关文章
相关标签/搜索