ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Coroutine] 코루틴 학습 - 16 (Flow lifecycle functions)
    Java/Kotlin 2022. 5. 25. 07:59
    반응형

     Flow는 한 쪽에서 다른 쪽으로 흐르는 파이프와 유사하다. Flow가 예외가 발생하거나 요청이 완료되었을 때, 이 정보는 전파되어 중단 단계에서 Flow를 close시킨다.

     Flow 연산의 결과로 값, 예외, 특정 이벤트들을 수신할 수 있다.

     

    onEach

    suspend fun main() {
        flowOf(1, 2, 3, 4)
            .onEach { println(it) }
            .collect()
    }
    // 1
    // 2
    // 3
    // 4

     flow의 각 값에 대한 연산을 수행할 경우엔 onEach를 사용할 수 있다. onEach의 람다 표현식은 일시중단 되는동안, 각 종단 요소들이 순차적으로 처리된다.

    suspend fun main() {
    
        flowOf(1, 2)
            .onEach { delay(1000) }
            .collect { println(it) }
    }
    // (1 sec)
    // 1
    // (1 sec)
    // 2

     따라서 위와 같이 onEach에서 delay가 추가되면 flow에서 각 요소들은 처리가 지연된다.

     

    onStart

     flow가 시작되자마자 바로 호출 될 수 있도록 리스너를 설정하는 함수이다.  onStart 내에서 emit을 호출할 수도 있다.

    suspend fun main() {
        flowOf(1, 2)
            .onEach { delay(1000) }
            .onStart {
                println("On Start!")
                emit(0)
            }
            .collect { println(it) }
    }
    // On start!
    // 0
    // (1 sec)
    // 1
    // (1 sec)
    // 2

     

    onCompletion

    suspend fun main() {
        flowOf(1, 2)
            .onEach { delay(1000) }
            .onCompletion {
                emit(-1)
                println("Completed")
            }
            .collect { println(it) }
    }
    // (1 sec)
    // 1
    // (1 sec)
    // 2
    // -1
    // Completed

     onCompletion은 완료되거나 예외, 취소가 발생하는 경우에 flow의 완료 처리를 위한 리스너로 사용될 수 있다.

    suspend fun main() = coroutineScope {
        val job = launch {
            flowOf(1, 2)
                .onEach { delay(1000) }
                .onCompletion { println("Completed") }
                .collect { println(it) }
        }
        delay(1100)
        job.cancel()
    }
    // (1 sec)
    // 1
    // (0.1 sec)
    // Completed

     

    onEmpty

     flow는 값을 내보내지 않고 완료될 수도 있다. 이런 케이스는 예상치 못한 상황의 케이스의 표시일 수 있는데 이를 위해 onEmpty를 지원한다. flow가 완료되었을 때 emit된 값이 없으면 호출되고, 람다 표현식 내부에서 디폴트 값을 생성하여 내보낼 수도 있다.

    suspend fun main() = coroutineScope {
        flow<List<Int>> { delay(1000) }
            .onEmpty { emit(emptyList()) }
            .collect { println(it) }
    }
    // (1 sec)
    // []

     

    Catch

     flow 처리 도중에 예외가 발생하는 경우, catch 함수를 설정하여 필요한 처리를 수행할 수 있다.

    private class MyError : Throwable("My error")
    
    val flow = flow {
        emit(1)
        emit(2)
        throw MyError()
    }
    
    suspend fun main(): Unit {
        flow.onCompletion { println("Completed!") }
            .onEach { println("Got $it") }
            .catch { 
                println("Caught $it")
                emit(-1)
            }
            .collect { println("Collected $it") }
    }
    // Got 1
    // Collected 1
    // Got 2
    // Collected 2
    // Caught MyError: My error
    // Collected -1
    // Completed!

     catch 리스너는 예외를 인수로 수신하고 복구 작업을 수행할 수 있도록 한다. catch 리스너 내부에서는 다시 새로운 값을 emit하여 값을 생성할 수도 있다.

     

     만약 에러가 catch되지 않았다면 flow는 즉시 cancel되고, collect()에서 예외를 던지게 된다. 해당 예외는 바깥쪽에서 try-catch 블록으로 잡아내는 것이 가능하다.

    private class MyError : Throwable("My error")
    
    private val flow = flow {
        emit("My message")
        throw MyError()
    }
    
    suspend fun main(): Unit {
        try {
            flow.collect { println("Collected $it")}
        } catch (e: MyError) {
            println("Caught")
        }
    }
    // Collected My message
    // Caught

     종단 연산에서의 예외는 catch 리스너가 동작하지 않으므로 주의해야한다. catch는 마지막 연산에 위치할 수 없기 때문이다.

     

    flowOn

     람다 표현식은 onEach, onStart, onCompletion 등 flow 연산의 인자로 사용되고, flow 빌더는 본질적으로 모두 일시중단된다. 일시중단 함수는 context가 필요로한데, structured concurrency를 위한 부모 컨텍스트와의 관계되어야 한다. 이러한 함수가 컨텍스트를 가져오는 위치는 collect()가 호출될 때이다.

    private fun usersFlow(): Flow<String> = flow {
        repeat(2) {
            val ctx = currentCoroutineContext()
            val name = ctx[CoroutineName]?.name
            emit("User$it in $name")
        }
    }
    
    suspend fun main() {
        val users = usersFlow()
        withContext(CoroutineName("Name1")) {
            users.collect { println(it) }
        }
        withContext(CoroutineName("Name2")) {
            users.collect { println(it) }
        }
    }
    // User0 in Name1
    // User1 in Name1
    // User0 in Name2
    // User1 in Name2

     종단 연산을 호출하면 업스트림으로부터 element를 요청하는데, 이때 coroutine context가 전파된다. 그리고 전파되는 컨텍스트는 flowOn()에서 수정이 가능하다.

     

    suspend fun present(place: String, message: String) {
        val ctx = coroutineContext
        val name = ctx[CoroutineName]?.name
        println("[$name] $message on $place")
    }
    
    fun messagesFlow(): Flow<String> = flow {
        present("flow builder", "Message")
        emit("Message")
    }
    
    suspend fun main() {
        val messages = messagesFlow()
        withContext(CoroutineName("Name1")) {
            messages.flowOn(CoroutineName("Name3"))
                .onEach { present("onEach", it) }
                .flowOn(CoroutineName("Name2"))
                .collect { present("collect", it) }
        }
    }
    // [Name3] Message on flow builder
    // [Name2] Message on onEach
    // [Name1] Message on collect

     flowOn()은 오직 업스트림 flow의 함수에 대해서만 동작한다.

     

    launchIn

     collect는 flow가 완료될 때까지 일시 중단하는 작업이다. 그리고 이를 다른 코루틴에서 flow 연산을 시작하기 위해서는 일반적으로 launch builder로 래핑한다. 이러한 케이스를 위해서 launchIn 함수를 이용할 수 있다.

     

     launchIn은 인자로 넘겨진 스코프에서 시작되는 새로운 코루틴에서 collect()를 처리한다. launchIn은 별도의 코루틴에서 flow를 처리할 때 자주 사용된다.

    public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
        collect()
    }
    suspend fun main(): Unit = coroutineScope {
        flowOf("User1", "User2")
            .onStart { println("Users:") }
            .onEach { println(it) }
            .launchIn(this)
    }
    // Users:
    // User1
    // User2

    참고자료

    https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

    반응형

    댓글

Designed by Tistory.