Java/Kotlin

[Coroutine] 코루틴 학습 - 17 (Flow processing)

Icarus8050 2022. 5. 28. 17:27
반응형

map, filter

suspend fun main() {
    flowOf(1, 2, 3, 4)
        .map { it * it }
        .filter { it % 2 == 0 }
        .collect { println(it) }
}
// 4
// 16

 컬렉션에서 흔하게 사용하는 map과 filter 기능과 같다.

 

take and drop

suspend fun main() {
    ('A'..'Z').asFlow()
        .take(5)
        .collect { println(it) }

    println()

    ('A'..'Z').asFlow()
        .drop(20)
        .collect { println(it) }

    println()
}
// A
// B
// C
// D
// E
//
// U
// V
// W
// X
// Y
// Z

 take는 element의 앞부분부터 정해진 수만큼 사용하고, drop은 정해진 수만큼 앞의 element를 제외하고 그 뒤의 나머지 element를 사용한다.

 

merge

suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach { delay(1000) }
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val together: Flow<Number> = merge(ints, doubles)
    println(together.collect { println(it) })
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3

 merge는 여러 flow를 하나로 합치는데, 순서에 상관없이 한 flow가 지연되더라도 다른 flow는 기다리지 않고 처리된다.

 

zip

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3 ,4)
        .onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}"}
        .collect { println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

 zip은 두 flow의 element를 한 쌍으로 만든다. 각 element는 오직 한 쌍의 부분 요소로만 사용할 수 있고, 한 쌍이 되지 않았다면 대기하게 된다. 한 쪽 flow가 완료되었을 때는 다른 flow에 element가 남아있더라도 종료된다.

 

combine

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// B_2
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4

 combine은 zip과 같이 두 flow를 합쳐서 한 쌍의 element로 만들어 emit 된다는 점은 같지만 두 flow 모두 한 쪽이 느린 flow를 기다려서 한 쌍의 element를 만든다는 점이 다르다. 새로운 element는 이전의 element를 대체하여 새로운 한 쌍을 만든다. 위의 예제 코드를 통해 이를 확인할 수 있다.

 

fold and scan

suspend fun main() {
    val list = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    val res = list.fold(0) { acc, i -> acc + i }
    println(res)
}
// (4 sec)
// 10

 fold는 종단 연산이고, collect() 처럼 flow가 완료될 때까지 일시중단된다.

 

suspend fun main() {
    flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
        .scan(0) { acc, v -> acc + v }
        .collect { println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10

 flow에서의 scan은 중간 연산이며, 이전 단계에서의 값을 수신한 후에 새로운 값을 만들어낸다.

 

flatMapConcat, flatMapMerge, flatMapLatest

private fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

 flatMapConcat은 두 플로우를 함께 처리하는데 flatMap과 같이 평탄화하여 작업을 수행한다. flatMapConcat은 생성된 flow를 차례로 처리한다.

 

private fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C

 flatMapMerge는 첫 번째 flow를 동시에 처리한다. flatMapMerge 함수에는 concurrency 파라미터를 통해서 동시에 처리할 element 수를 지정할 수 있다. 기본 값은 16이며, JVM에서 사용하는 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티에 따라서 달라질 수 있다.

 

private fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

 flatMapLatest는 flow에서 새로운 element가 emit되면 이전의 element는 덮어씌워진다. 

private fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1200) }
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

 

Terminal 연산

 종단 연산에는 collect() 뿐만 아니라 다양한 연산을 지원한다.

  • first(), firstOrNull()
    • 처음으로 emit된 element를 찾는다.
  • fold(), reduce()
    • emit된 값들을 람다 표현식에 정의된 연산을 통해 하나의 결과 값으로 만들어낸다.
  • count()
    • emit된 element의 수를 반환한다.

 종단 연산은 일시중단되며, flow가 완료되면 값을 리턴한다.

 


참고자료

https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

반응형