ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Coroutine] 코루틴 학습 - 10 (Dispatchers)
    Java/Kotlin 2022. 5. 10. 09:22
    반응형

    Dispatchers

     코틀린 코루틴 라이브러리가 제공하는 중요한 기능은 코루틴이 실행되어야 하는 쓰레드(또는 쓰레드 풀)를 결정할 수 있게 해주는 것이다. 이러한 기능은 디스패처를 사용하여 수행된다. 특정 코루틴에서 실행될 쓰레드를 결정하는 것은 CoroutineContext이다.

     

    Default dispatcher

     디스패처를 설정하지 않는다면 디폴드로 Dispatchers.Default가 선택된다. 디폴트 디스패처는 CPU 집약적인 작업을 하도록 설계되었다. 쓰레드 풀의 크기는 코드가 실행되는 머신의 코어 수(2개 이상)와 같다. 적어도 이론적으로는 쓰레드를 효율적으로 사용하고 있다고 가정할 때의 쓰레드 수다.

    suspend fun main() = coroutineScope {
        repeat(1000) {
            launch {
                List(1000) { Random.nextLong() }.maxOrNull()
    
                val threadName = Thread.currentThread().name
                println("Running on $threadName")
            }
        }
    }
    // Running on DefaultDispatcher-worker-2
    // Running on DefaultDispatcher-worker-15
    // Running on DefaultDispatcher-worker-8
    // Running on DefaultDispatcher-worker-12
    // ...
    • runBlocking의 경우, 다른 디스패처가 설정되지 않는 경우, 자체 디스패처를 설정하므로 내부에서 디폴트 디스패처가 자동으로 설정되지 않는다.
    • 따라서 coroutineScope 대신 runBlocking을 사용하는 경우에는 모든 코루틴이 main 쓰레드에서 실행된다.
    fun main() = runBlocking {
        repeat(1000) {
            launch {
                List(1000) { Random.nextLong() }.maxOrNull()
    
                val threadName = Thread.currentThread().name
                println("Running on $threadName")
            }
        }
    }
    // Running on main
    // Running on main
    // Running on main
    // ...

     

    Main dispatcher

     Dispatchers.Main은 메인 쓰레드에 코루틴이 실행되도록 하는 디스패처다. 단위 테스트의 경우 kotlinx-coroutines-test의 Dispatchers.setMain(dispatcher)을 사용하여 이 디스패처 대신 다른 디스패처를 설정할 수도 있다.

    class Test {
        private val dispatcher = Executors
            .newSingleThreadExecutor()
            .asCoroutineDispatcher()
    
        @ExperimentalCoroutinesApi
        @BeforeEach
        fun setup() {
            Dispatchers.setMain(dispatcher)
        }
    
        @ExperimentalCoroutinesApi
        @AfterEach
        fun tearDown() {
            Dispatchers.resetMain() // 메인 디스패처를 원래의 메인 디스패처로 리셋한다.
            dispatcher.close()
        }
    
        @Test
        fun testSomething(): Unit = runBlocking {
            launch(Dispatchers.Main) {
                val name = Thread.currentThread().name
                println(name)
            }
        }
    }
    // pool-3-thread-1 @coroutine#3
    • Dispatchers.Main을 사용할 때는 블로킹이 아닌 일시중단 라이브러리를 사용하고, 복잡한 계산을 하지 않는 작업에서 사용하는 것이 좋다.
    • CPU 집약적인 연산을 하는 작업을 한다면 Dispatchers.Default를 사용해야 한다.
    • 만약 I/O 연산과 같은 블로킹 연산이 필요로 할 때 Dispatchers.Main을 사용한다면 메인 쓰레드가 블로킹되어 쓰레드 풀의 모든 쓰레드가 블로킹될 위험이 있다. 이럴 때는 Dispatchers.IO를 사용한다.

     

    IO Dispatcher

    • IO Dispatchersms I/O 연산과 같이 쓰레드를 오랫동안 블로킹할 때 사용될 수 있도록 설계되었다.
    • 이 쓰레드 풀의 추가적인 쓰레드는 필요에 따라 생성되고, 제거된다.
    • 디스패처의 작업에서 사용되는 쓰레드의 수는 "kotlinx.coroutines.io.parallelism" 시스템 프로퍼티의 값에 따라 제한된다.
    • 디폴트로는 64개의 쓰레드로 제한되어 있으며, 코어의 수가 그것보다 크면 코어의 수만큼 제한된다.
    suspend fun main() = coroutineScope {
        repeat(1000) {
            launch(Dispatchers.IO) {
                Thread.sleep(200)
    
                val threadName = Thread.currentThread().name
                println("Running on thread: $threadName")
            }
        }
    }
    // Running on thread: DefaultDispatcher-worker-77
    // Running on thread: DefaultDispatcher-worker-76
    // Running on thread: DefaultDispatcher-worker-73
    // Running on thread: DefaultDispatcher-worker-3
    // ...
    • IO Dispatcher는 Dispatchers.Deault와 쓰레드를 공유하기 때문에 withContext(Dispatchers.IO)를 사용해도 실제로 쓰레드 스위칭이 일어나진 않는다.
    suspend fun main(): Unit = coroutineScope {
        launch(Dispatchers.Default) {
            println("Running on thread: ${Thread.currentThread().name}")
            withContext(Dispatchers.IO) {
                Thread.sleep(200)
                println("Running on thread: ${Thread.currentThread().name}")
            }
        }
    }
    // Running on thread: DefaultDispatcher-worker-1
    // Running on thread: DefaultDispatcher-worker-1
    • 쓰레드 공유의 결과로, IO Dispatcher를 통해 작업하는 동안 64개의 쓰레드보다 더 많은 쓰레드가 생성될 수 있다.
    • Dispatcher.IO는 제한된 허용하는 쓰레드보다 많이 사용하지는 않지만 Dispatcher.Default는 사용 가능한 쓰레드가 충분하도록 보장된다.

     

    Dispatcher with a pool threads

     쓰레드를 블로킹하는 상황은 종종 있는데, 예를 들어 블로킹 연산을 사용하는 라이브러리를 구현하는 경우이다. 하지만 모든 라이브러리가 IO 디스패처를 사용하는경우 64개의 모든 쓰레드가 모두 차단될 수 있다. 이련 경우에 블로킹이 발생하는 연산에만 집중적으로 사용할 수 있도록 독립적인 쓰레드 풀에 대한 디스패처를 선언하여 사용하는 방법이 있다.

    suspend fun main() = coroutineScope {
        val dispatcher = Executors
            .newFixedThreadPool(5)
            .asCoroutineDispatcher()
        repeat(1000) {
            launch(dispatcher) {
                Thread.sleep(200)
                println("Running on thread: ${Thread.currentThread().name}")
            }
        }
        dispatcher.close()	// asCoroutineDispatcher로 생성된 디스패처는 close하지 않으면 프로그램은 종료되지 않는다.
    }
    // Running on thread: pool-1-thread-4
    // Running on thread: pool-1-thread-2
    // Running on thread: pool-1-thread-5
    // Running on thread: pool-1-thread-3
    // Running on thread: pool-1-thread-1
    // ...

     마지막에 디스패처를 close() 하는 것을 볼 수 있는데, 이를 깜빡하는 경우에는 쓰레드가 누수될 수 있다. 또 다른 문제는 고정된 쓰레드 풀을 생성했을 때, 이를 효과적으로 사용하지 않는다는 점이다. 사용하지 않은 쓰레드는 다른 서비스와 공유하지 않고 활성 상태로 유지한다. 이 방법은 대량의 쓰레드가 블로킹되는 상황에서는 유용하지만 제한된 병렬성을 가진 디스패처가 더 유용하다.

     

    Limited parallelism

     코틀린 코루틴 1.6 버전 이후로 디스패처를 병렬 처리가 제한된 디스패처로 변환하는 옵션이 있다. 이 옵션은 우리가 동일 시간에 사용할 수 있는 쓰레드의 수를 제한한다. 이는 특정한 작업이 Dispatcher.IO를 사용하여 너무 많은 쓰레드를 블로킹하지 않도록 제한하고, 우리는 제한했던 쓰레드 수 이상으로 쓰레드가 동시에 사용되지 않는다는 것을 확신할 수 있다.

     

    싱글 쓰레드로 제한하는 디스패처

     여러 쓰레드를 사용하는 디스패처를 사용하면 상태를 공유하여 발생하는 문제에 대해서도 고려를 해야한다. 아래의 예시를 보면 10,000개의 코루틴이 공유 변수 i를 증가시키는데, 쓰레드 간 경합으로 인해 최종적으로 i는 10,000이 아니게 된다.

    var i = 0
    
    suspend fun main(): Unit = coroutineScope {
        repeat(10_000) {
            launch(Dispatchers.IO) { i++ }
        }
        delay(1000)
        println(i)
    }
    // 9996

     이러한 문제를 해결하는 간단한 방법 중 하나는 쓰레드 간 경합이 일어나지 않도록 싱글 쓰레드를 사용하는 것이다.

    var i = 0
    
    suspend fun main(): Unit = coroutineScope {
        val dispatcher = Executors.newSingleThreadExecutor()
            .asCoroutineDispatcher()
        repeat(10_000) {
            launch(dispatcher) { i++ }
        }
        delay(1000)
        println(i)
        dispatcher.close()
    }
    // 10000

     하지만 위와 같은 방식은 아까 살펴보았던 예제처럼 디스패처를 개발자가 일일이 close() 해야한다는 문제가 있다. 조금더 모던한 방법은 병렬 처리 수가 1개로 제한된 Dispatchers.Default나 Dispatchers.IO를 사용하는 것이다.

    var i = 0
    
    @ExperimentalCoroutinesApi
    suspend fun main(): Unit = coroutineScope {
        val dispatcher = Dispatchers.IO
            .limitedParallelism(1)
    
        repeat(10_000) {
            launch(dispatcher) { i++ }
        }
        delay(1000)
        println(i)
    }
    // 10000

     싱글 쓰레드를 이용할 때, 해당 쓰레드를 블로킹 한다면 작업이 순차적으로 처리되어 처리 시간이 길어질 수 있다는 단점도 유의해야 한다.

     

    Unconfined dispatcher

     Unconfined dispatcher는 이전에 살펴보았던 디스패처와 다르게 작업을 시작하면 일시중단 지점을 만날 때까지는 호출된 쓰레드에서 작업이 수행되다가, 일시중단 지점 이후에 코루틴이 재개될 때는 일시중단 함수를 재개한 쓰레드에서 작업이 수행된다.

     성능의 관점에서 Unconfined 디스패처는 쓰레드 스위칭을 요구하지 않음으로써 더 저렴한 비용이 든다. 만약 코드가 어떤 쓰레드에서 수행되어도 상관이 없다면 Unconfined 디스패처를 사용하는 것이 좋다.

     


    참고자료

    https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

    반응형

    댓글

Designed by Tistory.