-
[Coroutine] 코루틴 학습 - 17 (Flow processing)Java/Kotlin 2022. 5. 28. 17:27반응형
map, filter
suspend fun main() { flowOf(1, 2, 3, 4) .map { it * it } .filter { it % 2 == 0 } .collect { println(it) } } // 4 // 16
컬렉션에서 흔하게 사용하는 map과 filter 기능과 같다.
take and drop
suspend fun main() { ('A'..'Z').asFlow() .take(5) .collect { println(it) } println() ('A'..'Z').asFlow() .drop(20) .collect { println(it) } println() } // A // B // C // D // E // // U // V // W // X // Y // Z
take는 element의 앞부분부터 정해진 수만큼 사용하고, drop은 정해진 수만큼 앞의 element를 제외하고 그 뒤의 나머지 element를 사용한다.
merge
suspend fun main() { val ints: Flow<Int> = flowOf(1, 2, 3) .onEach { delay(1000) } val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3) val together: Flow<Number> = merge(ints, doubles) println(together.collect { println(it) }) } // 0.1 // 0.2 // 0.3 // (1 sec) // 1 // (1 sec) // 2 // (1 sec) // 3
merge는 여러 flow를 하나로 합치는데, 순서에 상관없이 한 flow가 지연되더라도 다른 flow는 기다리지 않고 처리된다.
zip
suspend fun main() { val flow1 = flowOf("A", "B", "C") .onEach { delay(400) } val flow2 = flowOf(1, 2, 3 ,4) .onEach { delay(1000) } flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}"} .collect { println(it) } } // (1 sec) // A_1 // (1 sec) // B_2 // (1 sec) // C_3
zip은 두 flow의 element를 한 쌍으로 만든다. 각 element는 오직 한 쌍의 부분 요소로만 사용할 수 있고, 한 쌍이 되지 않았다면 대기하게 된다. 한 쪽 flow가 완료되었을 때는 다른 flow에 element가 남아있더라도 종료된다.
combine
suspend fun main() { val flow1 = flowOf("A", "B", "C") .onEach { delay(400) } val flow2 = flowOf(1, 2, 3, 4) .onEach { delay(1000) } flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" } .collect { println(it) } } // (1 sec) // B_2 // (0.2 sec) // C_1 // (0.8 sec) // C_2 // (1 sec) // C_3 // (1 sec) // C_4
combine은 zip과 같이 두 flow를 합쳐서 한 쌍의 element로 만들어 emit 된다는 점은 같지만 두 flow 모두 한 쪽이 느린 flow를 기다려서 한 쌍의 element를 만든다는 점이 다르다. 새로운 element는 이전의 element를 대체하여 새로운 한 쌍을 만든다. 위의 예제 코드를 통해 이를 확인할 수 있다.
fold and scan
suspend fun main() { val list = flowOf(1, 2, 3, 4) .onEach { delay(1000) } val res = list.fold(0) { acc, i -> acc + i } println(res) } // (4 sec) // 10
fold는 종단 연산이고, collect() 처럼 flow가 완료될 때까지 일시중단된다.
suspend fun main() { flowOf(1, 2, 3, 4) .onEach { delay(1000) } .scan(0) { acc, v -> acc + v } .collect { println(it) } } // 0 // (1 sec) // 1 // (1 sec) // 3 // (1 sec) // 6 // (1 sec) // 10
flow에서의 scan은 중간 연산이며, 이전 단계에서의 값을 수신한 후에 새로운 값을 만들어낸다.
flatMapConcat, flatMapMerge, flatMapLatest
private fun flowFrom(element: String) = flowOf(1, 2, 3) .onEach { delay(1000) } .map { "${it}_${element}" } suspend fun main() { flowOf("A", "B", "C") .flatMapConcat { flowFrom(it) } .collect { println(it) } } // (1 sec) // 1_A // (1 sec) // 2_A // (1 sec) // 3_A // (1 sec) // 1_B // (1 sec) // 2_B // (1 sec) // 3_B // (1 sec) // 1_C // (1 sec) // 2_C // (1 sec) // 3_C
flatMapConcat은 두 플로우를 함께 처리하는데 flatMap과 같이 평탄화하여 작업을 수행한다. flatMapConcat은 생성된 flow를 차례로 처리한다.
private fun flowFrom(element: String) = flowOf(1, 2, 3) .onEach { delay(1000) } .map { "${it}_${element}" } suspend fun main() { flowOf("A", "B", "C") .flatMapMerge { flowFrom(it) } .collect { println(it) } } // (1 sec) // 1_A // 1_B // 1_C // (1 sec) // 2_A // 2_B // 2_C // (1 sec) // 3_A // 3_B // 3_C
flatMapMerge는 첫 번째 flow를 동시에 처리한다. flatMapMerge 함수에는 concurrency 파라미터를 통해서 동시에 처리할 element 수를 지정할 수 있다. 기본 값은 16이며, JVM에서 사용하는 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티에 따라서 달라질 수 있다.
private fun flowFrom(element: String) = flowOf(1, 2, 3) .onEach { delay(1000) } .map { "${it}_${element}" } suspend fun main() { flowOf("A", "B", "C") .flatMapLatest { flowFrom(it) } .collect { println(it) } } // (1 sec) // 1_C // (1 sec) // 2_C // (1 sec) // 3_C
flatMapLatest는 flow에서 새로운 element가 emit되면 이전의 element는 덮어씌워진다.
private fun flowFrom(element: String) = flowOf(1, 2, 3) .onEach { delay(1000) } .map { "${it}_${element}" } suspend fun main() { flowOf("A", "B", "C") .onEach { delay(1200) } .flatMapLatest { flowFrom(it) } .collect { println(it) } } // (2.2 sec) // 1_A // (1.2 sec) // 1_B // (1 sec) // 1_C // (1 sec) // 2_C // (1 sec) // 3_C
Terminal 연산
종단 연산에는 collect() 뿐만 아니라 다양한 연산을 지원한다.
- first(), firstOrNull()
- 처음으로 emit된 element를 찾는다.
- fold(), reduce()
- emit된 값들을 람다 표현식에 정의된 연산을 통해 하나의 결과 값으로 만들어낸다.
- count()
- emit된 element의 수를 반환한다.
종단 연산은 일시중단되며, flow가 완료되면 값을 리턴한다.
참고자료
https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837
반응형'Java > Kotlin' 카테고리의 다른 글
[Coroutine] 코루틴 학습 - 16 (Flow lifecycle functions) (0) 2022.05.25 [Coroutine] 코루틴 학습 - 15 (Flow Building) (0) 2022.05.21 [Coroutine] 코루틴 학습 - 14 (Hot and Cold data sources) (0) 2022.05.18 [Coroutine] 코루틴 학습 - 13 (Actors) (0) 2022.05.17 [Coroutine] 코루틴 학습 - 12 (Channel) (0) 2022.05.14 - first(), firstOrNull()