코틀린 코루틴 가이드 6 — Channels

김종식
11 min readFeb 6, 2021

--

(공식문서 바로가기)

Channels

지연처리된 값(Deferred Value)은 코루틴 간 편리하게 하나의 값을 공유할 수 있습니다. Channels 은 값의 스트림을 전달할 수 있는 방법을 제공합니다.

Channel basic

ChannelBlockingQueue 와 개념적으로 매우 유사합니다.
한가지 중요한 차이점은, Blocking 에서 사용하는 ‘put()’ 함수 대신 send() 라는 중단 함수가 있으며, ‘take()’ 함수 대신 receive() 라는 중단 함수가 있다는 점입니다.

이 예제의 출력결과는 다음과 같습니다.

1
4
9
16
25
Done!

Closing and iteration over channels

Queue 와는 다르게, channel 은 종료를 통하여 더 이상 요소가 오지 않는다는 것을 나타낼 수 있습니다. 수신자 입장에서는 channel 통해 받는 요소를 코틀린 정식 표현, 즉 for 문법으로 사용이 가능합니다.

close() 는 특별한 종료 토큰을 channel 에 보내는 개념과 유사합니다.
반복(=iteration)은 close 토큰 수신 즉시 멈추게 되며, 따라서 close 가 수신 되기 전까지 발송(send) 되었던 요소들은 수신했다는 것이 보장됩니다.

for 루프 안에서 채널의 close 토큰을 받지 않으면(즉, 채널에서 close 호출이 없다면), channel 에서 발송되는 값을 수신 대기하게 됩니다.
이미 close 된 채널에 다시 send 하게 될 경우 [ClosedSendChannelException] 이 발생되며, close 이 후 수신하려고 하면 [ClosedReceiveChannelException] 이 발생됩니다. 자세한 내용은 close() 함수를 참조하세요.

Building channel producers

코루틴에서 순차적으로 요소를 생성해 내는 패턴은 꽤 흔한 일입니다.
이는 동시성처리 코드에서 흔히 확인되는 producer - consumer 패턴의 한 부분입니다. producer 생성 작업을 추상화 하기 위해서 채널을 파라미터로 전달받는 생성함수로 만들 수 있습니다. 하지만 이는 함수는 반드시 결과를 리턴한다는 상식과 어긋납니다.

producer 관점으로 편리하게 코루틴을 생성하는 produce 코루틴 빌더가 있으며, consumer 관점에서 반복 작업하는 처리를 위한 consumeEach 확장 함수가 있습니다.

Pipelines

Pipeline 은 먼저 값을 가능한한 무한대로 생산해내는 코루틴이 있고 (=produceNumbers 확장함수), 하나 또는 여러 코루틴이 생산되는 스트림을 필요한 만큼 소비하면서 처리하여 (=square 확장함수), 다른 결과를 생산하는 패턴입니다.

이 예제는 메인 코드에서 전체 파이프라인을 시작하고 연결하며,
단순 숫자를 제곱하는 결과를 확인할 수 있습니다.

우리는 코루틴에서 생성된 모든 함수는 CoroutineScope의 확장 함수로 정의 할 수 있으며, 이는 우리 어플리케이션에 글로벌 코루틴이 남지 않도록 구조화된 동시성을 보장할 수 있습니다.

파이프라이닝은 여러 명령어를 중첩하여 실행시키는 기술입니다.

Prime numbers with pipeline

코루틴의 파이프라인을 이용하여 소수(Prime numbers) 를 출력하는
조금 극단적인 예를 들어보겠습니다. 먼저 무한한 숫자를 순서대로 출발합니다. (=numbersFrom 확장함수) 다음 파이프라인은 들어오는 숫자를 필터링 하여 전달된 소수로부터 구분할 수 있는 모든 숫자를 제거합니다. (=filter 확장함수)

이제 우리는 2부터 숫자값을 스트리밍하는 파이프라인을 시작시키고,
현재 채널로부터 소수를 취하고, 새로운 소수가 발견될 때마다 새로운 파이프라인을 개시합니다.

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

