Kotlin na Androidzie

W współudziałach typ przepływ to typ, który może generować kilka wartości sekwencyjnie, w odróżnieniu od zawieszania funkcji, które zwracają tylko 1 wartość. Za pomocą przepływu można na przykład otrzymywać aktualne aktualizacje z bazy danych.

Przepływy są oparte na współrzędnych i mogą przyjmować wiele wartości. Przepływ to koncepcyjnie strumień danych, który można obliczyć asynchronicznie. Wyemitowane wartości muszą być tego samego typu. Na przykład Flow<Int> to przepływ, który generuje wartości całkowite.

Przepływ jest bardzo podobny do funkcji Iterator, która generuje sekwencję wartości, ale używa funkcji zawieszania do asynchronicznego generowania i wykorzystywania wartości. Oznacza to na przykład, że przepływ może bezpiecznie wysłać żądanie sieciowe, aby wygenerować kolejną wartość, bez blokowania wątku głównego.

W strumieniach danych są powiązane 3 encje:

  • Producent tworzy dane, które są dodawane do strumienia. Dzięki współudziałom przepływy mogą również generować dane asynchronicznie.
  • (Opcjonalnie) Pośrednicy mogą modyfikować każdą wartość przesyłaną do strumienia lub samego strumienia.
  • Konsument pobiera wartości ze strumienia.

podmioty zaangażowane w strumienie danych; konsument,
              opcjonalnie pośrednicy i producent;
Rysunek 1. Podmioty zaangażowane w strumienie danych: konsument, opcjonalnie pośrednicy i producent.

W przypadku Androida repozytorium jest zwykle producentem danych interfejsu, które jako konsument ostatecznie wyświetla dane za pomocą interfejsu użytkownika. Zdarza się też, że warstwa interfejsu jest producentem zdarzeń wejściowych użytkownika, a inne warstwy hierarchii je wykorzystują. Warstwy między producentem a konsumentem zwykle pełnią funkcję pośredników, które modyfikują strumień danych w celu dostosowania go do wymagań następnej warstwy.

Tworzenie przepływu

Aby utworzyć przepływy, użyj interfejsów API flow Builder. Funkcja kreatora flow tworzy nowy proces, w którym możesz ręcznie wysyłać nowe wartości do strumienia danych za pomocą funkcji emit.

W poniższym przykładzie źródło danych automatycznie pobiera najnowsze wiadomości w ustalonych odstępach czasu. Ponieważ funkcja zawieszania nie może zwracać wielu kolejnych wartości, źródło danych tworzy i zwraca przepływ, aby spełnić to wymaganie. W tym przypadku źródło danych działa jako producent.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Kreator flow jest wykonywany we współpracy. Pozwala to korzystać z tych samych asynchronicznych interfejsów API, ale obowiązują w nich pewne ograniczenia:

  • Przepływy są sekwencyjne. Ponieważ producent jest w korytynie, wywołując funkcję zawieszania, producent zawiesza się do czasu, gdy funkcja zawieszania wróci. W tym przykładzie producent zawiesza się do momentu zakończenia żądania sieciowego fetchLatestNews. Dopiero wtedy wynik jest wysyłany do strumienia.
  • Za pomocą konstruktora flow producent nie może emit wartości z innego CoroutineContext. Dlatego nie używaj wywołania emit w innym obiekcie CoroutineContext przez utworzenie nowych współrzędnych lub za pomocą withContext bloków kodu. W takich przypadkach możesz użyć innych kreatorów przepływu, takich jak callbackFlow.

Modyfikowanie strumienia

Pośrednicy mogą używać operatorów pośrednich do modyfikowania strumienia danych bez używania tych wartości. Operatory te to funkcje, które po zastosowaniu do strumienia danych tworzą łańcuch operacji, które nie będą wykonywane, dopóki wartości nie zostaną wykorzystane w przyszłości. Więcej informacji o operatorach pośrednich znajdziesz w dokumentacji przepływu.

W poniższym przykładzie warstwa repozytorium używa operatora pośredniego map do przekształcenia danych, które mają być wyświetlane w View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operatory pośrednie można stosować jeden po drugim, tworząc łańcuch operacji wykonywanych leniwie, gdy element trafia do przepływu. Pamiętaj, że samo zastosowanie operatora pośredniego do strumienia nie uruchamia zbierania danych.

Gromadzenie danych

Użyj operatora terminala, aby aktywować przepływ nasłuchujący wartości. Aby pobrać wszystkie wartości w strumieniu w miarę ich emisji, użyj metody collect. Więcej informacji o operatorach terminali znajdziesz w oficjalnej dokumentacji procesu.

Ponieważ collect jest funkcją zawieszania, musi być wykonywana wewnątrz współrzędu. Przyjmuje ona lambda jako parametr, który jest wywoływany przy każdej nowej wartości. Ponieważ jest to funkcja zawieszenia, współprogram, który wywołuje funkcję collect, może zostać zawieszony do momentu zamknięcia przepływu.

Nawiązując do poprzedniego przykładu, oto prosta implementacja obiektu ViewModel zużywającego dane z warstwy repozytorium:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Rejestrowane przepływy uruchamiają producenta, który odświeża najnowsze wiadomości i nadaje wynik żądania sieciowego w stałym przedziale czasu. Ponieważ producent pozostaje zawsze aktywny z pętlą while(true), strumień danych zostanie zamknięty po wyczyszczeniu modelu widoku danych i anulowaniu viewModelScope.

