참조링크 — Flowmarble
Asynchronous Flow
하나의 중단 함수는 비동기적으로 하나의 값만 리턴하는데요, 어떻게 하면 비동기적으로 여러개의 연산수행된 값을 리턴되도록 할 수 있을까요? 코틀린 플로우에 대하여 확인할 차례입니다.
Representing multiple values
여러개의 값은 코틀린의 collections 이용하여 표현할 수 있습니다. 예를들어 simple 함수를 통해 세 개의 숫자 목록(List) 를 반환하는 함수를 만들고, forEach 를 이용하여 전체를 출력할 수 있습니다.
이 코드는 아래와 같이 출력됩니다.
1
2
3
Sequences
각 수에 대하여 CPU 가 많이 소모 후 계산되는 경우, Sequence 를 사용하여 숫자를 나타낼 수 있습니다. (예제에서는 각 연산에서 100ms 의 시간이 소요된다고 가정합니다.)
이 예제의 실행 결과는 동일한 숫자가 출력되지만, 각각 100ms 만큼 대기 후 출력됩니다.
Suspending functions
하지만 이전 예제코드의 코드블록은 메인스레드를 실행하고있는 코드블록을 정지 시킵니다. 결과값을 list로 반환하는데 코드블록 수행하는데 정지가 없도록 suspend 식별자를 suspendingSimple 함수 앞에 추가함으로써 처리할 수 있습니다.
이 예제는 1초 후 숫자가 출력됩니다.
Flows
결과로 전달되는 타입을 List<Int> 을 사용한다는 것은, 모든 값을 한번에 반환한다는 의미입니다. 비동기적으로 연산이 수행된 결과값을 스트림으로 나타내려면, 이전 예제코드 에서 동기적으로 처리되었던 Sequence<Int> 타입과 유사하게 Flow<Int> 타입을 사용할 수 있습니다.
다음 예제코드는 메인스레드를 중단하지 않고 각 숫자를 출력하기 위해 100ms 씩 대기합니다. 메인 스레드에서 실행중인 별도의 코루틴에서
"I'm not blocked" 를 매 100ms 마다 출력함으로써 해당 상황을 확인합니다. (=메인 스레드를 중단하지 않고 각 숫자 출력을 확인을 위함)
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
앞선 예제 코드들과 Flow 에서 다음과 같은 차이점에 주목하세요.
- Flow 생성을 위하여 flow 빌더 함수를 사용합니다.
- flow {...} 빌더의 내부 코드블록은 중단이 가능합니다.
- flowSimple() 은 더 이상 suspend 키워드로 마킹되지 않습니다.
- flow 에서는 emit 함수를 사용하여 결과값을 방출됩니다.
- flow 에서는 collect 함수를 사용하여 결과값을 수집됩니다.
flowSimple() 코드 블록에서 delay() 함수를 Thread.sleep() 으로 변경하면
메인스레드가 정지되는 것을 확인할 수 있습니다.
Flows are cold
Flow는 시퀀스처럼 cold-stream 입니다. - flow{} 빌더 내부의 코드블록은
flow가 수집될 때 까지 실행되지 않습니다. 이것은 다음 예제를 확인해보면 명확해 집니다.
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
주요 이유는 (flow 를 리턴하는)예제 코드에서는 suspend 식별자가 마킹되어있지 않기 때문입니다. ‘flowColdSimple()’ 함수 호출 즉시 결과를 반환하며, 아무것도 대기하지 않습니다. flow는 collect 함수 호출 때마다 매번 시작되며, 그것이 예제에서 "Flow started" 출력을 collect 함수 호출 시마다 확인하게 되는 이유입니다.
Data streaming 특징을 hot / cold 라는 키워드 이용하는데,
이해를 돕고자 아래 내용을 추가 합니다.
From: Anton Moiseev Book "Angular Development with Typescript, Second Edition." : Hot and cold observables
There are two types of observables: hot and cold.
The main difference is that a cold observable creates a data producer for each subscriber, whereas a hot observable creates a data producer first, and each subscriber gets the data from one producer, starting from the moment of subscription.
Let's compare watching a movie on Netflix to going into a movie theater. Think of yourself as an observer. Anyone who decides to watch Mission: Impossible on Netflix will get the entire movie, regardless of when they hit the play button. Netflix creates a new producer to stream a movie just for you. This is a cold observable.
If you go to a movie theater and the showtime is 4 p.m., the producer is created at 4 p.m., and the streaming begins. If some people (subscribers) are late to the show, they miss the beginning of the movie and can only watch it starting from the moment of arrival. This is a hot observable.
A cold observable starts producing data when some code invokes a subscribe() function on it. For example, your app may declare an observable providing a URL on the server to get certain products. The request will be made only when you subscribe to it.
If another script makes the same request to the server, it'll get the same set of data. A hot observable produces data even if no subscribers are interested in the data. For example, an accelerometer in your smartphone produces data about the position of your device, even if no app subscribes to this data. A server can produce the latest stock prices even if no user is interested in this stock.
Flow cancellation basics
Flow 는 코루틴의 취소에 대하여 일반적으로 협력적입니다.
flow 수집 중 취소 가능한 중단 함수(delay 와 같은)에서 취소 되었을 때 취소 될 수 있습니다. 아래 예제에서는 withTimeoutOrNull 코드블록을 수행하는 중 flow 가 취소되어 동작을 멈추는지 확인할 수 있습니다.
예제코드에서 simpleFlowCancellation 함수의 flow 에서 오직 2개의 숫자만 방출되는 것에 주목해야 합니다.
Emitting 1
1
Emitting 2
2
Done
Flow builders
앞 얘제들에서 확인한 flow {...} 빌더는 가장 기본적인 flow 빌더 중 하나입니다. 여기 flow를 좀 더 쉽게 선언하기 위한 다른 빌더들이 있습니다 :
- 고정된 값 집합을 flow emitting 하는 flowOf 빌더 (=고정된 값 집합)
- 다양한 컬렉션과 시퀀스를 flow로 변환해주는 .asFlow() 확장함수
따라서, flow 에서 숫자 1부터 3까지 출력하는 것은 다음과 같이 작성이 가능합니다.
Intermediate flow operators
flow 는 컬렉션이나 시퀀스에서 처럼 연산자를 통해 변환될 수 있습니다.
중간 연산자들은 업스트림 플로우에 적용되어 다운스트림을 반환합니다. 이러한 오퍼레이션들은 플로우가 그렇듯 콜드 타입으로 동작되며, 각 연산자 호출 자체는 중단함수가 아닙니다. 새롭게 변환된 flow 의 결과를 되돌려 주는것은 빠르게 동작됩니다.
기본적인 연산자로 map 이나 filter 와 같은 친숙한 이름을 가지고 있습니다.
시퀀스를 처리하는데 있어 중요한 것은, 연산 수행하는 내부 코드블록은 중단함수를 호출할 수 있다는 것입니다.
예를들어 요청 flow 는 map 연산자를 통해 결과값으로 매핑될 수 있으며,
오랜 시간 처리되는 각 요청들을 중단함수로 구현함으로써 동작됩니다.
이 예제는 3개의 라인을 생성하며, 각 라인은 1초 후에 표시됩니다.
response 1
response 2
response 3
Transform operator
flow 변환 연산자 중 가장 일반적인 것 중 하나가 transform 입니다.
map 이나 filter 같은 단순한 변환을 모방하여 사용하는 것 뿐만 아니라,
좀 더 복잡한 변환에도 사용이 가능합니다. transform 연산자를 사용하여,
임의의 값을 임의의 횟수로 emit 할 수 있습니다.
예를들어, transform 을 사용하면 오랜시간 수행해야하는 비동기 요청을 수행하기 전에 문자열을 우선 방출한 뒤, 요청에 대한 응답이 도착하면 그 결과를 방출할 수 있습니다.
이 예제는 다음과 같이 출력됩니다.
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
flow 에서 take 와 같은 크기를 제한하는 중간 연산자는 해당 한계에 도달했을 때 flow 실행을 취소합니다. 코루틴에서 취소는 항상 예외를 발생시키므로 취소될 때 자원 관리형식의 함수들 처리를 통해 (try {…} finally {…} 코드블록) 정상적으로 동작할 수 있게 합니다.
예제 코드에서는 flow 코드블록이 명확히 두 번째 숫자가 방출되고 나서 실행이 멈추는 것을 확인할 수 있습니다.
1
2
Finally in numbers
take 를 설정할 경우, flow 에서 취하려는 회수만큼 방출 후 즉시
kotlinx.coroutines.flow.internal.AbortFlowException 이 발생됩니다.
Terminal flow operators
flow 에서 종료 연산자는 flow 수집을 시작하는 중단함수입니다.
collect 는 가장 기본적인 연산자이지만, 다음과 같은 좀더 편리한 종료 연산자들이 있습니다.
- toList 나 toSet 같은 다양한 컬렉션으로 변환
- 첫 번째 값을 받기 위한 first 연산자나, 하나의 값을 보장해주는 single 연산자
- reduce 나 fold 같이 flow 의 값을 감소시키기 위한 연산자
예를들어, 예제 코드는 하나의 숫자만을 출력합니다 - ‘55'
Flows are sequential
flow 에서 수집되는 각각의 요소는 복수의 flow 를 제어하는 특별한 연산자가 없다면 순차적으로 수행됩니다. flow의 수집동작은 종료 연산자(terminal operator) 가 호출될 때 해당 코루틴에서 직접 실행됩니다. 기본적으로 새로운 코루틴을 생성하지 않습니다. 각 출력된 값은 중간 연산자에 의해 처리하면서 상류(=upstream)에서 하류로(=downstream) 전달되며, 그 후 종료 연산자에게 전달됩니다.
짝수를 찾아서 문자열로 변환하는 예를 봅시다.
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
flow 에서 수집동작은 항상 호출된 코루틴의 컨텍스트에서 발생됩니다.
아래와 같은 코드를 예를들면, simpleContext()에서 리턴하는 flow 구현 세부사항과 관계없이 코드 작성자가 지정한 컨텍스트에서 코드가 수행됩니다.
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
이러한 flow 특징을 컨텍스트 보존법칙(context preservation) 이라고 부릅니다.
따라서, 기본적으로 flow{...} 빌더의 내부코드는 플로우 수집을 요청한 코루틴 컨텍스트에서 실행됩니다. 예를들어 호출한 스레드를 출력하면서, 3개의 숫자를 출력(emit)하는 함수를 생각해 봅시다. 예제코드를 실행시키면 다음과 같습니다.
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
‘simpleContext()’ 을 메인스레드에서부터 호출되므로, simpleContext() flow 의 구현부 또한 메인스레드에서 호출됩니다. 이것이 빠르게 동작되는 것을 보장하고, 호출자를 블록하지 않고 실행되는 컨텍스트를 고려하지 않는 비동기 처리를 위한 기본 방법입니다.
Wrong emission withContext
그러나, 장시간 실행으로 CPU 소모를 하는 코드의 경우 Dispatchers.DEFAULT
컨텍스트에서 수행되어야 할 수 있으며, 화면 갱신을 하는 코드의 경우 Dispatchers.Main 컨텍스트에서 수행되어야 할 수 있습니다. 일반적으로 withContext 는 코틀린 코루틴의 컨텍스트를 변경하는데 사용되지만,
flow {...} 빌더의 코드일 경우 컨텍스트 보존법칙(context preservation)을 따르기 때문에, 다른 컨텍스트에서 방출하는 것을 허용하지 않습니다.
예제 코드를 실행하면 다음과 같은 예외가 발생됩니다.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@6ddb07a, BlockingEventLoop@3664a205],
but emission happened in [DispatchedCoroutine{Active}@6d3e070b, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn operator
예외 메시지(=앞 내용에서 에러 메시지 참조)에서 flow 실행을 위한
컨텍스트를 변경하기 위하여 flowOn 을 알려주고 있습니다. 컨텍스트를 올바르게 변경하는 예제는 이번에 확인이 가능하며, 또한 실행되는 스레드명을 출력하여 어떻게 모든 것이 정상적으로 동작되는지 확인할 수 있습니다.
flow {...} 은 백그라운드 스레드에서 동작하며, 수집 이 후 처리는 메인스레드에서 동작하는 것을 명심하세요.
또 한가지 눈여겨볼 부분은 flowOn 연산자가 flow 의 순차적 특성을 변경했다는 것입니다. 수집 동작은 하나의 스레드에서 실행되는 코루틴에서 발생되고(‘coroutin#1') flow 값 반환은 다른 스레드에서 실행되는 코루틴(‘coroutine#2’)에서 발생됩니다. flowOn 연산자는 CoroutineDispatcher 컨텍스트를 변경해야 할 때 값을 상류(=upstream)로 전달하기위한 또다른 코루틴을 생성합니다.
Buffering
flow 에서 장시간 수행되는 동작에 대하여 별도 코루틴을 실행시키는 것은
특히 flow 에서 전달하는 데이터를 수집하는 동작 전체적인 시간 관점에서 도움이 될 수 있습니다. 예를들어, simpleBuffering() 에서 방출이 느리고, 요소들을 생산하는데 100ms 가 소요되며, 각 요소를 수집하는 부분도 300ms 걸리는 것을 생각해 봅시다. 이러한 flow 가 3개의 숫자를 collect 하는데 얼마나 시간이 소요되는지 확인해 봅시다.
1
2
3
Collected in 1226 ms
약 1200ms (3개의 숫자이며 각각 400ms가 소요) 가 전체 수집에 시간이 소요되는 것을 확인할 수 있습니다.
flow 의 buffer() 연산자를 활용하여 방출 코드블록과 수집 코드블록을 동시에 실행할 수 있습니다.
예제코드는 첫 번째 숫자를 전달하는데 100ms 를 대기하고, 이 후부터는 각 숫자를 처리하는데 300ms 씩 만큼 소요되도록 처리하는 파이프라인을 효과적으로 만들었기 때문에 같은 숫자를 더 빨리 만듭니다.
이러한 방법으로 실행했을 경우 약 1000ms 정도 소요됩니다.
1
2
3
Collected in 1052msflowOn 연산자는 CoroutineDispatcher 가 변경될 때 동일한 buffering 처리 메커니즘을 사용하는 것에 유의하세요. 예제에서는 context 변경 없이 명시적으로 buffer() 연산자를 사용하여 버퍼링을 수행했습니다.
conflation
flow 에서 연산의 일부분이나 연산 상태의 업데이트 될 때 각각의 값을 처리하는 것은 불필요하며, 가장 최근의 결과값만 처리가 필요할 수 있습니다. 이러한 경우, conflate 연산자를 사용하여 수집 후 처리가 너무 느릴 경우, 수집된 결과의 중간 값들을 스킵할 수 있습니다.
이전 예제를 기반으로 다음 예제를 확인 해 봅시다.
첫 번째 번호가 아직 처리 중인데 flow 에서 이미 세 번째 숫자가 생성되어 두번째 값은 스킵되고, 가장 최근의 값(세 번째 값)이 전달되어 수집됩니다.
1
3
Collected in 753ms
processing the latest value
Conflation 은 방출자 와 수집가 모두 느릴 경우 속도 향상을 시키는 방법 중 하나입니다. 그것은 방출된 결과값을 삭제하는 방법으로 동작됩니다. 다른 개선 방법은 느린 수집가를 취소시키고 다시 새로운 값이 방출되었을 때 재시작 하는 방법입니다. ‘xxxLatest’ 연산자 패밀리는 ‘xxx’ 연산자의 필수 로직을 수행하지만, 새로운 값에 대하여 코드블록을 취소하는 경우도 있습니다. 이전 예제에서 conflate 연산자를 collectLatest 로 변경해봅시다.
collectLatest 의 코드블록은 300ms가가 소모되지만, 새로운 값은 매 100ms 마다 방출되며, 우리는 모든 코드블록이 실행되는 것을 확인할 수 있습니다. 하지만, 마지막으로 방출된 값에 대해서만 완료되는 것을 확인할 수 있습니다.
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 658 ms
Composing multiple flow
다중의 flow를 결합하는 방법은 다양합니다.
Zip
코틀린 표준 라이브러리의 Sequence.zip 확장함수 처럼 flow 에도 두 flow 의 해당 값을 결합하는 zip 연산자가 있습니다. 예제코드는 다음과 같이 출력됩니다.
1 -> one
2 -> two
3 -> three
만약, flow가 방출되는 원소의 수가 다르다면 적은 수를 기준으로 zip 결과가 수집됩니다. 예를들어 nums 가 ‘1..5’ 로 원소의 수가 증가 하더라도, 동일한 출력 결과를 확인할 수 있습니다.
두 개의 flow 에서 각 값을 방출하는 타이밍이 다를 경우, 나중에 방출되는 값과 함께 zip 결과를 확인할 수 있습니다. 예제에서, nums 를 ‘simpleFlowForZip()’ 으로 변경 후 실행 결과를 확인 해 보세요.
Combine
flow 가 어떤 연산이나 상태의 최근 값을 나타낼 때(Conflation 관련 내용을 참조하세요.), 그 flow의 최근 값에 추가 연산을 수행하거나 또는 별도의 업스트림 플로우가 값을 방출할 때마다 이를 다시 계산해야 할 수 있습니다. 이와 관련된 연산자를 combine 이라고 부릅니다.
이전의 예제에서 숫자는 300ms 마다 업데이트 되고, 문자열은 400ms 마다 업데이트 될 때, zip 연산자를 활용할 경우 앞선 예제의 출력결과와 동일함을 확인할 수 있습니다. (다만 매 출력은 약 400ms 정도 소요됩니다.)
이 예제에서 사용한 onEach 중간 연산자는 각 원소의 방출 주기를 지연시키면서 코드를 보다 선언적이고 짧게 만들어 줍니다.
여기에 zip 대신 combine 연산자로 변경할 경우, 아래처럼 출력됩니다.
1 -> one at 437 ms from start
2 -> one at 638 ms from start
2 -> two at 840 ms from start
3 -> two at 939 ms from start
3 -> three at 1244 ms from start
출력된 결과가 꽤 차이가 있는 것을 확인할 수 있는데, nums 와 strs 플로우에서 각각 방출 될 경우 출력됩니다.
Flattening flows
flow 는 순차적인 결과를 비동기적으로 나타내기 때문에, 각각의 값을 다른 값들의 시퀀스로 요청해야 하는 상황은 자주 발생됩니다. 예를들어, 500ms 간격으로 두개의 문자열을 리턴하는 flow 를 정의할 수 있습니다.
만약 3 개의 숫자를 방출하는 flow가 있고, requestFlow() 의 flow 각각을 호출한다면 아래와 같을 것입니다:
(1..3).asFlow().map { requestFlow(it) }
이 경우 우리는 flow 들의 flow 를 얻게 되며, (=Flow<Flow<String>>)
더 나은 처리를 위한 single flow 로의 평탄화 작업(flattened)이 필요하게 됩니다. 컬렉션이나 시퀀스는 이러한 처리를 위하여 flatten, flatMap 연산자가 있습니다. 하지만 flow 에서는 비동기 처리 방식에 따라 플래트닝 처리 모드에 차이가 있어, flow 에도 유사한 플래트닝 연산자들이 있습니다.
flattening 도입에서 소개한 문제점이 동작되도록 코드를 처리하려면 아래와 같이 해야 할 것입니다. Flow 가 중첩으로 전달되는 형태여서 코드 분석 및 유지보수가 어려운 코드가 됩니다.
flatMapConcat
연결 모드는 flatMapConcat, flattenConcat 연산자로 구현되어 있습니다. 이것들은 시퀀스 연산자들과 가장 유사하게 동작됩니다. 다음 예제에서 확인할 수 있듯이, 내부 flow 가 종료될 때 까지 다음 flow 가 시작을 대기하고 있습니다.
1: First at 130 ms from start
1: Second at 631 ms from start
2: First at 732 ms from start
2: Second at 1233 ms from start
3: First at 1337 ms from start
3: Second at 1837 ms from start
flatMapConcat 연산자의 순차적인 특징을 출력 결과로 확인이 가능합니다.
flatMapMerge
다른 플래트닝 방법은 가능한 빠르게 각 flow 에서 방출된 flow 들을 비동기로 수집하여 처리하는 것입니다. 이것은 flatMapMerge 와 flattenMerge 연산자를 구현하고 있습니다. 두 가지 연산자 모두 한번에 수집하려는 flow 의 동작들의 수를 제한할 수 있는 파라미터를 선택적으로 지정할 수 있습니다. (기본적으로, DEFAULT_CONCURRENCY 와 동일합니다.)
1: First at 154 ms from start
2: First at 247 ms from start
3: First at 353 ms from start
1: Second at 655 ms from start
2: Second at 752 ms from start
3: Second at 859 ms from start
flatMapMerge 의 동시성 특징은 출력 결과에서 명확히 확인됩니다.
flatMapMerge 는 자신의 코드블록 (예제에서는 { requestFlow(it) })은 순차적으로 수집하지만, 각 flow의 결과는 비동기적으로 수집됩니다. 이것은 순차으로 map { requestFlow(it) } 을 호출하고, 결과를 flattenMerge 을 수행하는 것과 동일합니다.
flatMapLatest
‘최신 값 처리(collectLatest)’ 섹션에서 확인한 collectLatest 와 유사하게 새로운 flow 가 방출 되었을 때 이전의 flow 를 취소하고 "최신의" 플로우로 플래트닝 방법도 있습니다. 그것은 flatMapLatest 연산자가 구현하고 있습니다.
이 예제의 출력 결과는 flatMapLatest 의 동작 방식을 잘 보여줍니다.
1: First at 151 ms from start
2: First at 261 ms from start
3: First at 362 ms from start
3: Second at 862 ms from startflatMapLatest 는 새로운 값을 받았을 때 코드블록 (예제에서는 { requestFlow(it) }) 을 취소시킵니다. requestFlow 호출 자체는 빠르고, 중단되지 않으며 취소도 되지 않아 이 예제에서는 특별한 차이점이 없습니다. 하지만, 이 부분에서 delay 같은 중단함수를 사용한다면 flatMapLatest 의 특징을 좀 더 명확하게 확인할 수 있습니다.
Flow exceptions
flow 컬렉션은 방출자 혹은 연산자 코드블록에서 예외 발생시 예외와 함께 종료될 수 있습니다. 여기 이러한 예외들을 처리하는 몇 가지 방법을 확인할 수 있습니다.
Collector try and catch
수집가는 코틀린의 try / catch 블록을 이용하여 예외 처리가 가능합니다.
이 코드는 성공적으로 수집연산자(collect) 종료에서 예외를 받아 처리하고,
출력된 결과에서 보는 것 처럼 이 후에는 더이상 값이 방출되지 않습니다.
Everything is caught
앞의 예제는 방출자, 중간 혹은 종료 연산자에서 발생되는 어떠한 예외라도
잡아내게 됩니다. 예를들어, 방출된 수를 문자열로 변환되지만, 변환 코드에서 예외를 발생하도록 코드를 변경해 봅시다.
이 예제는 여전히 예외가 발생되고 수집활동이 중지됩니다.
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
Exception transparency
그렇다면, 방출되는 코드가 어떻게 예외 처리 동작을 캡슐화 할 수 있을까요?
flow 는 예외 처리에 대하여 반드시 투명해야 하고(transparency),
flow 빌더 코드 블록 내에서 ‘try/catch’ 블록으로 예외 처리한 후 값을 방출하는 것은 투명성을 위반하는 것입니다. 수잡기 측에서 예외를 던지는 것은 항상 예외를 수집할 수 있고, ‘try/catch’ 에서 그러한 예제를 통해 보장되는 것을 확인했습니다.
방출가 측에서는 catch 연산자를 사용하여 예외 투명성을 보장할 수 있으며,
예외처리에 대하여 캡슐화가 가능합니다. catch 연산자의 코드블록은 어떤 예외를 포착 했는지에 따라 다르게 대응이 가능합니다.
- ‘throw’ 를 이용하여 다시 예외를 전달 수 있습니다.
- catch 로직에서 emit 을 사용하여 값으로 방출 할 수 있습니다.
- 예외를 무시하거나, 기록(logging), 기타 처리가 가능합니다.
이 예제의 경우 ‘try/catch’ 가 코드에 있지 않지만, 동일한 결과를 확인할 수 있습니다.
Transparent catch
예외처리의 투명성을 갖고 있는 catch 중간 연산자는 업스트림시 발생되는 예외만 캐치합니다. 예제 코드는 ‘collect {...}’ 안의 코드블록(‘catch’ 아래에 위치한)은 예외를 발생시키고 난 후 탈출됩니다:
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
이 예제에서는 catch 연산자가 있음에도 불구하고, “Caught …” 메시지는 출력되지 않습니다.
Catching declaratively
모든 예외 상황에 대한 확인이 필요할 경우, collect 내부에서 처리하지 말고
catch 연산자 앞 onEach 에서 코드 블록을 두어 선언적으로 코드 작성이 가능합니다. 이 flow 에서는 수집 시 특별한 파라미터 없이 collect() 를 호출함으로써 트리거가 되어야 합니다.
Emitting 1
onEach 1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
이제 ‘try/catch’ 코드 블록을 명시적으로 사용하지 않아도 예외를 잡아서
"Caught ..." 가 출력되는 것을 확인할 수 있습니다:
Flow Completion
flow 수집 활동이 종료될 때 (일반적으로든, 예외적으로든) 어떤 동작 처리가 필요할 수 있습니다. 이미 알아차리셨겠지만, 두 가지 방법으로 처리 가능합니다: 명령적(imperative) or 선언적(declarative) 방법
Imperative finally block
‘try/catch’ 구문 외에도, collector 는 또한 수집 동작이 완료된 후 ‘finally’ 구문을 실행합니다. 예제 코드는 simpleFinally 로부터 생성된 flow 가 3개의 숫자를 생성한 이 후 "Done" 문자열를 출력합니다.
1
2
3
Done
Declarative handling
종료처리에 대하여 선언적으로 접근해 본다면, flow 가 수집이 완전히 종료되었을 때 호출되는 onCompletion 중간 연산자가 있습니다. 이전의 예제를 onCompletion 을 이용할 경우 동일하게 출력되게 변경이 가능합니다.
onCompletion 의 주요 장점은 정상적으로 종료 되었거나, 예외적으로 종료된 것에 대하여 판단을 할 수 있는 lambda 로부터 전달되는 Throwable 파라미터 입니다. 이 예제에서는 flow 에서 숫자 1을 방출하고 이 후에 예외상황을 발생시킵니다.
예상 되는 것 처럼 아래와 같이 출력됩니다.
1
Flow completed exceptionally
Caught exception
onCompletion 연산자는 catch 연산자와 다르게, 예외상항을 처리하지 않습니다. 예제코드에서 알 수 있듯, 예외는 여전히 하류(downstream) 로 전달됩니다. onCompletion 연산자 코드블록으로 예외가 전달되고, catch 연산자와 함께 처리도 가능합니다.
예제 코드는 onComplete, catch 연산자의 순서로 코드블록이 작성되었습니다. 만약 catch, onComplete 로 순서를 변경한다면 어떻게 될까요? 실행 결과는 아래와 같습니다. (catch 연산자의 내부 구현체를 함께 확인하면 좋습니다.)
1
Caught exception
Immerative versus declarative
이제 우리는 flow 를 수집하는 방법과 완료 및 예외에 대하여 명령적, 선언적 방법을 알게 되었습니다.
'어떠한 방식을 더 선호하는지, 그리고 그 이유는 무엇인지?'
라는 질문이 자연스럽게 나올 수 있습니다. 라이브러리로서 두 가지 방식이 모두 유효하기 때문에, 우리는 특별히 어떠한 방식을 지지 하지 않으며, 당신의 선호도나 코딩 스타일에 따라 선택 되어도 좋다고 생각합니다.
Launching flow
어떤 소스에서 비동기 이벤트들을 획득하고 나타내기 위해서 flow 를 사용하여 쉽게 표현할 수 있습니다. 이 경우, 각 이벤트를 획득하고 이에 대한 추가 작업을 계속하는 'addEventListener' 같은 아날로그 방식의 함수를 등록학여 처리가 가능합니다. flow 에서 onEach 연산자는 이러한 역할을 수행합니다.
하지만 onEach 연산자는 중간 연산자입니다. 우리는 flow 수집을 시작하기 위한 목적으로 종료 연산자가 필요합니다. 그렇지 않으면 onEach 만 호출하는 것은 아무런 효과가 없습니다.
만약 collect 종료 연산자를 onEach 이 후에 호출하면, 이 예제코드는 flow 가 수집 완료될 때 까지 기다립니다. 출력 결과는 다음과 같습니다.
Event: 1
Event: 2
Event: 3
Done
launchIn 종료 연산자가 여기에서 유용하게 사용될 수 있습니다.
(이전 예제에서) collect 를 launchIn 으로 변경함으로써 flow 의 수집 동작을 분리된 코루틴으로 실행이 가능하며, 따라서 이 후에 작성된 코드가 즉시 즉시 실행됩니다.
아래와 같이 출력됩니다 :
Done
Event: 1
Event: 2
Event: 3
launchIn 연산자는 flow 가 실행되어 수집을 수행하기 위하여 명시적인 CoroutineScope 지정이 필요합니다. 위 예에서는 runBlocking 코루틴 빌더로부터 획득한 스코프를 사용함으로써, runBlocking 스코프는 하위 코루틴의 완료를 기다리기 때문에 메인 함수 가 반환되어 프로그램이 종료되지 않도록 유지합니다.
실제 어플리케이션에서 스코프는 제한된 생명주기를 갖는 요소에서 전달될 수 있습니다. 이러한 요소의 생명주기가 종료 되면, 그 즉시 엔티티로부터 전달된 스코프는 취소가 되어지며, flow 의 수집 또한 취소됩니다. 이런식으로, onEach {...}.launchIn(scope) 와 addEventListener 는 동일하게 수행됩니다.(=개념적으로 그렇다는 의미) 반면, 취소(=cancellation) 와 구조화된 동시성처리를 제공하기 때문에 removeEventListener 는 굳이 필요하지 않습니다.
launchIn 또한 job 을 반환하는 것을 주의하세요. 이는 전체 스코프를 취소하거나 특정 Job 에 대한 join 하지 호출하지 않고, flow 의 수집동작을 수행하는 코루틴만 취소하는데 사용할 수 있습니다.
Flow cancellation checks
flow 빌더는 편의를 위하여 ensureActive 를 통해 각 방출되어지는 값에 대한 취소 여부 체크를 수행합니다. 이것은 flow {...} 에서 방출되는 루프가 취소 가능하다는 것을 의미합니다.
예제코드에서 3개의 숫자가 수집되는 것을 확인하고, 4번째 숫자를 방출할 때 CancellationException 이 발생되는 것을 확인할 수 있습니다.
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@de0a01f
하지만, 대부분의 다른 flow 연산자들은 성능상의 이유로 스스로 추가적인 취소 여부 체크를 수행하지 않습니다. 예를들어, 만약 IntRange.asFlow 확장함수로 동일한 반복 작업을 수행하고 아무 곳에서도 일시중단하지 않을 경우, 취소에 대하여 확인을 하지 않습니다.
예제코드 실행결과, 1부터 5까지 모든 숫자가 수집 후 runBlocking 에서 결과를 반환하기 전에 취소가 runBlocking 에서 감지됩니다.
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled;
job=BlockingCoroutine{Cancelled}@5ecddf8f
Making busy flow cancellable
코루틴으로 사용중인 루프가 있는 경우 명시적으로 취소를 체크해야 합니다. `.onEach { currentCoroutineContext().ensureActive() }` 를 추가할 수 있지만, 다음과 같은 작업을 위하여 cancellable 연산자를 제공할 준비가 되어 있습니다.
예제에서 cancellable 연산자를 추가하여 실행할 경우, 1부터 3까지의 숫자만 수집되어집니다.
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled;
job=BlockingCoroutine{Cancelled}@5479e3f
Flow and Reactive Streams
Reactive Stream 이나 리액티브 프레임워크 (RxJava 나 Reactor 프로젝트 등)
에 익숙하신 분들은 아마 flow 의 디자인이 매우 유사해 보일 것입니다.
실제로 flow 는 Reactive Stream 과 다양한 구현체에서 영감을 얻었습니다.
하지만 flow 의 중요한 목표는 구조화된 동시성을 따르며 코틀린과 중단 함수를 이용하여 가능한 단순화 된 디자인을 갖는 것입니다. 이 목표를 달성하는 것은, Reactive 개척자들의 엄청난 성과 없이는 불가능했을 것입니다. Reactive Stream 과 Kotlin Flow 에 대한 완전한 이야기는 이 아티클을 통해 읽어보실 수 있습니다.
개념적으로는 다르지만 Flow 는 Reactive Stream 이며, reactive Publisher 로 변환하거나 그 반대의 경우 (=reactive Publisher 를 flow 로 변환)도 가능합니다. (스펙 및 TCK 를 준수함) 그러한 변환기는 kotlinx.coroutins 에 의해 제공되며, 그에 대응하는 리액티브 모듈에서 찾아볼 수 있습니다.
- kotlin-coroutines-reactive : Reactive Streams
- kotlin-coroutines-reactor : Project Reactor
- kotlin-coroutines-rx2 / kotlin-coroutines-rx3 : RxJava2 / RxJava3
통합 모듈은 flow 와의 상호 변환과 Reactor context 와의 통합, 다양한 리액티브 요소들을 사용할 수 있는 suspension 친화적인 방법이 포함되어 있습니다.