[Coroutine] 코루틴 학습 - 12 (Channel)
Channel
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
- Channel은 여러 Sender와 Receiver를 지원하며, Sender가 전송하는 모든 값은 Receiver를 통해 수신한다.
- Channel은 인터페이스이고, SendChannel과 ReceiveChannel을 상속하고 있다.
public interface SendChannel<in E> {
public suspend fun send(element: E)
public fun close(cause: Throwable? = null): Boolean
// ...
}
- SendChannel은 element를 전송하거나 채널을 close하는데 사용된다.
public interface ReceiveChannel<out E> {
public suspend fun receive(): E
public fun cancel(cause: CancellationException? = null)
// ...
}
- ReceiveChannel은 SendChannel이 전송하는 element를 수신하는데 사용된다.
SendChannel 인터페이스와 ReceiveChannel 인터페이스를 살펴보면 send()와 receive()는 일시중단 함수임을 알 수 있다. 두 메서드가 일시중단 함수로 동작함으로써 아래와 같은 중요한 특징을 갖게 된다.
- channel에서 element를 수신하려고 receive를 시도했지만 수신할 element가 없을 경우에는 수신이 가능할 때까지 코루틴은 일시중단된다.
- send는 channel이 수용 가능한 공간이 모두 찼을 때, 일시중단된다.
- 만약 일시중단 없이 send나 receive를 해야한다면 trySend()와 tryReceive()를 사용하면 된다. 두 메서드는 호출에 성공했으면 ChannelResult를 반환한다.
- Channel에는 여러 Sender와 Receiver를 가질 수 있고, 대부분의 경우에는 Sender, Receiver 양쪽에 하나씩 코루틴이 존재한다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(3) { index ->
delay(1000)
println("Send index")
channel.send(index)
}
}
launch {
repeat(3) {
val received = channel.receive()
println(received)
}
}
}
// (1 sec)
// Send index
// 0
// (1 sec)
// Send index
// 1
// (1 sec)
// Send index
// 2
위 예시는 Channel을 이용하여 간단하게 구현했지만 문제점이 있다. 리시버는 얼마나 많은 element를 전송받게 될지 알아야 한다. 리시버는 얼마나 많은 element가 전송될 지 알 수 없으므로 송신자가 전송을 계속하는 한 리스닝을 하고 있어야 한다.
Channel이 닫힐 때까지 element를 수신하려면 for-loop나 consumeEach() 함수를 이용하면 된다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { idx ->
println("Send idx")
delay(1000)
channel.send(idx)
}
channel.close()
}
launch {
for (element in channel) {
println(element)
}
/*
or
channel.consumeEach { element ->
println(element)
}
*/
}
}
위와 같이 element를 전송하는 방법은 채널을 닫아주는 것을 깜빡하는 실수를 하기 쉽다. 만약 코루틴이 예외 발생으로 인해 프로듀싱을 멈추게 되면 반대편에서 전송받는 코루틴은 element가 수신될 때까지 영원히 기다리게 된다. 이를 편리하게 관리할 수 있도록 코루틴 빌더가 지원되는데, ReceiveChannel을 반환하는 produce() 함수를 이용하면 된다.
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
suspend fun main(): Unit = coroutineScope {
val channel = produce {
repeat(5) { idx ->
println("Send idx")
delay(1000)
send(idx)
}
}
for (i in channel) {
println(i)
}
}
produce 함수는 코루틴이 어떤 이유로 종료되었을 때(완료, 중단, 취소 등..) 채널을 close한다.
Channel types
설정하는 수용량에 따라 채널은 4가지 타입으로 나뉜다.
- Unlimited
- Channel.UNLIMITED로 채널을 설정하는 경우로, 버퍼 수용량에 제한이 없다. 따라서 send()는 일시중단되지 않는다. (send()는 버퍼가 가득 차있을 경우에 버퍼에 공간이 생길 때까지 일시중단 되기 떄문이다.)
- Buffered
- 구체적인 수용량을 명시하거나 Channel.BUFFERED를 설정한 채널이다.
- Channel.BUFFERED의 기본 값은 64이고, JVM에 kotlinx.coroutines.channels.defaultBuffer 시스템 프로퍼티로 오버라이드 할 수도 있다.
- Rendezvous (default)
- 수용량을 0 또는 Channel.RENDEZVOUS(0과 같다)로 설정한 채널이다.
- Rendezvous 타입의 의미는 Sender와 Receiver가 만나는 경우에만 element 교환이 일어난다는 의미이다. 즉, 한 쪽이 다른 한 쪽을 기다리는 동안 적어도 한 순간은 일시중단 된다.
- Conflated
- Channel.Conflated로 설정된 채널로, 버퍼 사이즈가 1이다. 즉, 새로운 element는 이전의 element를 대체하게 된다.
Buffer overflow
채널은 버퍼가 가득 찼을 경우를 제어하기 위해 채널 생성자에 onBufferOverflow 옵션을 지정할 수 있다. 옵션의 종류로는 아래와 같이 있다.
- SUSPEND (기본값)
- 버퍼가 가득 찼을 경우에 send()가 일시중단된다.
- DROP_OLDEST
- 버퍼가 가득 찼을 경우에 가장 오래된 element를 드롭시킨다.
- DROP_LATEST
- 버퍼가 가득 찼을 경우에 가장 최근의 element를 드롭시킨다.
채널의 타입에서 살펴보았던 Conflate 타입은 수용 크기를 1로 설정하고, onBufferOverflow를 DROP_OLDEST로 설정하는 것과 같다.
현재 produce 코루틴 빌더는 onBufferOverflow를 지원하지 않고 있으므로, 설정을 커스텀하여 지정해야 한다면, Channel() 함수를 사용해야 한다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
repeat(5) { idx ->
channel.send(idx)
delay(100)
println("Sent")
}
}
delay(1000)
for (i in channel) {
println(i)
delay(1000)
}
}
Channel() 함수에는 onUndeliveredElement 파라미터를 설정할 수도 있는데, element가 어떠한 이유로 처리될 수 없을 때 호출된다.
주로 채널이 닫혔거나, 취소된 상태 또는 send, receive, receiveOrNull, hasNext가 예외를 던질 때 사용된다.
일반적으로 채널을 통해 전송된 리소스를 close 해야할 때 많이 사용한다.
Fan-out
다수의 코루틴은 단일 채널로부터 수신받을 수 있는데, 코루틴들이 적절하게 수신받으려면 for-loop를 사용해야 한다.
private fun CoroutineScope.produceNumbers() = produce {
repeat(10) {
delay(100)
send(it)
}
}
private fun CoroutineScope.launchProcessor(
id: Int,
channel: ReceiveChannel<Int>,
) = launch {
for (msg in channel) {
println("#$id received $msg")
}
}
suspend fun main(): Unit = coroutineScope {
val channel = produceNumbers()
repeat(3) { id ->
delay(10)
launchProcessor(id, channel)
}
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// ...
출력 결과를 살펴보면 element 들이 공평하게 분배되는 것을 알 수 있다. 채널에는 element를 기다리는 코루틴의 FIFO 큐를 가지고 있다. 이러한 이유로 위 예제에서 element가 순차적으로 수신되는 것이다.
Fan-in
다수의 코루틴은 단일 채널로 element를 전송할 수도 있다.
suspend fun sendString(
channel: SendChannel<String>,
text: String,
time: Long,
) {
while (true) {
delay(time)
channel.send(text)
}
}
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(50) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
때로는 여러 채널을 하나로 병합해야 하는 경우도 있는데, 이는 produce()를 이용해서 아래 예시처럼 병합할 수도 있다.
fun <T> CoroutineScope.fanIn(
channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
for (channel in channels) {
launch {
for (element in channel) {
send(element)
}
}
}
}
Pipelines
파이프라인은 한 채널이 다른 채널로부터 받은 element를 기반으로 element를 생성하는 방식이다.
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
repeat(3) { num ->
send(num + 1)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce {
for (number in numbers) {
send(number * number)
}
}
suspend fun main() = coroutineScope {
val numbers = numbers()
val squared = square(numbers)
for (num in squared) {
println(num)
}
}
// 1
// 4
// 9
참고자료
https://www.amazon.com/Kotlin-Coroutines-Deep-Marcin-Moskala/dp/8396395837