Zbieranie przepływów może zostać zatrzymane z tych powodów:

  • Współorganizacja, która zbiera dane, jest anulowana, jak pokazano w poprzednim przykładzie. Spowoduje to też zatrzymanie producenta.
  • Producent kończy wydawanie produktów. W takim przypadku strumień danych zostanie zamknięty, a współpraca o nazwie collect wznawia działanie.

Przepływy są zimne i leniwe, chyba że zostały określone za pomocą innych operatorów pośrednich. Oznacza to, że kod producenta jest wykonywany za każdym razem, gdy w przepływie jest wywoływany operator terminala. W poprzednim przykładzie korzystanie z kilku kolektorów przepływu powoduje, że źródło danych wielokrotnie pobiera najnowsze wiadomości w różnych stałych odstępach czasu. Aby zoptymalizować i udostępniać przepływ, gdy wielu klientów zbiera dane w tym samym czasie, użyj operatora shareIn.

Wychwytywanie nieoczekiwanych wyjątków

Implementacja producenta może pochodzić z biblioteki zewnętrznej. Oznacza to, że może on zgłaszać nieoczekiwane wyjątki. Aby obsługiwać te wyjątki, użyj operatora pośredniego catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

W poprzednim przykładzie, gdy wystąpi wyjątek, funkcja lambda collect nie jest wywoływana, ponieważ nie otrzymano nowego elementu.

catch może również emit elementu ścieżki. Warstwa repozytorium mogłaby za to emit wyświetlić z pamięci podręcznej wartości z pamięci podręcznej:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

W tym przykładzie, gdy wystąpi wyjątek, wywoływana jest funkcja lambda collect, ponieważ w wyniku wyjątku do strumienia został wysłany nowy element.

Wykonywanie w innym kontekście CoroutineContext

Domyślnie producent konstruktora flow wykonuje działanie w CoroutineContext współrzędnej, która pobiera z niego dane, i jak już wspomnieliśmy, nie może emitwartości z innej CoroutineContext. W niektórych przypadkach takie zachowanie może być niepożądane. Na przykład w przykładach używanych w tym temacie warstwa repozytorium nie powinna wykonywać operacji na obiekcie Dispatchers.Main, który jest używany przez viewModelScope.

Aby zmienić CoroutineContext przepływu, użyj operatora pośredniego flowOn. flowOn zmienia element CoroutineContext nadrzędnego procesu, co oznacza producenta i wszystkich operatorów pośrednich stosowanych przed (lub powyżej) flowOn. Nie ma to wpływu na przepływ po ścieżce (operatory pośrednie po flowOn wraz z klientem) i są wykonywane w elemencie CoroutineContext używanym w collect z przepływu. Jeśli jest kilka operatorów flowOn, każdy z nich zmienia lokalizację nadrzędną względem bieżącej lokalizacji.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

W przypadku tego kodu operatory onEach i map używają defaultDispatcher, podczas gdy operator catch i konsument są wykonywane na elemencie Dispatchers.Main używanym przez viewModelScope.

Ponieważ warstwa źródła danych wykonuje operacje wejścia-wyjścia, należy użyć dyspozytora zoptymalizowanego pod kątem operacji wejścia-wyjścia:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Przepływy w bibliotekach Jetpack

Flow jest zintegrowany z wieloma bibliotekami Jetpack i jest popularny wśród bibliotek zewnętrznych Androida. Flow idealnie nadaje się do aktualizacji danych w czasie rzeczywistym i niekończących się strumieni danych.

Możesz użyć narzędzia Flow with Room, aby otrzymywać powiadomienia o zmianach w bazie danych. Gdy używasz obiektów dostępu do danych (DAO), zwracaj typ Flow, aby otrzymywać aktualizacje na żywo.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Za każdym razem, gdy występuje zmiana w tabeli Example, wysyłana jest nowa lista wraz z nowymi elementami w bazie danych.

Przekształcanie interfejsów API opartych na wywołaniach zwrotnych na przepływy

callbackFlow to kreator przepływu, który umożliwia przekształcanie interfejsów API opartych na wywołaniach zwrotnych w przepływy. Na przykład interfejsy API Firebase Firestore na Androida używają wywołań zwrotnych.

Aby przekonwertować te interfejsy API na przepływy i nasłuchiwać aktualizacji bazy danych Firestore, możesz użyć tego kodu:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

W przeciwieństwie do konstruktora flow dyrektywa callbackFlow zezwala na wysyłanie wartości z innego obiektu CoroutineContext za pomocą funkcji send lub poza współrzędną z funkcją trySend.

Wewnętrznie callbackFlow używa kanału, który koncepcyjnie jest bardzo podobny do kolejki blokowania. Kanał jest skonfigurowany z pojemnością, czyli maksymalną liczbą elementów, które można buforować. Kanał utworzony w callbackFlow ma domyślną pojemność 64 elementów. Jeśli spróbujesz dodać nowy element do kanału pełnego kanału, send zawiesza producenta do czasu, aż będzie na to miejsce. Z kolei trySend nie doda elementu do kanału i natychmiast zwróci false.

trySend natychmiast dodaje określony element do kanału tylko wtedy, gdy nie narusza to jego ograniczeń dotyczących pojemności, a potem zwraca odpowiedni wynik.

Dodatkowe materiały dotyczące przepływu