안드로이드에서 Flow로 읽어오는 데이터는 UI에 사용될 시, 주로 viewmodel에 저장하는 livedata형태로 변환해서 사용하게 된다. UI 업데이트는 observable한 데이터를 필요로 하기 때문이다. Flow는 cold stream이기 때문에 observable한 형태는 불가능하다. 옵저버 패턴은 다른말로 발행-구독(Publisher-Subscriber) 모델로 말하기도 하는데, 발행하는 쪽이 데이터가 바뀔 때마다 구독자들에게 브로드 캐스팅을 해야하기 때문에, hot stream 형태로 구현되어야 한다. 만약, livedata를 사용하지 않는다면? Flow만으로는 방법이 없다.
안드로이드에서 livedata는 lifecycle을 갖는 곳에서만 사용가능 하기 때문에, 데이터만 다루는 도메인 레이어에서 사용하기 부적합하다. 그럼에도 개발자들이 필요로 했고, livedata없이 지원하기 위한 Flow가 필요했다.
어디까지나 안드로이드 측면에서 이와같은 필요에 의해 나온것이 Shared Flow와 State Flow이다.
Shared Flow
Kotlin의 코루틴에서 발행-구독 모델은 처음에 BroadcastChannel로 구현되었었다. 그러나 버전이 올라가면서 더 나은 Shared Flow로 대체되었다. Shared Flow는 RxJava의 Subject에 해당한다고 한다. RxJava를 내가 안써봐서 모르겠지만, Shared Flow의 등장이 Reactive programming의 영향도 있었던거 같다.
당연하게도 이벤트의 브로드캐스팅에 최적화된 형태이다.

