In Koroutinen ist ein Ablauf ein Typ, der mehrere Werte ausgeben kann im Gegensatz zu Unterbrechungsfunktionen, die nur einen einzelnen Wert. Sie können beispielsweise einen Ablauf verwenden, um Aktualisierungen aus einer Datenbank.
Abläufe basieren auf Koroutinen und können mehrere Werte bereitstellen.
Ein Ablauf ist konzeptionell ein Datenstrom, der berechnet werden kann.
asynchron programmiert. Die ausgegebenen Werte müssen denselben Typ haben. Für
Beispiel: Ein Flow<Int>
ist ein Fluss, der ganzzahlige Werte ausgibt.
Ein Ablauf ist einem Iterator
sehr ähnlich, der eine Folge von
-Werte, verwendet aber Anhalten-Funktionen, um Werte zu erzeugen und zu verarbeiten.
asynchron programmiert. Das bedeutet zum Beispiel, dass der Fluss sicher eine
Netzwerkanfrage senden, um den nächsten Wert zu erzeugen, ohne den Hauptwert
Diskussions-Thread.
An Datenstreams sind drei Entitäten beteiligt:
- Ein Produzent erzeugt Daten, die dem Stream hinzugefügt werden. Dank der Koroutinen, Abläufe können Daten auch asynchron erzeugen.
- (Optional) Vermittler können die einzelnen Werte ändern, die in den oder den Stream selbst.
- Ein Nutzer bezieht die Werte aus dem Stream.
In Android ist ein Repository in der Regel ein Erzeuger von UI-Daten, bei denen die Benutzeroberfläche (UI) als Consumer verwendet wird auf dem die Daten schließlich angezeigt werden. In anderen Fällen erzeugt die UI-Ebene Nutzereingabeereignisse und andere Hierarchieebenen verarbeiten sie. Ebenen in zwischen Hersteller und Verbraucher als Vermittler fungieren, die die Datenstrom, um ihn an die Anforderungen der folgenden Schicht anzupassen.
Ablauf erstellen
Verwenden Sie zum Erstellen von Abläufen die
Flow Builder
APIs Mit der Builder-Funktion flow
wird ein neuer Ablauf erstellt, in dem Sie
neue Werte mit der Methode
emit
.
Im folgenden Beispiel ruft eine Datenquelle die aktuellen Nachrichten ab automatisch in einem festen Intervall. Da eine Sperrfunktion mehrere aufeinanderfolgende Werte zurückgeben, erstellt die Datenquelle um diese Anforderung zu erfüllen. In diesem Fall agiert die Datenquelle als Produzent.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Der flow
-Builder wird innerhalb einer Koroutine ausgeführt. Daher profitieren Sie
aus den gleichen asynchronen APIs. Es gelten jedoch einige Einschränkungen:
- Abläufe sind sequentiell. Da sich der Produzent in einer gemeinsamen Routine befindet, kann beim Aufruf von
eine Unterbrechungsfunktion hat, hält der Producer an, bis die Unterbrechungsfunktion aktiviert ist.
Rücksendungen. In diesem Beispiel sperrt der Ersteller das Video bis zum
fetchLatestNews
. abgeschlossen ist. Nur dann wird das Ergebnis an den Stream ausgegeben. - Mit dem
flow
Builder kann der Ersteller keineemit
Werte aus einem unterschiedlichenCoroutineContext
. Rufen Sieemit
daher nicht in einem anderenCoroutineContext
durch Erstellen neuer Koroutinen oder mithilfe vonwithContext
Codeblöcken. Sie können andere Ablauf-Builder verwenden,callbackFlow
in diesen Fällen.
Stream ändern
Vermittler können Zwischenoperatoren verwenden, um den Stream von ohne die Werte zu verbrauchen. Diese Operatoren sind Funktionen, auf einen Datenstrom angewendet, richten Sie eine Kette von Vorgängen ein, bis die Werte in der Zukunft verbraucht sind. Weitere Informationen über Zwischenoperatoren in den Referenzdokumentation zum Ablauf
Im folgenden Beispiel verwendet die Repository-Ebene den Zwischenoperator
map
zum Transformieren der Daten, die im View
angezeigt werden sollen:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Zwischenoperatoren können nacheinander angewendet werden und so eine Kette bilden von Vorgängen, die verzögert ausgeführt werden, wenn ein Element im Ablauf. Wenn Sie einen Zwischenoperator auf einen Stream anwenden, nicht mit der Erfassung des Ablaufs beginnen.
Aus einem Ablauf erfassen
Verwenden Sie einen Terminaloperator, um den Ablauf auszulösen, auf den gewartet werden soll
Werte. Um alle Werte im Stream zu erhalten, sobald sie ausgegeben wurden, verwenden Sie
collect
Weitere Informationen zu Terminaloperatoren finden Sie in der
offizielle Ablaufdokumentation.
Da collect
eine Beendigungsfunktion ist, muss sie in folgendem Zeitraum ausgeführt werden:
eine Koroutine. Dabei wird eine Lambda-Funktion
als Parameter verwendet, der
für jeden neuen Wert. Da es sich um eine Anhalten-Funktion handelt,
ruft collect
möglicherweise an, bis der Ablauf geschlossen wird.
Ausgehend vom vorherigen Beispiel ist hier eine einfache Implementierung eines
einen ViewModel
, der die Daten aus der Repository-Ebene nutzt:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Beim Erfassen des Ablaufs aktualisiert der Produzent die neuesten Nachrichten.
und gibt das Ergebnis der Netzwerkanfrage in einem festen Intervall aus. Da die
bleibt mit der while(true)
-Schleife immer aktiv, da der Stream
der Daten werden geschlossen, wenn ViewModel gelöscht wird und
viewModelScope
wurde abgesagt.
Die Ablauferfassung kann aus folgenden Gründen beendet werden:
- Die Koroutine, die erfasst wird, wird abgebrochen, wie im vorherigen Beispiel gezeigt. Dadurch wird auch der zugrunde liegende Producer gestoppt.
- Der Produzent beendet die Ausgabe von Elementen. In diesem Fall ist der Datenstrom
geschlossen und die Koroutine
collect
setzt die Ausführung fort.
Datenflüsse sind kalt und verzögert, sofern nicht mit einer anderen Zwischenstufe angegeben
. Das bedeutet, dass der Producer-Code jedes Mal ausgeführt wird, wenn ein
wird im Ablauf aufgerufen. Im vorherigen Beispiel
führen mehrere Flow-Collectors dazu, dass die Datenquelle
die neuesten Nachrichten mehrmals in verschiedenen festen Intervallen. Zur Optimierung und
einen Ablauf teilen, wenn mehrere Nutzer gleichzeitig Daten erheben, verwenden Sie
shareIn
-Operator.
Unerwartete Ausnahmen abfangen
Die Implementierung des Erstellers kann aus der Bibliothek eines Drittanbieters stammen.
Dies bedeutet, dass unerwartete Ausnahmen ausgelöst werden können. Um diese Probleme zu verarbeiten,
verwenden Sie die Methode
catch
den Zwischenoperator.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Im vorherigen Beispiel tritt bei einer Ausnahme der Fehler collect
auf.
Lambda wird nicht aufgerufen, da kein neues Element empfangen wurde.
catch
kann auch Elemente in den Ablauf emit
aufnehmen. Das Beispiel-Repository
kann die Ebene stattdessen die im Cache gespeicherten Werte emit
:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
In diesem Beispiel wird bei einer Ausnahme das Lambda collect
aufgerufen, da aufgrund des Ereignisses
Ausnahme.
Wird in einem anderen CoroutineContext ausgeführt
Standardmäßig wird der Ersteller eines flow
-Builders in der
CoroutineContext
der Koroutine, die daraus erfasst wird, und
kann nicht emit
Werte von einer anderen
CoroutineContext
. In einigen Fällen kann dieses Verhalten unerwünscht sein.
In den Beispielen, die in diesem Thema verwendet werden,
Ebene sollte auf Dispatchers.Main
keine Vorgänge ausführen, die
wird von viewModelScope
verwendet.
Mit dem Zwischenoperator können Sie den CoroutineContext
eines Ablaufs ändern
flowOn
flowOn
ändert CoroutineContext
des Upstream-Ablaufs, was bedeutet,
der Hersteller und alle zwischengeschalteten Betreiber, die vor (oder höher) angewendet wurden
flowOn
Den nachgelagerten Ablauf (die Zwischenoperatoren nach flowOn
)
zusammen mit dem Kunden) ist nicht betroffen und wird auf der
CoroutineContext
hat collect
aus dem Ablauf verwendet. Wenn es
mit mehreren flowOn
-Operatoren, wobei jeder den vorgelagerten Wert von seinem
aktuellen Standort
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Mit diesem Code verwenden die Operatoren onEach
und map
das defaultDispatcher
,
während der catch
-Operator und der Consumer auf dem
Dispatchers.Main
wird von viewModelScope
verwendet.
Da auf der Datenquellenebene die E/A-Vorgänge ausgeführt werden, sollten Sie einen Dispatcher verwenden. das für E/A-Vorgänge optimiert ist:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Abläufe in Jetpack-Bibliotheken
Flow ist in viele Jetpack-Bibliotheken integriert und ist bei Android-Bibliotheken von Drittanbietern. Der Flow eignet sich hervorragend für Live-Datenaktualisierungen. und endlose Datenströme.
Sie können
Flow mit Raum
um über Änderungen
in einer Datenbank informiert zu werden. Bei Verwendung
Datenzugriffsobjekte (Data Access Objects, DAO)
einen Flow
-Typ zurückgeben, um Live-Updates zu erhalten.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Bei jeder Änderung in der Tabelle Example
wird eine neue Liste ausgegeben.
mit den neuen Elementen
in der Datenbank.
Callback-basierte APIs in Abläufe konvertieren
callbackFlow
ist ein Ablauf-Builder, mit dem Sie Callback-basierte APIs in Abläufe konvertieren können.
Der Firebase Firestore
Android APIs verwenden Callbacks.
Um diese APIs in Abläufe zu konvertieren und auf Firestore-Datenbankaktualisierungen zu warten, können Sie den folgenden Code verwenden:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Im Gegensatz zum flow
-Builder ist callbackFlow
erlaubt die Ausgabe von Werten aus einem anderen CoroutineContext
mit der
send
oder außerhalb einer Koroutine mit der
trySend
.
Intern verwendet callbackFlow
einen
channel,
das konzeptionell mit einer Blockierfunktion
Warteschlange.
Ein Kanal wird mit einer Kapazität, der maximalen Anzahl von Elementen, konfiguriert
die gepuffert werden kann. Für den in callbackFlow
erstellten Kanal gilt eine Standardeinstellung
von 64 Elementen. Wenn Sie versuchen, ein neues Element
sperrt send
den Produzenten, bis Platz für den neuen Kanal
-Element, während trySend
das Element nicht zum Channel hinzufügt und es zurückgibt,
false
sofort.
trySend
fügt das angegebene Element sofort dem Kanal hinzu.
wenn dies nicht gegen die Kapazitätsbeschränkungen verstößt, und dann
erfolgreiches Ergebnis.
Zusätzliche Ablaufressourcen
- Kotlin-Datenflüsse unter Android testen
StateFlow
undSharedFlow
- Zusätzliche Ressourcen für Kotlin-Koroutinen und -Ablauf