ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Coroutine] 코루틴 학습 - 15 (Flow Building)
    Java/Kotlin 2022. 5. 21. 19:36
    반응형

     Flow는 빌더를 통해 비교적 간단하게 생성할 수 있다.

     

    Flow raw values

    suspend fun main() {
        flowOf(1, 2, 3, 4, 5)
            .collect { println(it) }
            
         emptyFlow<Int>()
            .collect { println(it) }
    }
    // 1
    // 2
    // 3
    // 4
    // 5

     

     listOf와 같이 유사하게 flowOf()를 통해 간단하게 Flow를 생성할 수 있다. emptyFlow는 emptyList와 비슷하며, collect를 호출해도 아무일도 일어나지 않는다.

     

    Converters

     Iterable 인터페이스의 확장함수로, asFlow()를 호출하면 컬렉션을 손쉽게 Flow로 변환할 수 있다. (Sequence 또한 asFlow()를 지원한다.)

    suspend fun main() {
        listOf(1, 2, 3, 4, 5)
            .asFlow()
            .collect { println(it) }
    
        val function = suspend {
            delay(1000)
            "UserName"
        }
    }

     

    suspend fun main() {
        val function = suspend {
            delay(1000)
            "UserName"
        }
    
        function.asFlow()
            .collect { println(it) }
    
        ::getUserName
            .asFlow()
            .collect { println(it) }
    }
    
    private suspend fun getUserName(): String {
        delay(1000)
        return "UserName"
    }
    
    // (1 sec)
    // UserName
    // (1 sec)
    // UserName

     지연된 단일 값을 반환하는 일시중단 함수 또한 Flow로 변환이 가능하다. 위의 예시는 일시중단 람다 표현식으로 선언된 함수를 asFlow()를 통해 Flow로 변환하는 코드이다. asFlow 확장 함수는 함수 타입에 대해서도 지원한다. (suspend() -> T 또는 () -> T)

     

    Flow와 리액티브 스트림

     Flow는 Reactive Stream(Reactor, RxJava 2.x, RxJava 3.x)에 대해서도 kotlinx-coroutines-reactive 라이브러리를 이용하면 손쉽게 변환이 가능하다.

     kotlinx-coroutines-reactor 라이브러리를 활용하면 Flow를 Flux로 변환할 수도 있다. kotlinx-coroutines-rx3(또는 kotlinx-coroutines-rx2) 라이브러리를 이용하면 Flow를 Flowable 또는 Observable로 변환할 수도 있다.

    suspend fun main() = coroutineScope {
        Flux.range(1, 5).asFlow()
            .collect { println(it) }
    
        println()
        
        flowOf(1, 2, 3, 4, 5).asFlux()
            .doOnNext { println(it) }
            .subscribe()
    }
    
    // 1
    // 2
    // 3
    // 4
    // 5
    
    // 1
    // 2
    // 3
    // 4
    // 5

     

    Flow Builders

     Flow를 생성하는 방법 중 가장 흔한 방법은 flow 빌더를 사용하는 것이다. flow 빌더는 sequence 빌더와 produce 빌더와 비슷하게 람다 표현식 정의를 통해 Flow를 생성하는 함수이다. emit()를 통해서 다음 값을 보내거나, emitAll()를 통해서 모든 값을 Channel 또는 Flow로 보낼 수 있다. emitAll(flow)은 내부적으로 flow.collect { emit(it) } 로 정의되어 있다.

    private fun makeFlow(): Flow<Int> = flow {
        repeat(3) { num ->
            delay(1000)
            emit(num)
        }
    }
    
    suspend fun main() = coroutineScope {
        makeFlow()
            .collect { println(it) }
    }
    // (1 sec)
    // 0
    // (1 sec)
    // 1
    // (1 sec)
    // 2

     

    channelFlow

     element를 처리하는 중에 페이지를 미리 가져와야 하는 경우가 있다. 페이지를 미리 가져오면 네트워크 호출은 더 많이 이루어질 수 있지만 더 빠른 결과를 얻을 수 있다. 이를 위해서는 독립적인 프로듀싱과 컨슈밍이 필요한데, 이러한 독립성은 Channel과 같은 Hot data stream의 형태에서 자주 볼 수 있다. 따라서 위와 같은 기능을 위해서는 Channel과 Flow의 특징을 모두 가진 하이브리드 형태가 channelFlow 함수이다.

     

    public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
        ChannelFlowBuilder(block)
    
    public interface FusibleFlow<T> : Flow<T>
    
    public abstract class ChannelFlow<T>(
        // upstream context
        public val context: CoroutineContext,
        // buffer capacity between upstream and downstream context
        public val capacity: Int,
        // buffer overflow strategy
        public val onBufferOverflow: BufferOverflow
    ) : FusibleFlow<T>

     channelFlow 빌더는 Flow 인터페이스를 구현하고 있다. 그리고 collect와 같은 종단연산에 의해 시작된다. 동시에 Channel과 같이 일단 시작되면, 분리된 코루틴 속에서 리시버를 기다리지 않고 값을 프로듀싱한다.

     

     덕분에 아래의 예시처럼 다음 페이지를 가져오는 것과 사용자를 확인하는 작업이 동시에 수행될 수 있다.

    private data class User(val name: String)
    
    private interface UserApi {
        suspend fun takePage(pageNumber: Int): List<User>
    }
    
    private class FakeUserApi : UserApi {
        private val users = List(20) { User("User $it") }
        private val pageSize: Int = 3
    
        override suspend fun takePage(pageNumber: Int): List<User> {
            delay(1000)
            return users
                .drop(pageSize * pageNumber)
                .take(pageSize)
        }
    }
    
    private fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
        var page = 0
        do {
            println("Fetching page $page")
            val users = api.takePage(page++)
            users.forEach { send(it) }
        } while (!users.isNullOrEmpty())
    }
    
    suspend fun main() {
        val api = FakeUserApi()
        val users = allUsersFlow(api)
        val user = users
            .first {
                println("Checking $it")
                delay(1000)
                it.name == "User 3"
            }
        println(user)
    }

     channelFlow 내부에는 ProducerScope<T>에서 연산이 수행되는데, produce 빌더에서 사용되는 타입과 같다. ProducerScope는 CoroutineScope를 구현하고 있다. 따라서 channelFlow 빌더를 통해서 새로운 코루틴을 시작할 수 있다.

     

     element를 프로듀싱하기 위해서 emit 대신에 send()를 사용하고, SendChannel를 통해서 channel에 직접 접근하거나 제어할 수 있다.

    public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
        public val channel: SendChannel<E>
    }

     channelFlow는 독립적으로 값을 연산해야할 필요가 있을 때 주로 사용한다. 이를 위해 channelFlow는 코루틴 스코프를 생성한다. 따라서 launch와 같이 코루틴 빌더를 바로 실행할 수 있다.  다른 코루틴들과 마찬가지로 channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 기다린다.

     

    callbackFlow

     클라이언트의 이벤트를 리스닝할 때는 처리하는 프로세스와 독립적이어야 하므로 channelFlow가 적합하지만 더 나은 대안으로 callbackFlow가 있다. channelFlow는 ProducerScope<T>위에서 수행되지만, Callback을 래핑함으로써 channelFlow와 차이점을 보인다.

    • awaitClose { /* ... */ }
      • channel이 닫힐 때까지 일시중단 시키는 함수이다. 채널이 닫히고나면, 인자로 넘어온 본문을 호출한다.
    • trySendBlocking(value)
      • send와 유사하지만 일시중단 대신에 블로킹한다. 일시중단 함수가 아닌 함수에 사용할 수 있다.
    • close()
      • 채널을 종료시킨다.
    • cancel(throwable)
      • 채널을 종료하고, flow로 예외를 보낸다.

     


    참고자료

    https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

    반응형

    댓글

Designed by Tistory.