ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Coroutine] 코루틴 학습 - 17 (Flow processing)
    Java/Kotlin 2022. 5. 28. 17:27
    반응형

    map, filter

    suspend fun main() {
        flowOf(1, 2, 3, 4)
            .map { it * it }
            .filter { it % 2 == 0 }
            .collect { println(it) }
    }
    // 4
    // 16

     컬렉션에서 흔하게 사용하는 map과 filter 기능과 같다.

     

    take and drop

    suspend fun main() {
        ('A'..'Z').asFlow()
            .take(5)
            .collect { println(it) }
    
        println()
    
        ('A'..'Z').asFlow()
            .drop(20)
            .collect { println(it) }
    
        println()
    }
    // A
    // B
    // C
    // D
    // E
    //
    // U
    // V
    // W
    // X
    // Y
    // Z

     take는 element의 앞부분부터 정해진 수만큼 사용하고, drop은 정해진 수만큼 앞의 element를 제외하고 그 뒤의 나머지 element를 사용한다.

     

    merge

    suspend fun main() {
        val ints: Flow<Int> = flowOf(1, 2, 3)
            .onEach { delay(1000) }
        val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    
        val together: Flow<Number> = merge(ints, doubles)
        println(together.collect { println(it) })
    }
    // 0.1
    // 0.2
    // 0.3
    // (1 sec)
    // 1
    // (1 sec)
    // 2
    // (1 sec)
    // 3

     merge는 여러 flow를 하나로 합치는데, 순서에 상관없이 한 flow가 지연되더라도 다른 flow는 기다리지 않고 처리된다.

     

    zip

    suspend fun main() {
        val flow1 = flowOf("A", "B", "C")
            .onEach { delay(400) }
        val flow2 = flowOf(1, 2, 3 ,4)
            .onEach { delay(1000) }
        flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}"}
            .collect { println(it) }
    }
    // (1 sec)
    // A_1
    // (1 sec)
    // B_2
    // (1 sec)
    // C_3

     zip은 두 flow의 element를 한 쌍으로 만든다. 각 element는 오직 한 쌍의 부분 요소로만 사용할 수 있고, 한 쌍이 되지 않았다면 대기하게 된다. 한 쪽 flow가 완료되었을 때는 다른 flow에 element가 남아있더라도 종료된다.

     

    combine

    suspend fun main() {
        val flow1 = flowOf("A", "B", "C")
            .onEach { delay(400) }
        val flow2 = flowOf(1, 2, 3, 4)
            .onEach { delay(1000) }
        flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
            .collect { println(it) }
    }
    // (1 sec)
    // B_2
    // (0.2 sec)
    // C_1
    // (0.8 sec)
    // C_2
    // (1 sec)
    // C_3
    // (1 sec)
    // C_4

     combine은 zip과 같이 두 flow를 합쳐서 한 쌍의 element로 만들어 emit 된다는 점은 같지만 두 flow 모두 한 쪽이 느린 flow를 기다려서 한 쌍의 element를 만든다는 점이 다르다. 새로운 element는 이전의 element를 대체하여 새로운 한 쌍을 만든다. 위의 예제 코드를 통해 이를 확인할 수 있다.

     

    fold and scan

    suspend fun main() {
        val list = flowOf(1, 2, 3, 4)
            .onEach { delay(1000) }
        val res = list.fold(0) { acc, i -> acc + i }
        println(res)
    }
    // (4 sec)
    // 10

     fold는 종단 연산이고, collect() 처럼 flow가 완료될 때까지 일시중단된다.

     

    suspend fun main() {
        flowOf(1, 2, 3, 4)
            .onEach { delay(1000) }
            .scan(0) { acc, v -> acc + v }
            .collect { println(it) }
    }
    // 0
    // (1 sec)
    // 1
    // (1 sec)
    // 3
    // (1 sec)
    // 6
    // (1 sec)
    // 10

     flow에서의 scan은 중간 연산이며, 이전 단계에서의 값을 수신한 후에 새로운 값을 만들어낸다.

     

    flatMapConcat, flatMapMerge, flatMapLatest

    private fun flowFrom(element: String) = flowOf(1, 2, 3)
        .onEach { delay(1000) }
        .map { "${it}_${element}" }
    
    suspend fun main() {
        flowOf("A", "B", "C")
            .flatMapConcat { flowFrom(it) }
            .collect { println(it) }
    }
    // (1 sec)
    // 1_A
    // (1 sec)
    // 2_A
    // (1 sec)
    // 3_A
    // (1 sec)
    // 1_B
    // (1 sec)
    // 2_B
    // (1 sec)
    // 3_B
    // (1 sec)
    // 1_C
    // (1 sec)
    // 2_C
    // (1 sec)
    // 3_C

     flatMapConcat은 두 플로우를 함께 처리하는데 flatMap과 같이 평탄화하여 작업을 수행한다. flatMapConcat은 생성된 flow를 차례로 처리한다.

     

    private fun flowFrom(element: String) = flowOf(1, 2, 3)
        .onEach { delay(1000) }
        .map { "${it}_${element}" }
    
    suspend fun main() {
        flowOf("A", "B", "C")
            .flatMapMerge { flowFrom(it) }
            .collect { println(it) }
    }
    // (1 sec)
    // 1_A
    // 1_B
    // 1_C
    // (1 sec)
    // 2_A
    // 2_B
    // 2_C
    // (1 sec)
    // 3_A
    // 3_B
    // 3_C

     flatMapMerge는 첫 번째 flow를 동시에 처리한다. flatMapMerge 함수에는 concurrency 파라미터를 통해서 동시에 처리할 element 수를 지정할 수 있다. 기본 값은 16이며, JVM에서 사용하는 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티에 따라서 달라질 수 있다.

     

    private fun flowFrom(element: String) = flowOf(1, 2, 3)
        .onEach { delay(1000) }
        .map { "${it}_${element}" }
    
    suspend fun main() {
        flowOf("A", "B", "C")
            .flatMapLatest { flowFrom(it) }
            .collect { println(it) }
    }
    // (1 sec)
    // 1_C
    // (1 sec)
    // 2_C
    // (1 sec)
    // 3_C

     flatMapLatest는 flow에서 새로운 element가 emit되면 이전의 element는 덮어씌워진다. 

    private fun flowFrom(element: String) = flowOf(1, 2, 3)
        .onEach { delay(1000) }
        .map { "${it}_${element}" }
    
    suspend fun main() {
        flowOf("A", "B", "C")
            .onEach { delay(1200) }
            .flatMapLatest { flowFrom(it) }
            .collect { println(it) }
    }
    // (2.2 sec)
    // 1_A
    // (1.2 sec)
    // 1_B
    // (1 sec)
    // 1_C
    // (1 sec)
    // 2_C
    // (1 sec)
    // 3_C

     

    Terminal 연산

     종단 연산에는 collect() 뿐만 아니라 다양한 연산을 지원한다.

    • first(), firstOrNull()
      • 처음으로 emit된 element를 찾는다.
    • fold(), reduce()
      • emit된 값들을 람다 표현식에 정의된 연산을 통해 하나의 결과 값으로 만들어낸다.
    • count()
      • emit된 element의 수를 반환한다.

     종단 연산은 일시중단되며, flow가 완료되면 값을 리턴한다.

     


    참고자료

    https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

    반응형

    댓글

Designed by Tistory.