Reactive Stream규격의 Kotlin 구현. Asynchronouse cold stream. Asynchronouse 하게 동작하는게 코루틴과 찰떡으로 돌아간다.
데이터 스트림이라고 한다면, 한쪽(Producer)에서는 소스 데이터를 계속 넣어주고 이 데이터들이 일련의 파이프라인을 따라 처리된 후, 맞은편(Consumer)에서 데이터를 빼가는 모습을 생각할 수 있다. 주로 파일처리등의 대용량 데이터의 처리에 사용되지만, 이미 안드로이드에서 범용적으로 사용하고 있듯, 비동기적으로 데이터를 처리하는 reactive programming 형태에서는 데이터의 크기와 상관없이 보편적으로 사용되며 꽤 많은 편의성을 제공하는 것으로 보인다.
Channel
코틀린에서는 코루틴간에 이런 데이터 스트림을 위해 Channel이란걸 제공한다. 채널은 BlockingQueue와 유사한데, 값을 넣어주는 블록킹으로 동작하는 put() 대신에 suspend 함수인 send()를 사용하고, 마찬가지로 블록킹으로 동작하는 take() 대신에 suspend receive()를 사용한다.
아주 간단한 channel 예제를 살펴보자.
fun main(args: Array<String>): Unit = runBlocking {
channelSample()
println("\nend of main")
}
suspend fun channelSample() {
val channel = Channel<Int>()
CoroutineScope(Dispatchers.Default).launch {
for(x in 1..5) {
println("send $x^2")
channel.send(x*x)
}
}
repeat(5) {
println("received : ${channel.receive()}")
}
println("Done!")
}
Channel을 하나 만들어 Dispatchers.Default의 코루틴에서 send()로 데이터를 보내고, 메인에서 receive()로 데이터를 받고 있다. 결과는 다음과 같다.
send 1^2
send 2^2
received : 1
received : 4
send 3^2
send 4^2
received : 9
send 5^2
received : 16
received : 25
Done!
end of main
결과를 보면, Producer 쪽은 Consumer에 상관없이 데이터를 쓰고 있는게 보인다. 일반적인 큐를 이용하는 듯한 이러한 데이터 스트림은 Producer/Consumer가 독립적으로 동작하고, Consumer가 일을 안해도 Producer는 일을 하며 항상 활성화 되어 있기 때문에, ‘Hot’ stream이라고 말한다. Flow를 보면서 비교해보자.
Flow
Flow는 builder를 이용하여 먼저 Flow를 리턴하는 suspend 함수를 정의한다.
suspend fun flowSample(): Flow<Int> = flow {
for(i in 1..5) {
delay(100)
println("emit $i")
emit(i)
}
}
리턴값이 Flow<> 타입인 suspend 함수로, flow 빌더의 한 형태인 flow { }로 정의했다. Channel과 다르게 send()가 아닌 emit()으로 값을 보내는게 보인다. 값을 어떻게 받는지 살펴보자.
fun main(args: Array<String>): Unit = runBlocking {
flowSample().collect{value -> println("value = $value")}
println("\nend of main")
}
Consumer가 값을 가져오는건 Flow.collect()를 이용하고 있다. 받아오는 값을 처리하기 위한 코드는 람다 형태로 사용하고 있다. Channel처럼 값을 가져오기 위해 루프를 돌지 않고 값이 처리되는걸 볼 수 있다. 출력값을 보자.
emit 1
value = 1
emit 2
value = 2
emit 3
value = 3
emit 4
value = 4
emit 5
value = 5
end of main
값을 하나 emit()하면 collect의 람다가 실행되고, 이렇게 하나씩 순차적으로 실행되는걸 볼 수 있다. KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow 에서 소개된 그래프로 보면 보다 직관적이다.

