-
[Coroutine] 코루틴 학습 - 12 (Channel)Java/Kotlin 2022. 5. 14. 13:05반응형
Channel
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
- Channel은 여러 Sender와 Receiver를 지원하며, Sender가 전송하는 모든 값은 Receiver를 통해 수신한다.
- Channel은 인터페이스이고, SendChannel과 ReceiveChannel을 상속하고 있다.
public interface SendChannel<in E> { public suspend fun send(element: E) public fun close(cause: Throwable? = null): Boolean // ... }
- SendChannel은 element를 전송하거나 채널을 close하는데 사용된다.
public interface ReceiveChannel<out E> { public suspend fun receive(): E public fun cancel(cause: CancellationException? = null) // ... }
- ReceiveChannel은 SendChannel이 전송하는 element를 수신하는데 사용된다.
SendChannel 인터페이스와 ReceiveChannel 인터페이스를 살펴보면 send()와 receive()는 일시중단 함수임을 알 수 있다. 두 메서드가 일시중단 함수로 동작함으로써 아래와 같은 중요한 특징을 갖게 된다.
- channel에서 element를 수신하려고 receive를 시도했지만 수신할 element가 없을 경우에는 수신이 가능할 때까지 코루틴은 일시중단된다.
- send는 channel이 수용 가능한 공간이 모두 찼을 때, 일시중단된다.
- 만약 일시중단 없이 send나 receive를 해야한다면 trySend()와 tryReceive()를 사용하면 된다. 두 메서드는 호출에 성공했으면 ChannelResult를 반환한다.
- Channel에는 여러 Sender와 Receiver를 가질 수 있고, 대부분의 경우에는 Sender, Receiver 양쪽에 하나씩 코루틴이 존재한다.
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(3) { index -> delay(1000) println("Send index") channel.send(index) } } launch { repeat(3) { val received = channel.receive() println(received) } } } // (1 sec) // Send index // 0 // (1 sec) // Send index // 1 // (1 sec) // Send index // 2
위 예시는 Channel을 이용하여 간단하게 구현했지만 문제점이 있다. 리시버는 얼마나 많은 element를 전송받게 될지 알아야 한다. 리시버는 얼마나 많은 element가 전송될 지 알 수 없으므로 송신자가 전송을 계속하는 한 리스닝을 하고 있어야 한다.
Channel이 닫힐 때까지 element를 수신하려면 for-loop나 consumeEach() 함수를 이용하면 된다.
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(5) { idx -> println("Send idx") delay(1000) channel.send(idx) } channel.close() } launch { for (element in channel) { println(element) } /* or channel.consumeEach { element -> println(element) } */ } }
위와 같이 element를 전송하는 방법은 채널을 닫아주는 것을 깜빡하는 실수를 하기 쉽다. 만약 코루틴이 예외 발생으로 인해 프로듀싱을 멈추게 되면 반대편에서 전송받는 코루틴은 element가 수신될 때까지 영원히 기다리게 된다. 이를 편리하게 관리할 수 있도록 코루틴 빌더가 지원되는데, ReceiveChannel을 반환하는 produce() 함수를 이용하면 된다.
public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
suspend fun main(): Unit = coroutineScope { val channel = produce { repeat(5) { idx -> println("Send idx") delay(1000) send(idx) } } for (i in channel) { println(i) } }
produce 함수는 코루틴이 어떤 이유로 종료되었을 때(완료, 중단, 취소 등..) 채널을 close한다.
Channel types
설정하는 수용량에 따라 채널은 4가지 타입으로 나뉜다.
- Unlimited
- Channel.UNLIMITED로 채널을 설정하는 경우로, 버퍼 수용량에 제한이 없다. 따라서 send()는 일시중단되지 않는다. (send()는 버퍼가 가득 차있을 경우에 버퍼에 공간이 생길 때까지 일시중단 되기 떄문이다.)
- Buffered
- 구체적인 수용량을 명시하거나 Channel.BUFFERED를 설정한 채널이다.
- Channel.BUFFERED의 기본 값은 64이고, JVM에 kotlinx.coroutines.channels.defaultBuffer 시스템 프로퍼티로 오버라이드 할 수도 있다.
- Rendezvous (default)
- 수용량을 0 또는 Channel.RENDEZVOUS(0과 같다)로 설정한 채널이다.
- Rendezvous 타입의 의미는 Sender와 Receiver가 만나는 경우에만 element 교환이 일어난다는 의미이다. 즉, 한 쪽이 다른 한 쪽을 기다리는 동안 적어도 한 순간은 일시중단 된다.
- Conflated
- Channel.Conflated로 설정된 채널로, 버퍼 사이즈가 1이다. 즉, 새로운 element는 이전의 element를 대체하게 된다.
Buffer overflow
채널은 버퍼가 가득 찼을 경우를 제어하기 위해 채널 생성자에 onBufferOverflow 옵션을 지정할 수 있다. 옵션의 종류로는 아래와 같이 있다.
- SUSPEND (기본값)
- 버퍼가 가득 찼을 경우에 send()가 일시중단된다.
- DROP_OLDEST
- 버퍼가 가득 찼을 경우에 가장 오래된 element를 드롭시킨다.
- DROP_LATEST
- 버퍼가 가득 찼을 경우에 가장 최근의 element를 드롭시킨다.
채널의 타입에서 살펴보았던 Conflate 타입은 수용 크기를 1로 설정하고, onBufferOverflow를 DROP_OLDEST로 설정하는 것과 같다.
현재 produce 코루틴 빌더는 onBufferOverflow를 지원하지 않고 있으므로, 설정을 커스텀하여 지정해야 한다면, Channel() 함수를 사용해야 한다.
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>( capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST ) launch { repeat(5) { idx -> channel.send(idx) delay(100) println("Sent") } } delay(1000) for (i in channel) { println(i) delay(1000) } }
Channel() 함수에는 onUndeliveredElement 파라미터를 설정할 수도 있는데, element가 어떠한 이유로 처리될 수 없을 때 호출된다.
주로 채널이 닫혔거나, 취소된 상태 또는 send, receive, receiveOrNull, hasNext가 예외를 던질 때 사용된다.
일반적으로 채널을 통해 전송된 리소스를 close 해야할 때 많이 사용한다.
Fan-out
다수의 코루틴은 단일 채널로부터 수신받을 수 있는데, 코루틴들이 적절하게 수신받으려면 for-loop를 사용해야 한다.
private fun CoroutineScope.produceNumbers() = produce { repeat(10) { delay(100) send(it) } } private fun CoroutineScope.launchProcessor( id: Int, channel: ReceiveChannel<Int>, ) = launch { for (msg in channel) { println("#$id received $msg") } } suspend fun main(): Unit = coroutineScope { val channel = produceNumbers() repeat(3) { id -> delay(10) launchProcessor(id, channel) } } // #0 received 0 // #1 received 1 // #2 received 2 // #0 received 3 // #1 received 4 // ...
출력 결과를 살펴보면 element 들이 공평하게 분배되는 것을 알 수 있다. 채널에는 element를 기다리는 코루틴의 FIFO 큐를 가지고 있다. 이러한 이유로 위 예제에서 element가 순차적으로 수신되는 것이다.
Fan-in
다수의 코루틴은 단일 채널로 element를 전송할 수도 있다.
suspend fun sendString( channel: SendChannel<String>, text: String, time: Long, ) { while (true) { delay(time) channel.send(text) } } fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(50) { println(channel.receive()) } coroutineContext.cancelChildren() }
때로는 여러 채널을 하나로 병합해야 하는 경우도 있는데, 이는 produce()를 이용해서 아래 예시처럼 병합할 수도 있다.
fun <T> CoroutineScope.fanIn( channels: List<ReceiveChannel<T>> ): ReceiveChannel<T> = produce { for (channel in channels) { launch { for (element in channel) { send(element) } } } }
Pipelines
파이프라인은 한 채널이 다른 채널로부터 받은 element를 기반으로 element를 생성하는 방식이다.
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce { repeat(3) { num -> send(num + 1) } } fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce { for (number in numbers) { send(number * number) } } suspend fun main() = coroutineScope { val numbers = numbers() val squared = square(numbers) for (num in squared) { println(num) } } // 1 // 4 // 9
참고자료
https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837
반응형'Java > Kotlin' 카테고리의 다른 글
[Coroutine] 코루틴 학습 - 14 (Hot and Cold data sources) (0) 2022.05.18 [Coroutine] 코루틴 학습 - 13 (Actors) (0) 2022.05.17 [Coroutine] 코루틴 학습 - 11 (runTest) (0) 2022.05.14 [Coroutine] 코루틴 학습 - 10 (Dispatchers) (0) 2022.05.10 [Coroutine] 코루틴 학습 - 9 (Coroutine scope function) (0) 2022.05.07