StateFlow 및 SharedFlow

StateFlowSharedFlow는 흐름에서 최적으로 상태 업데이트를 내보내고 여러 소비자에게 값을 내보낼 수 있는 Flow API입니다.

StateFlow

StateFlow는 현재 상태와 새로운 상태 업데이트를 수집기에 내보내는 관찰 가능한 상태 홀더 흐름입니다. value 속성을 통해서도 현재 상태 값을 읽을 수 있습니다. 상태를 업데이트하고 흐름에 전송하려면 MutableStateFlow 클래스의 value 속성에 새 값을 할당합니다.

Android에서 StateFlow는 관찰 가능한 변경 가능 상태를 유지해야 하는 클래스에 아주 적합합니다.

Kotlin 흐름의 예를 따라, View가 UI 상태 업데이트를 리슨하고 구성 변경에도 기본적으로 화면 상태가 지속되도록 LatestNewsViewModel에서 StateFlow를 노출할 수 있습니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
    data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(val exception: Throwable): LatestNewsUiState()
}

MutableStateFlow 업데이트를 담당하는 클래스가 생산자이고, StateFlow에서 수집되는 모든 클래스가 소비자입니다. flow 빌더를 사용하여 빌드된 콜드 흐름과 달리 StateFlow 흐름입니다. 흐름에서 수집해도 생산자 코드가 트리거되지 않습니다. StateFlow는 항상 활성 상태이고 메모리 내에 있으며 가비지 컬렉션 루트에서 달리 참조가 없는 경우에만 가비지 컬렉션에 사용할 수 있습니다.

새로운 소비자가 흐름에서 수집을 시작하면 스트림의 마지막 상태와 후속 상태가 수신됩니다. LiveData 같이 관찰 가능한 다른 클래스에서 이 동작을 찾을 수 있습니다.

View는 다른 흐름과 마찬가지로 StateFlow를 리슨합니다.

class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that this happens when lifecycle is STARTED and stops
                // collecting when the lifecycle is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // New value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

흐름을 StateFlow로 변환하려면 stateIn 중간 연산자를 사용합니다.

StateFlow, Flow, LiveData

StateFlowLiveData는 비슷한 점이 있습니다. 둘 다 관찰 가능한 데이터 홀더 클래스이며, 앱 아키텍처에 사용할 때 비슷한 패턴을 따릅니다.

그러나 StateFlowLiveData는 다음과 같이 다르게 작동합니다.

  • StateFlow의 경우 초기 상태를 생성자에 전달해야 하지만 LiveData의 경우는 그렇지 않습니다.
  • 뷰가 STOPPED 상태가 되면 LiveData.observe()는 소비자를 자동으로 등록 취소하는 반면, StateFlow 또는 다른 흐름에서 수집하는 경우 자동으로 수집을 중지하지 않습니다. 동일한 동작을 실행하려면 Lifecycle.repeatOnLifecycle 블록에서 흐름을 수집해야 합니다.

shareIn을 사용하여 콜드 흐름을 핫 흐름으로 만들기

StateFlow 흐름으로, 흐름이 수집되는 동안 또는 가비지 컬렉션 루트에서 다른 참조가 있는 경우 메모리에 남아 있습니다. shareIn 연산자를 사용하여 콜드 흐름을 핫 흐름으로 전환할 수 있습니다.

각 수집기에서 새 흐름을 만들 필요 없이 Kotlin 흐름에서 예로 생성한 callbackFlow를 사용하면 Firestore에서 가져온 데이터를 shareIn을 통해 수집기 간에 공유할 수 있습니다. 다음을 전달해야 합니다.

  • 흐름을 공유하는 데 사용되는 CoroutineScope. 공유 흐름을 필요한 만큼 유지하기 위해 이 범위는 소비자보다 오래 지속되어야 합니다.
  • 각 새 수집기로 재생할 항목의 수
  • 시작 동작 정책
class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
}

위 예시에서 latestNews 흐름은 마지막으로 내보낸 항목을 새 수집기로 재생하며, externalScope가 활성 상태이고 활성 수집기가 있는 한 활성 상태로 유지됩니다. SharingStarted.WhileSubscribed() 시작 정책은 활성 구독자가 있는 동안 업스트림 생산자를 활성 상태로 유지합니다. 다른 시작 정책도 사용할 수 있습니다. 예를 들면, SharingStarted.Eagerly를 사용하여 생산자를 즉시 시작하거나, SharingStarted.Lazily를 사용하여 첫 번째 구독자가 표시된 후 공유를 시작하고 흐름을 영구적으로 활성 상태로 유지할 수 있습니다.

SharedFlow

shareIn 함수는 수집하는 모든 소비자에게 값을 내보내는 핫 흐름인 SharedFlow를 반환합니다. SharedFlowStateFlow의 유연한 구성 일반화입니다.

shareIn을 사용하지 않고 SharedFlow를 만들 수 있습니다. 예를 들어 SharedFlow를 사용하면 모든 콘텐츠가 주기적으로 동시에 새로고침되도록 앱의 나머지 부분에 틱을 전송할 수 있습니다. 최신 뉴스를 가져오는 것 외에도 좋아하는 주제 컬렉션으로 사용자 정보 섹션을 새로고침할 수도 있습니다. 다음 코드 스니펫에서 TickHandler는 다른 클래스가 콘텐츠를 새로고침할 시기를 알 수 있도록 SharedFlow를 노출합니다. StateFlow의 경우처럼 클래스에서 MutableSharedFlow 유형의 지원 속성을 사용하여 항목을 흐름으로 보냅니다.

// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Event<String>> = _tickFlow

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // Listen for tick updates
            tickHandler.tickFlow.collect {
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

다음과 같은 방법으로 SharedFlow 동작을 맞춤설정할 수 있습니다.

  • replay를 사용하면 이전에 내보낸 여러 값을 새 구독자를 위해 다시 보낼 수 있습니다.
  • onBufferOverflow를 사용하면 버퍼가 전송할 항목으로 가득 찬 경우에 적용할 정책을 지정할 수 있습니다. 기본값은 호출자를 정지시키는 BufferOverflow.SUSPEND입니다. 다른 옵션은 DROP_LATEST 또는 DROP_OLDEST입니다.

또한 MutableSharedFlow에는 활성 수집기의 수가 포함된 subscriptionCount 속성이 있어서 비즈니스 로직을 적절하게 최적화할 수 있습니다. MutableSharedFlow에는 흐름에 전송된 최신 정보를 재생하지 않으려는 경우를 위한 resetReplayCache 함수도 있습니다.

추가 흐름 리소스