In Koroutinen ist ein Fluss ein Typ, der mehrere Werte nacheinander ausgeben kann, im Gegensatz zu Sperrfunktionen, die nur einen einzelnen Wert zurückgeben. Mit einem Ablauf können Sie beispielsweise Live-Updates von einer Datenbank empfangen.
Abläufe basieren auf Koroutinen und können mehrere Werte bereitstellen.
Ein Fluss ist konzeptionell ein Datenstrom, der asynchron berechnet werden kann. Die ausgegebenen Werte müssen vom gleichen Typ sein. Ein Flow<Int>
ist beispielsweise ein Ablauf, der Ganzzahlwerte ausgibt.
Ein Ablauf ist einem Iterator
sehr ähnlich, der eine Folge von Werten erzeugt, aber Sperrfunktionen verwenden, um Werte asynchron zu erzeugen und zu nutzen. Dies bedeutet beispielsweise, dass der Ablauf sicher eine Netzwerkanfrage senden kann, um den nächsten Wert zu erzeugen, ohne den Hauptthread zu blockieren.
An Datenstreams sind drei Entitäten beteiligt:
- Ein Producer erzeugt Daten, die dem Stream hinzugefügt werden. Dank Koroutinen können Datenflüsse auch asynchron erzeugt werden.
- Vermittler(optional) können jeden Wert ändern, der in den Stream oder den Stream selbst ausgegeben wird.
- Ein Nutzer nimmt die Werte aus dem Stream auf.
In Android ist ein Repository in der Regel ein Ersteller von UI-Daten mit der Benutzeroberfläche als Nutzer, der die Daten letztendlich anzeigt. In anderen Fällen erzeugt die UI-Ebene Nutzereingabeereignisse, die von anderen Hierarchieebenen verarbeitet werden. Ebenen zwischen dem Ersteller und dem Nutzer fungieren in der Regel als Vermittler, die den Datenstrom an die Anforderungen der folgenden Ebene anpassen.
Ablauf erstellen
Verwenden Sie zum Erstellen von Abläufen die Flow Builder-APIs. Die Builder-Funktion flow
erstellt einen neuen Ablauf, in dem Sie mithilfe der Funktion emit
manuell neue Werte in den Datenstream ausgeben können.
Im folgenden Beispiel ruft eine Datenquelle die neuesten Nachrichten automatisch in einem festen Intervall ab. Da eine Anhalten-Funktion nicht mehrere aufeinanderfolgende Werte zurückgeben kann, erstellt und gibt die Datenquelle einen Ablauf zurück, um diese Anforderung zu erfüllen. In diesem Fall fungiert die Datenquelle als Ersteller.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Der flow
-Builder wird innerhalb einer Koroutine ausgeführt. Daher werden dieselben asynchronen APIs verwendet, es gelten jedoch einige Einschränkungen:
- Abläufe sind sequentiell. Da sich der Producer in einer Koroutine befindet, hält er beim Aufrufen einer Anhaltenfunktion den Vorgang an, bis die Funktion wieder aktiviert wird. Im Beispiel hält der Producer an, bis die
fetchLatestNews
-Netzwerkanfrage abgeschlossen ist. Nur dann wird das Ergebnis an den Stream gesendet. - Mit dem
flow
-Builder kann der Ersteller keine Werte aus einem anderenCoroutineContext
emit
hinzufügen. Rufen Sieemit
daher nicht in einem anderenCoroutineContext
auf. Erstellen Sie dazu neue Koroutinen oder verwenden SiewithContext
-Codeblöcke. In diesen Fällen können Sie andere Ablauferstellungstools wiecallbackFlow
verwenden.
Stream ändern
Mittler können Zwischenoperatoren verwenden, um den Datenstrom zu ändern, ohne die Werte zu verbrauchen. Diese Operatoren sind Funktionen, die bei Anwendung auf einen Datenstrom eine Kette von Vorgängen bilden, die erst dann ausgeführt werden, wenn die Werte in Zukunft verarbeitet werden. Weitere Informationen zu Operatoren mittlerer Stufe finden Sie in der Referenzdokumentation zum Ablauf.
Im folgenden Beispiel verwendet die Repository-Ebene den Zwischenoperator map
, um die Daten zu transformieren, die auf View
angezeigt werden sollen:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Zwischenoperatoren können nacheinander angewendet werden und bilden eine Kette von Vorgängen, die verzögert ausgeführt werden, wenn ein Element in den Ablauf übergeben wird. Beachten Sie, dass durch das Anwenden eines Zwischenoperators auf einen Stream nicht die Flusserfassung gestartet wird.
Daten aus einem Ablauf erfassen
Verwenden Sie einen Terminal-Operator, um den Ablauf auszulösen und mit dem Monitoring von Werten zu beginnen. Mit collect
können Sie alle Werte im Stream abrufen, sobald sie ausgegeben werden.
Weitere Informationen zu Terminaloperatoren finden Sie in der offiziellen Ablaufdokumentation.
Da collect
eine Anhaltenfunktion ist, muss sie innerhalb einer Koroutine ausgeführt werden. Sie verwendet eine Lambda-Funktion als Parameter, der bei jedem neuen Wert aufgerufen wird. Da es sich um eine Anhaltefunktion handelt, kann die Koroutine, die collect
aufruft, angehalten, bis der Ablauf geschlossen wird.
Ausgehend vom vorherigen Beispiel hier eine einfache Implementierung eines ViewModel
, der die Daten aus der Repository-Ebene verwendet:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Das Erfassen des Datenflusses löst den Producer aus, der die neuesten Nachrichten aktualisiert und das Ergebnis der Netzwerkanfrage in einem festen Intervall ausgibt. Da der Producer immer mit der while(true)
-Schleife aktiv bleibt, wird der Datenstrom geschlossen, wenn ViewModel gelöscht und viewModelScope
abgebrochen wird.
Die Ablauferfassung kann aus folgenden Gründen beendet werden:
- Die gesammelte Koroutine wird abgebrochen, wie im vorherigen Beispiel gezeigt. Dadurch wird auch der zugrunde liegende Producer gestoppt.
- Der Produzent schließt die Ausgabe der Elemente ab. In diesem Fall wird der Datenstrom geschlossen und die Koroutine, die
collect
aufgerufen hat, setzt die Ausführung fort.
Abläufe sind kalt und lazy, sofern nicht mit anderen Zwischenoperatoren angegeben. Das bedeutet, dass der Producer-Code jedes Mal ausgeführt wird, wenn ein Terminaloperator im Ablauf aufgerufen wird. Im vorherigen Beispiel führt die Datenquelle mit mehreren Fluss-Collectors dazu, dass die Datenquelle die neuesten Nachrichten mehrmals in verschiedenen festen Intervallen abruft. Verwenden Sie den Operator shareIn
, um einen Ablauf zu optimieren und freizugeben, wenn mehrere Nutzer gleichzeitig Daten erheben.
Unerwartete Ausnahmen abfangen
Die Implementierung des Producers kann aus der Bibliothek eines Drittanbieters stammen.
Das bedeutet, dass unerwartete Ausnahmen ausgelöst werden können. Verwenden Sie den Zwischenoperator catch
, um diese Ausnahmen zu verarbeiten.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Wenn im vorherigen Beispiel eine Ausnahme auftritt, wird das Lambda collect
nicht aufgerufen, da kein neues Element empfangen wurde.
catch
kann auch Elemente im Ablauf emit
. Die Beispiel-Repository-Ebene könnte stattdessen die im Cache gespeicherten Werte mit emit
versehen:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
In diesem Beispiel wird beim Auftreten einer Ausnahme das Lambda collect
aufgerufen, da aufgrund der Ausnahme ein neues Element an den Stream ausgegeben wurde.
In einem anderen CoroutineContext ausführen
Standardmäßig wird der Ersteller eines flow
-Builders in der CoroutineContext
der Koroutine ausgeführt, die daraus erfasst. Wie bereits erwähnt, kann er keine Werte aus einem anderen CoroutineContext
-Wert emit
. Dies ist möglicherweise nicht erwünscht.
In den Beispielen, die in diesem Thema verwendet werden, sollte die Repository-Ebene beispielsweise keine Vorgänge für Dispatchers.Main
ausführen, die von viewModelScope
verwendet werden.
Verwenden Sie zum Ändern der CoroutineContext
eines Ablaufs den Zwischenoperator flowOn
.
flowOn
ändert den CoroutineContext
des vorgelagerten Ablaufs, d. h. den Producer und alle Zwischenoperatoren, die vor (oder darüber)
flowOn
angewendet werden. Der nachgelagerte Ablauf (die Zwischenoperatoren nach flowOn
und der Nutzer) ist nicht betroffen und wird vom CoroutineContext
bis collect
ausgeführt. Wenn es mehrere flowOn
-Operatoren gibt, ändert jeder den Upstream gegenüber seinem aktuellen Standort.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Mit diesem Code verwenden die Operatoren onEach
und map
den defaultDispatcher
. Der Operator catch
und der Nutzer werden hingegen auf Dispatchers.Main
ausgeführt, das von viewModelScope
verwendet wird.
Da die Datenquellenebene für E/A-Vorgänge zuständig ist, sollten Sie einen Disponenten verwenden, der für E/A-Vorgänge optimiert ist:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Abläufe in Jetpack-Bibliotheken
Flow ist in viele Jetpack-Bibliotheken eingebunden und unter Android-Drittanbieterbibliotheken beliebt. Flow eignet sich hervorragend für Live-Datenaktualisierungen und endlose Datenstreams.
Sie können Flow with Room verwenden, um über Änderungen in einer Datenbank benachrichtigt zu werden. Wenn Sie Datenzugriffsobjekte (Data Access Objects, DAO) verwenden, geben Sie den Typ Flow
zurück, um Liveaktualisierungen zu erhalten.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Jedes Mal, wenn eine Änderung in der Tabelle Example
erfolgt, wird eine neue Liste mit den neuen Elementen in der Datenbank ausgegeben.
Callback-basierte APIs in Abläufe umwandeln
callbackFlow
ist ein Ablauf-Builder, mit dem Sie Callback-basierte APIs in Abläufe umwandeln können.
Die Android APIs von Firebase Firestore verwenden beispielsweise Callbacks.
Mit dem folgenden Code können Sie diese APIs in Abläufe konvertieren und auf Firestore-Datenbankaktualisierungen warten:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Im Gegensatz zum flow
-Builder erlaubt callbackFlow
die Ausgabe von Werten von einer anderen CoroutineContext
mit der send
-Funktion oder außerhalb einer Koroutine mit der trySend
-Funktion.
Intern verwendet callbackFlow
einen Kanal, der einer blockierenden Warteschlange konzeptionell ähnlich ist.
Ein Kanal wird mit einer capacity konfiguriert, also der maximalen Anzahl von Elementen, die zwischengespeichert werden können. Der in callbackFlow
erstellte Kanal hat eine Standardkapazität von 64 Elementen. Wenn Sie versuchen, einem vollständigen Kanal ein neues Element hinzuzufügen, hält send
den Producer an, bis ausreichend Platz für das neue Element vorhanden ist. offer
fügt das Element hingegen nicht dem Kanal hinzu und gibt sofort false
zurück.
Zusätzliche Ressourcen zum Ablauf
- Kotlin-Abläufe auf Android-Geräten testen
StateFlow
undSharedFlow
- Zusätzliche Ressourcen für Kotlin-Koroutinen und -Ablauf