이 예제에서는 첫 10개의 소수점을 출력하며, 전체 파이프라인을 메인스레드의 컨텍스트에서 실행합니다. 실행된 모든 코루틴들은 메인 코루틴 (runBlocking) 스코프에서 수행되므로, 우리가 실행시킨
모든 코루틴들을 명시적으로 유지할 필요가 없습니다. 첫 10개의 소수를 출력하고 난 후, cancelChildren 확장 함수를 이용하여 모든 자식 코루틴들을 취소합니다. 이 예제코드는 다음과 같이 출력됩니다:

2 
3
5
7
11
13
17
19
23
29

표준 라이브러리에서 제공하는 iterator 코루틴 빌더를 통해 동일한 파이프라인을 만들 수 있습니다. produce > iterator, send > yield, receive > next,
ReceivedChannel > Iterator, 로 변경하고 코루틴 스코프를 제거하면 됩니다. 이렇게 처리하면 더 이상 runBlocking 필요하지 않습니다. 하지만 채널을 사용하는 파이프라인 구현방식은 멀티코어 환경에서 Dispatchers.Default 컨텍스트를 사용하여 동시에 실행시키는 이점이 있습니다.

어쨌든, 이 예제는 소수를 찾는데 극도로 비실용적인 방법입니다. 실제는, 파이프라인은 몇몇 다른 중단함수를 호출하는 것을 포함하는데 (원격 서비스에 대한 비동기 호출처럼) 이러한 파이프라인은 sequence/iterator 같은 것으로 만들 수 없습니다. 왜냐하면, 이것들은 produce 와는 다르게 완전히 비동기적으로 다른 중단함수 실행을 허용하지 않기 때문입니다.

본문의 iterator 로 변경하는 내용은 아래 코드를 참조하세요.

Fan-out

여러 코루틴이 동일한 채널에서 수신되어 서로간 작업을 분배할 수 있습니다.
주기적으로 정수를 생산하는 생산자 코루틴을 만들어 봅시다. (초당 10개의 숫자) (=produceDelayNumbers 확장함수)

그리고 여러 개의 프로세서 코루틴을 만듭니다. 이 예에서는 각각의 id 와 수신받은 숫자를 출력합니다. (=launchProcessor 확장함수, launch 부분 확인)

이제 다섯개의 프로세서를 실행시키고 거의 1초간 동작되도록 합니다.
그리고 어떤 일이 발생되는지 확인해 봅시다. 아마 출력된 결과는 각 정수를 수신하는 프로세서 ID에 차이가 있을 수 있으나 거의 대부분 아래와 유사할 것입니다.

생산자 코루틴을을 취소하면 채널이 닫히므로, 결국 프로세서 코루틴들 또한 닫히는 점에 유의하세요.

또한 launchProcessor 코드에서 fan-out 을 수행하기 위해 채널에 for 구문을 사용한 점에 주목하세요. comsumeEach 확장함수와 차이점은, for 반복문은 채널을 사용하는 다른 코루틴으로부터 완벽하게 안전합니다. 만약 하나의 코루틴 프로세서가 실패하면 다른 코루틴이 여전히 채널에 대하여 프로세스를
처리하는 반면, consumeEach 확장함수를 이용하는 프로세서 코루틴의 경우 정상 혹은 비정상 종료에 대하여 consumes(취소)를 전파받아 모두 종료될 것입니다.

Processor #0 received 1
Processor #1 received 2
Processor #2 received 3
Processor #3 received 4
Processor #4 received 5
Processor #0 received 6
Processor #1 received 7
Processor #2 received 8
Processor #3 received 9
Processor #4 received 10

생산자 코루틴을을 취소하면 채널이 닫히므로, 결국 프로세서 코루틴들 또한 닫히는 점에 유의하세요. 또한 launchProcessor 코드에서 fan-out 을 수행하기 위해 채널에 for 구문을 사용한 점에 주목하세요. comsumeEach 확장함수와 차이점은, for 반복문은 채널을 사용하는 다른 코루틴으로부터 완벽하게 안전합니다. 만약 하나의 코루틴 프로세서가 실패하면 다른 코루틴이 여전히 채널에 대하여 프로세스를 처리하는 반면, consumeEach 확장함수를 이용하는 프로세서 코루틴의 경우 정상 혹은 비정상 종료에 대하여 consumes(취소)를 전파받아 모두 종료될 것입니다.

Fan-In