sharedFlow의 형태를 다이어그램으로 보여주는 위 그림은 Shared flows, broadcast channels 에서 가져왔다.
SharedFlow는 emit과 collect가 비동기적으로 작동하므로, 당연하게도 back-pressure가 존재한다. back-pressure란, emit 속도가 collect 속도보다 빠를 경우, 데이터가 밀려서 버퍼오버플로우가 발생하는걸 말한다. Shared flow는 버퍼가 가득차면, emit을 멈춘다고 한다. 또한 BufferOverflow 파라미터로 조정이 가능하다.
간단한 예제를 살펴보자.
class SharedFlowClass() {
private val _mutableSharedFlow =
MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String>
get() = _mutableSharedFlow
suspend fun broadcast() {
delay(1000)
_mutableSharedFlow.emit("Message1")
_mutableSharedFlow.emit("Message2")
}
}
fun main(args: Array<String>): Unit = runBlocking {
val sharedFlowClass = SharedFlowClass()
launch {
sharedFlowClass.sharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlowClass.sharedFlow.collect {
println("#2 received $it")
}
}
sharedFlowClass.broadcast()
println("\nend of main")
}
MutableSharedFlow는 가장 기본적인 SharedFlow의 구현이다. 클래스에서 MutableSharedFlow를 정의하고 읽기전용으로 SharedFlow만 노출시켜서 emit은 이 클래스에서만 가능하도록 만들었다. 메인함수에서는 두 개의 코루틴을 만들어 collect를 사용하여 두개의 구독자를 만들었다. sharedFlowClass.broadcast()는 여유있게 나중에 실행하여 구독이 제대로 동작하게 하였다. 결과를 보면,
#1 received Message1
#2 received Message1
#1 received Message2
#2 received Message2
end of main
구독 코루틴에서 각각 메세지를 접수한걸 확인가능하다. 주의할점은, 프로세스가 끝나지 않는다는 점이다. runBlocking이 코루틴의 종료를 대기중인데, SharedFlow는 종료가 없기 때문에 무한 대기중인 구독 코루틴이 끝나지 않기 때문이다. SharedFlow는 종료가 안되지만, 구독 코루틴을 cancel하여 종료가 가능하다. 메인함수를 다음과 같이 변형하여 종료시켜보자.
fun main(args: Array<String>): Unit = runBlocking {
val sharedFlowClass = SharedFlowClass()
val job1 = launch {
sharedFlowClass.sharedFlow.collect {
println("#1 received $it")
}
}
val job2 = launch {
sharedFlowClass.sharedFlow.collect {
println("#2 received $it")
}
}
sharedFlowClass.broadcast()
delay(1000)
job1.cancelAndJoin()
job2.cancelAndJoin()
println("\nend of main")
}
실행시켜 보면, 정상 종료됨을 확인할 수 있다.
안드로이드에서 Shared flow가 자주 쓰이는 거 같지 않지만, Shared Flow의 한 형태인 State Flow는 매우 유용하게 쓰인다. State Flow를 살펴보자.
State Flow
State flow는 Shared flow의 한 형태이다. 바로 위에서 back-pressure에 대해 다뤘었는데, buffer overflow를 해결하는 전략중 하나로, 오래된 데이터를 드롭해 버리고 새 데이터를 넣는 방법이 있다. State flow는 이런식으로 항상 최신의 데이터, 곧 state만 유지한다.
State Flow는 값의 상태변화를 브로드캐스트하는 Shared Flow로 해석할 수 있다. 안드로이드에서 이게 중요한 이유는, state flow가 livedata처럼 작동하기 때문에, 안드로이드의 lifecycle에 구애받지 않고 도메인 레이어까지 얼만든지 사용가능하다.
간단한 예제를 살펴보자.
class StateFlowClass() {
private val _mutableStateFlow = MutableStateFlow<String>("hello? Initial state")
val stateFlow: StateFlow<String>
get() = _mutableStateFlow
suspend fun changeState(msg: String) {
_mutableStateFlow.value = msg
}
}
fun main(args: Array<String>): Unit = runBlocking {
val stateFlowClass = StateFlowClass()
launch {
stateFlowClass.stateFlow.collect {
println("state : $it")
}
}
delay(1000)
stateFlowClass.changeState("I'm solid state")
delay(1000)
stateFlowClass.changeState("I'm liquid state")
delay(1000)
stateFlowClass.changeState("I'm gas state")
println("\nend of main")
}
메인에서 collect를 하고 값을 1초 간격으로 계속 바꿔주고 있다. 주의할 점은, changeState()를 보면, emit을 쓰지않고, value의 값을 바꿔주는 것을 볼 수 있다. 단일 값에 대해 변경만 해주는 것이다. 또한, livedata와 다르게 초기값이 필요한 것도 알 수 있다. 결과를 보면,
state : hello? Initial state
state : I'm solid state
state : I'm liquid state
end of main
state : I'm gas state
실행시, delay(1000)에 따라 1초간격으로 상태가 바뀌는걸 확인 할 수 있다. 또한, SharedFlow와 동일하게, 프로세스가 종료되지 않는다. StateFlow도 종료가 없기 때문이다.
State Flow in Android
공식 가이드에 나온 예제를 첨부해 본다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(val exception: Throwable): LatestNewsUiState()
}
repository로부터 읽어오는 favoriteLatestNews도 Flow로 읽어오는게 보인다. repository에서 emit되어 뉴스가 업데이트 되면, StateFlow인 uiState의 value에 뉴스 헤드라인을 업데이트한다. UI에서 가져가는 코드도 살펴보면,
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
repeatOnLifecycle()을 쓰고있다. 주석에도 써있지만, 이것은 인자로 넘겨준 Lifecycle을 따라서 코루틴을 생성하고, 벗어나면 cancel시켜 멈추고, 다시 들어오면 코루틴을 새로 생성한다. Lifecycle을 자동으로 따라가는 Livedata와의 차별점인데, 코루틴은 생성과 종료의 CoroutineScope만 존재하고 Lifecycle의 state는 상관이 없기 때문에 이런 방식을 사용한다. 이렇게 하는건 당연하게도 UI가 보여질 때만 의미가 있고, UI 컴포넌트들도 유효하기 때문이다.
StateFlow를 collect하는건 간단하고 일반적인 Flow랑 동일하게 보인다.
일단 마무리
뭔가… 더 많은게 있었던거 같은데, 이정도면 기본적인건 다룬거 같아서 일단 여기서 마무리 짓겠다. 복잡한 경우를 다 나열하기도 힘들고, 나도 겪어봐야 아는거고, 그런 각각의 케이스들은 이제 찾아보면서 해결책을 찾아야지 뭐. 어쨌든 Flow는 여기서 끝.