ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Coroutine] 코루틴 학습 - 12 (Channel)
    Java/Kotlin 2022. 5. 14. 13:05
    반응형

    Channel

    interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
    • Channel은 여러 Sender와 Receiver를 지원하며, Sender가 전송하는 모든 값은 Receiver를 통해 수신한다.
    • Channel은 인터페이스이고, SendChannel과 ReceiveChannel을 상속하고 있다.
    public interface SendChannel<in E> {
        public suspend fun send(element: E)
        public fun close(cause: Throwable? = null): Boolean
        // ...
    }
    • SendChannel은 element를 전송하거나 채널을 close하는데 사용된다.
    public interface ReceiveChannel<out E> {
        public suspend fun receive(): E
        public fun cancel(cause: CancellationException? = null)
        // ...
    }
    • ReceiveChannel은 SendChannel이 전송하는 element를 수신하는데 사용된다.

     

     SendChannel 인터페이스와 ReceiveChannel 인터페이스를 살펴보면 send()와 receive()는 일시중단 함수임을 알 수 있다. 두 메서드가 일시중단 함수로 동작함으로써 아래와 같은 중요한 특징을 갖게 된다.

    • channel에서 element를 수신하려고 receive를 시도했지만 수신할 element가 없을 경우에는 수신이 가능할 때까지 코루틴은 일시중단된다.
    • send는 channel이 수용 가능한 공간이 모두 찼을 때, 일시중단된다.
    • 만약 일시중단 없이 send나 receive를 해야한다면 trySend()와 tryReceive()를 사용하면 된다. 두 메서드는 호출에 성공했으면 ChannelResult를 반환한다.
    • Channel에는 여러 Sender와 Receiver를 가질 수 있고, 대부분의 경우에는 Sender, Receiver 양쪽에 하나씩 코루틴이 존재한다.
    suspend fun main(): Unit = coroutineScope {
        val channel = Channel<Int>()
        launch {
            repeat(3) { index ->
                delay(1000)
                println("Send index")
                channel.send(index)
            }
        }
    
        launch {
            repeat(3) {
                val received = channel.receive()
                println(received)
            }
        }
    }
    // (1 sec)
    // Send index
    // 0
    // (1 sec)
    // Send index
    // 1
    // (1 sec)
    // Send index
    // 2

     위 예시는 Channel을 이용하여 간단하게 구현했지만 문제점이 있다. 리시버는 얼마나 많은 element를 전송받게 될지 알아야 한다. 리시버는 얼마나 많은 element가 전송될 지 알 수 없으므로 송신자가 전송을 계속하는 한 리스닝을 하고 있어야 한다.

     

     Channel이 닫힐 때까지 element를 수신하려면 for-loop나 consumeEach() 함수를 이용하면 된다.

    suspend fun main(): Unit = coroutineScope {
        val channel = Channel<Int>()
        launch {
            repeat(5) { idx ->
                println("Send idx")
                delay(1000)
                channel.send(idx)
            }
            channel.close()
        }
    
        launch {
            for (element in channel) {
                println(element)
            }
            /*
            or
            channel.consumeEach { element ->
                println(element)
            }
            */
        }
    }

     위와 같이 element를 전송하는 방법은 채널을 닫아주는 것을 깜빡하는 실수를 하기 쉽다. 만약 코루틴이 예외 발생으로 인해 프로듀싱을 멈추게 되면 반대편에서 전송받는 코루틴은 element가 수신될 때까지 영원히 기다리게 된다. 이를 편리하게 관리할 수 있도록 코루틴 빌더가 지원되는데, ReceiveChannel을 반환하는 produce() 함수를 이용하면 된다.

    public fun <E> CoroutineScope.produce(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0,
        block: suspend ProducerScope<E>.() -> Unit
    ): ReceiveChannel<E>
    suspend fun main(): Unit = coroutineScope {
        val channel = produce {
            repeat(5) { idx ->
                println("Send idx")
                delay(1000)
                send(idx)
            }
        }
    
        for (i in channel) {
            println(i)
        }
    }

     produce 함수는 코루틴이 어떤 이유로 종료되었을 때(완료, 중단, 취소 등..) 채널을 close한다.

     

    Channel types

     설정하는 수용량에 따라 채널은 4가지 타입으로 나뉜다.

    • Unlimited
      • Channel.UNLIMITED로 채널을 설정하는 경우로, 버퍼 수용량에 제한이 없다. 따라서 send()는 일시중단되지 않는다. (send()는 버퍼가 가득 차있을 경우에 버퍼에 공간이 생길 때까지 일시중단 되기 떄문이다.)
    • Buffered
      • 구체적인 수용량을 명시하거나 Channel.BUFFERED를 설정한 채널이다.
      • Channel.BUFFERED의 기본 값은 64이고, JVM에 kotlinx.coroutines.channels.defaultBuffer 시스템 프로퍼티로 오버라이드 할 수도 있다.
    • Rendezvous (default)
      • 수용량을 0 또는 Channel.RENDEZVOUS(0과 같다)로 설정한 채널이다.
      • Rendezvous 타입의 의미는 Sender와 Receiver가 만나는 경우에만 element 교환이 일어난다는 의미이다. 즉, 한 쪽이 다른 한 쪽을 기다리는 동안 적어도 한 순간은 일시중단 된다.
    • Conflated
      • Channel.Conflated로 설정된 채널로, 버퍼 사이즈가 1이다. 즉, 새로운 element는 이전의 element를 대체하게 된다.

     

    Buffer overflow

     채널은 버퍼가 가득 찼을 경우를 제어하기 위해 채널 생성자에 onBufferOverflow 옵션을 지정할 수 있다. 옵션의 종류로는 아래와 같이 있다.

    • SUSPEND (기본값)
      • 버퍼가 가득 찼을 경우에 send()가 일시중단된다.
    • DROP_OLDEST
      • 버퍼가 가득 찼을 경우에 가장 오래된 element를 드롭시킨다.
    • DROP_LATEST
      • 버퍼가 가득 찼을 경우에 가장 최근의 element를 드롭시킨다.

     채널의 타입에서 살펴보았던 Conflate 타입은 수용 크기를 1로 설정하고, onBufferOverflow를 DROP_OLDEST로 설정하는 것과 같다.

     

     현재 produce 코루틴 빌더는 onBufferOverflow를 지원하지 않고 있으므로, 설정을 커스텀하여 지정해야 한다면, Channel() 함수를 사용해야 한다.

    suspend fun main(): Unit = coroutineScope {
        val channel = Channel<Int>(
            capacity = 2,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        )
    
        launch {
            repeat(5) { idx ->
                channel.send(idx)
                delay(100)
                println("Sent")
            }
        }
    
        delay(1000)
        for (i in channel) {
            println(i)
            delay(1000)
        }
    }

     

     Channel() 함수에는 onUndeliveredElement 파라미터를 설정할 수도 있는데, element가 어떠한 이유로 처리될 수 없을 때 호출된다.

     주로 채널이 닫혔거나, 취소된 상태 또는 send, receive, receiveOrNull, hasNext가 예외를 던질 때 사용된다.

     일반적으로 채널을 통해 전송된 리소스를 close  해야할 때 많이 사용한다.

     

    Fan-out

     다수의 코루틴은 단일 채널로부터 수신받을 수 있는데, 코루틴들이 적절하게 수신받으려면 for-loop를 사용해야 한다.

    private fun CoroutineScope.produceNumbers() = produce {
        repeat(10) {
            delay(100)
            send(it)
        }
    }
    
    private fun CoroutineScope.launchProcessor(
        id: Int,
        channel: ReceiveChannel<Int>,
    ) = launch {
        for (msg in channel) {
            println("#$id received $msg")
        }
    }
    
    suspend fun main(): Unit = coroutineScope {
        val channel = produceNumbers()
        repeat(3) { id ->
            delay(10)
            launchProcessor(id, channel)
        }
    }
    // #0 received 0
    // #1 received 1
    // #2 received 2
    // #0 received 3
    // #1 received 4
    // ...

     출력 결과를 살펴보면 element 들이 공평하게 분배되는 것을 알 수 있다. 채널에는 element를 기다리는 코루틴의 FIFO 큐를 가지고 있다. 이러한 이유로 위 예제에서 element가 순차적으로 수신되는 것이다.

     

    Fan-in

     다수의 코루틴은 단일 채널로 element를 전송할 수도 있다.

    suspend fun sendString(
        channel: SendChannel<String>,
        text: String,
        time: Long,
    ) {
        while (true) {
            delay(time)
            channel.send(text)
        }
    }
    
    fun main() = runBlocking {
        val channel = Channel<String>()
        launch { sendString(channel, "foo", 200L) }
        launch { sendString(channel, "BAR!", 500L) }
        repeat(50) {
            println(channel.receive())
        }
        coroutineContext.cancelChildren()
    }

     

     때로는 여러 채널을 하나로 병합해야 하는 경우도 있는데, 이는 produce()를 이용해서 아래 예시처럼 병합할 수도 있다.

    fun <T> CoroutineScope.fanIn(
        channels: List<ReceiveChannel<T>>
    ): ReceiveChannel<T> = produce {
        for (channel in channels) {
            launch {
                for (element in channel) {
                    send(element)
                }
            }
        }
    }

     

    Pipelines

     파이프라인은 한 채널이 다른 채널로부터 받은 element를 기반으로 element를 생성하는 방식이다.

    fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }
    
    fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce {
        for (number in numbers) {
            send(number * number)
        }
    }
    
    suspend fun main() = coroutineScope {
        val numbers = numbers()
        val squared = square(numbers)
        for (num in squared) {
            println(num)
        }
    }
    // 1
    // 4
    // 9

     


    참고자료

    https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837

    반응형

    댓글

Designed by Tistory.