여러개의 코루틴들이 동일 대상 채널에 전송할 수도 있습니다. 예를들어, 문자열 형식의 채널이 있고, 이 채널로 지정된 지연시간동안 반복적으로 문자열을 전송하는 중단함수가 있다고 생각해 봅시다. (=sendString 함수)

이제 두 개의 코루틴에서 문자열을 전송할 때 어떤 일이 발생하는지 확인해 봅시다. (이 예제에서는 이 코루틴들을 메인 스레드의 메인 코루틴의 자식으로 실행시킵니다.)

foo
foo
BAR!
foo
foo
BAR!

Buffered channels

지금까지 확인한 채널들은 버퍼가 없었습니다.
버퍼가 없는 채널은 송신자(sender)와 수신자(receiver)가 만날 때 요소들을 전송하게 됩니다. (마치 랑데뷰처럼) 만약 송신이 먼저 발생되면, 수신이 발생되기 전까지 중단(suspended) 되며, 반대로 수신이 먼저 발생되면, 송신이 될 때까지 중단 됩니다.

Channel<T> 팩토리 함수와 produce{} 빌더는 버퍼 사이즈를 명시하기 위하여 옵션으로 ‘capacity’ 를 파라미터로 지정 할 수 있습니다. 버퍼는 송신자가 여러개의 요소를 중단되기 전까지 송신할 수 있도록 허용하며, 이는 버퍼가 가득 찼을 때 차단 되는 ‘BlockingQueue’ 에 명시적으로 capacity 를 지정하는 것과 유사합니다.

예제 코드의 동작을 살펴봅니다.
4개의 버퍼가 있는 채널에 "Sending" 을 다섯번 출력하게 됩니다.

처음 4개의 요소는 버퍼에 추가가 되고 다섯번째 요소를 송신하려고 하면 중단 됩니다.

Channels are fair

채널에 여러개의 코루틴에서 송신 및 수신을 수행한다면 그 실행순서는 호출 순서에 따라 공정하게(fair) 실행합니다. FIFO (First-in First-out) 방식으로 스케쥴링되며, 이것은 먼저 수신를 호출한 코루틴이 요소를 전달받는다고 할 수 있습니다. 이 예제에서는 "ping" 과 "pong" 두 개의 코루틴이 "ball" 객체를
"table" 이라는 하나의 채널에서 수신하는 예제입니다.

"ping" 코루틴이 먼저 시작되었기 때문에, 먼저 ball 을 수신하게 됩니다.
"ping" 코루틴에서 table 채널로 송신 후 즉시 수신하려 하지만, "pong" 코루틴에서 이미 수신을 기다리고 있었기 때문에 "pong" 코루틴이 수신을 하게 됩니다.

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

때때로 채널이 공정하지 않게(unfair) 실행되는 모습을 보이기도 하는데,
이는 채널을 사용하는 Executor의 특성 때문입니다.
자세한 내용은 해당 이슈를 참조하세요.

Ticker channels

Ticker 채널은 마지막으로 수신된 내용이 소비되고 난 후, 주어진 시간마다 반복적으로 지연된 Unit 을 생성하는 특별한 랑데뷰(rendezvous) 채널입니다. 혼자서는 쓸모없어 보이지만, 시간을 기반으로 복잡하게 운용되는 produce 파이프라인이나 다른 시간과 의존적인 연산들과는 유용할 수 있습니다. Ticker 채널은 select 를 이용하여 "on tick" 액션을 수행합니다.

Ticker 채널은 ticker 팩토리 함수로 생성이 가능합니다. 그리고 더 이상 요소가 필요없다면 ReceivedChanel.cancel 함수를 사용하세요.

이제 예제를 보면서 어떻게 동작하는지 확인 해 봅시다. 이 예제코드는 아래와 같이 출력됩니다.

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

ticker 는 기본적으로 정지 발생 시 소비자가 중단되었을 가능성을 인지하고 있으며, 생산되는 요소의 고정 비율(속도)를 유지하려고 시도합니다.

선택적으로 ticker 는 요소들 사이에 지연간격을 고정적으로 적용할 수 있도록
mode 파라미터에 TickerMode.FIXED_DELAY 를 사용할 수 있습니다.

--

--

김종식
김종식

Written by 김종식

앱 개발자 / 꿈은 축구선수 / 쌍둥이 아빠

No responses yet