처음에는 Flow<R>을 정의만 해놓고 collect를 호출하면, 그 때부터 실행이 되면서 하나씩 emit을 기준으로 번갈아가며 실행되는 것이다.
Flow는 Channel처럼 producer와 consumer가 별도로 실행되는게 아니다. Flow는 미리 정의해놓고, collect()를 호출해주면, Flow의 코드를 하나의 값이 emit()될 때까지 실행한 후, collect()의 람다 함수가 그 값으로 실행된다. 이 과정이 emit이 모두 끝날 때까지 반복된다. 눈치가 빠른 사람은 알텐데, 형태만 Channel과 같지, 그냥 Flow의 collect() 함수를 람다 형태의 callback 함수와 함께 호출해 주는 것과 같다.
Flow is Cold stream!
Flow의 구현을 좀 더 깊게 알아보자. Flow 정의를 살펴보면, 하나의 인터페이스로 suspend 함수인 collect()를 갖도록 정의되어 있다.
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
인자값인 FlowCollector를 보면, 함수 객체 인터페이스로, suspend 함수인 emit()을 갖도록 정의되어 있다.
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
동작을 정확히 알기위해, 구현 부분인 flow{ } 빌더의 구현을 보면,
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
우리가 flow{ … } 블럭안에 넣는 람다 함수는 block이란 이름의 FlowCollector<T>의 확장함수로 넘어가는걸 볼 수 있다. 구현체는 SafeFlow(block)이니까 따라가보면,
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
AbstractFlow<T>를 따라가야 알 수 있을거 같다. 또 들어가보자.
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
...
이제 좀 보인다. 우리가 Flow의 collect{ … } 를 호출해주면, 값을 받아 처리하는 Consumer 람다 함수가 FlowCollector<T> 함수객체로 넘어간 것이다. 이 collector로 safeCollector를 만들고 이걸 collectSafely()의 인자로 넘긴다.
거꾸로 다시 거슬러 올라가면, SafeFlow<T>에서 collectSafely()를 오버라이드 했는데, collector.block()을 불러주고 있다. 앞에서도 말했지만, block은 flow 빌더로 flow를 정의할 때 구현한 람다 함수 이다. 이 block을 collector의 확장함수로 정의되었기 때문에, collector.block()으로 실행이 가능하다. 즉, collect()를 실행하면, 데이터 소스를 emit하는 producer 코드를 실행하는 것이다. emit()의 구현까지 찾아보면, collector 본체와 block 사이를 오고가는 내용까지 알겠지만, 그건 알아서 해보던지 🙂 핵심은 Flow의 경우, 단순히 코루틴 내에서 함수의 호출로 구현된 점이라는 것이다. Channel처럼 Consumer가 동작을 멈췄을 때, Producer가 데이터를 계속 생성하면서 살아있는 일은 있을 수가 없다. 그렇기 때문에 Channel은 ‘hot’ stream이라고 부르듯, Flow는 collect를 실행할 때만 동작하는 ‘Cold’ stream이다.
Context Preservation
Flow의 collection은 단순한 함수 호출이므로, 항상 부모 코루틴의 context내에서 이루어진다. 이걸 context Preservation이라 한다. 만약, 다른 context에서 값을 emit하도록 구현하고싶다면, Flow가 단순한 함수 콜 형태이기 때문에, 여러가지로 복잡한 문제가 생긴다. 하지만, 시간이 걸리는 작업의 결과물로 UI 업데이트를 하고싶은 너무나도 평범하고 흔한 경우에도 context의 변경은 필요해 보인다. 이를 위해 제공하는 것이 flowOn(Dispatcher) 연산자이다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
결과값을 살펴보면,
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
emit하는 코루틴과 collect하는 코루틴이 다른 것을 볼 수 있다. 어떻게 구현된건지 조금 궁금하긴 하지만, 이렇게 알아갈려면 한도 끝도 없을거 같아서 일단 넘어간다. 내부적으로 채널을 쓰는건가 싶기도?
flowOn은 중간 연산자인데, upstream의 CoroutineContext를 변경한다. 즉, flowOn 이전 파이프라인에 적용된다. kotlin api 문서에 예제코드가 나와있다.
withContext(Dispatchers.Main) {
val singleValue = intFlow // will be executed on IO if context wasn't specified before
.map { ... } // Will be executed in IO
.flowOn(Dispatchers.IO)
.filter { ... } // Will be executed in Default
.flowOn(Dispatchers.Default)
.single() // Will be executed in the Main
}
주석에 달려있듯이, flowOn 이전 부분에만 해당 context가 적용되는걸 알 수 있다.
Flow Builders
앞에서 기본적인 flow builder를 다뤘는데, 여러가지 flow builder들이 존재한다.
val myFlow = flow {...}
val myFlow = flowOf("aaa", "bbb", "ccc", "ddd")
val myFlow = listOf("aaa", "bbb", "ccc", "ddd").asFlow()
뭐, 요정도?
Flow Operators
앞에서 잠깐 다뤘던 flowOn이나 collect등은 다 연산자(Operator)들이다. Flow가 reactive programming에 기원했듯, 거기서 사용되는 각종 연산자들을 사용할 수 있다. 이 연산자들은 파이프라인처럼 작동하여 데이터를 가공한다.
Flow 연산자(Operator)에는 terminal operator와 intermediate operator가 있다. terminal operator는 최종적으로 데이터를 collect하는 것들이다. 먼저 terminal operator들을 살펴보자. 예제들은 Kotlin Flow: Terminal Operators 포스팅과 Android-9 Useful Kotlin Flow Operators You Need to Know 등에서 가져왔다.
- collect() : 가장 기본적인 terminal operator
val numbersFlow = flowOf(1, 2, 3, 4, 5)
numbersFlow.collect { number ->
println("Received number: $number")
}
- first : collect한 데이터중에 조건에 맞는 첫번째 아이템을 돌려주고 멈춘다.
val numbersFlow = flowOf(1, 2, 3, 4, 5)
val firstEvenNumber = numbersFlow.first { it % 2 == 0 }
println(firstEvenNumber) // prints: 2
- last : 마지막으로 emit된 값을 돌려준다.
val numbersFlow = flowOf(1, 2, 3, 4, 5)
println(numbersFlow.last())
- toList : collect한 데이터를 리스트로 바꿔준다.
- toSet : collect한 데이터를 셋으로 바꿔준다.
val numbersFlow = flowOf(1, 2, 3, 4, 5)
val numbersList = numbersFlow.toList()
println(numbersList) // prints: [1, 2, 3, 4, 5]
- reduce : 데이터를 합산하는 방법을 제공한다.
flowOf(1, 2, 3)
.reduce { acc, value -> acc + value }
람다 함수의 인자가 두개인데, 앞이 accumulator로 합산되는 값이고, value는 매번 들어오는 하나의 값이다. 위 코드에서는 accumulator에 value를 모두 더해주고 있다.
- fold : reduce와 동일하지만, 초기값이 존재한다.
val numbersFlow = flowOf(1, 2, 3, 4, 5)
val sumFromTen = numbersFlow .fold(10) { total, num -> total + num }
println("folded: $sumFromTen")
이 외에도 여러가지가 있음을 알아두자.
이제 Intermediate operator들을 알아보자. 이것들은 파이프라인 체인처럼 동작하며, 위에서 아래(또는 앞에서 뒤)의 순서대로 데이터를 변환한다. collect를 하는 경우, 반대로 아래에서 위(또는 뒤에서 앞)로 데이터 collect를 요청한다. flowOn이 그 이전단계에 영향을 미치는 이유도 이렇게 반대로 요청하기 때문이다.
- flowOn(Dispatcher) : flow를 해당 Dispatcher로 동작하게 만든다. flowOn을 호출한 이전 단계에 영향을 준다. 예제는 앞에서 나왔던 것과 동일한 예제.
withContext(Dispatchers.Main) {
val singleValue = intFlow // will be executed on IO if context wasn't specified before
.map { ... } // Will be executed in IO
.flowOn(Dispatchers.IO)
.filter { ... } // Will be executed in Default
.flowOn(Dispatchers.Default)
.single() // Will be executed in the Main
}
- map : 말 그대로 value를 value에 해당하는 다른 값으로 매핑 하는 것이다. 블럭 안에서 suspend 함수들을 사용할 수 있다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
- filter : 조건에 맞지 않는 값들을 걸러내고 맞는 값들만 내보낸다.
val numbersFlow = flowOf(1, 2, 3, 4, 5, 6)
val evenNumbersFlow = numbersFlow.filter { it % 2 == 0 }
evenNumbersFlow.collect { println(it) } // Output: 2, 4, 6
- onEach : 데이터 값들에 변형은 하지 않고, 내보내기 전에 코드 블럭을 실행한다. 예를들어, 그냥 값을 찍어볼 수 있다.
flowOf(1, 2, 3)
.onEach { println(it) }
- transform : map이나 filter보다 일반적인 변형을 하는 경우 사용하는 연산자이다. 코드 블럭 내에서 emit을 사용할 수 있다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
- take(number) : 사이즈를 제한하는 연산자이다. number 만큼 처리하면, flow를 cancel 시킨다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
이것들은 기본적인 연산자들이고, 이 외에도 여러가지가 있는데, 하나로 정리된 문서를 못찾겠다. 공식문서가 그나마 가장 잘 정리되어 있는거 같다.
요정도로 정리하고, 개별 포스팅으로 stateFlow와 sharedFlow를 다뤄보겠다. 안드로이드에서 어떻게 사용하는지도 다뤄야겠네.