Nelle coroutine, un flusso è un tipo che può emettere più valori in sequenza, a differenza delle funzioni di sospensione che restituiscono solo un singolo valore. Ad esempio, puoi utilizzare un flusso per ricevere da un database.
I flussi si basano sulle coroutine e possono fornire più valori.
Un flusso è concettualmente un flusso di dati che può essere calcolato
in modo asincrono. I valori emessi devono essere dello stesso tipo. Per
Ad esempio, Flow<Int>
è un flusso che emette valori interi.
Un flusso è molto simile a un Iterator
che produce una sequenza di
ma utilizza le funzioni di sospensione per produrre e utilizzare valori
in modo asincrono. Ciò significa, ad esempio, che il flusso può effettuare in sicurezza
richiesta di rete per produrre il valore successivo senza bloccare il
.
Nei flussi di dati sono coinvolte tre entità:
- Un producer produce i dati che vengono aggiunti allo stream. Grazie a coroutine, i flussi possono anche produrre dati in modo asincrono.
- (Facoltativo) Gli intermediari possono modificare ciascun valore emesso nel o lo stream stesso.
- Un consumer consuma i valori dello stream.
In Android, un repository viene di solito un produttore di dati che ha l'interfaccia utente (UI) come consumer che mostra i dati. Altre volte, il livello UI è un produttore che vengono consumati da eventi di input utente e da altri livelli della gerarchia. Livelli in tra il produttore e il consumatore di solito agiscono da intermediari che modificano il di dati per adattarli ai requisiti del livello successivo.
Creazione di un flusso
Per creare flussi, utilizza
flow builder
su quelle di livello inferiore. La funzione di creazione di flow
crea un nuovo flusso in cui puoi eseguire manualmente
emettono nuovi valori nel flusso di dati utilizzando
emit
personalizzata.
Nell'esempio seguente, un'origine dati recupera le ultime notizie automaticamente a intervalli fissi. Poiché una funzione di sospensione non può se restituiscono più valori consecutivi, l'origine dati crea e restituisce un flusso per soddisfare questo requisito. In questo caso, l'origine dati agisce in qualità di produttore.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Il builder di flow
viene eseguito all'interno di una coroutine. Di conseguenza,
dalle stesse API asincrone, ma vengono applicate alcune restrizioni:
- I flussi sono sequenziali. Come il produttore è in una coroutine, quando chiama
una funzione di sospensione, il producer sospende fino a quando la funzione di sospensione
i resi. Nell'esempio, il producer sospende fino a
fetchLatestNews
la richiesta di rete viene completata. Solo in questo caso il risultato viene emesso nello stream. - Con il builder
flow
, il producer non puòemit
valori da unCoroutineContext
diversi. Pertanto, non chiamareemit
in un'altraCoroutineContext
creando nuove coroutine o utilizzandowithContext
blocchi di codice. Puoi utilizzare altri generatori di flussi comecallbackFlow
in questi casi.
Modifica dello stream
Gli intermediari possono utilizzare operatori intermedi per modificare il flusso di dati senza consumare i valori. Questi operatori sono funzioni che, quando applicati a un flusso di dati, configurare una catena di operazioni che non siano finché i valori non vengono consumati in futuro. Scopri di più su operatori intermedi nel Documentazione di riferimento sui flussi.
Nell'esempio seguente, il livello di repository utilizza l'operatore intermedio
map
per trasformare i dati da visualizzare su View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Gli operatori intermedi possono essere applicati uno dopo l'altro, formando una catena di operazioni che vengono eseguite lentamente quando un elemento viene emesso flusso di lavoro. Tieni presente che la semplice applicazione di un operatore intermedio a uno stream non avviare la raccolta del flusso.
Raccolta da un flusso
Utilizza un operatore del terminale per attivare il flusso e iniziare l'ascolto
e i relativi valori. Per ottenere tutti i valori nel flusso man mano che vengono emessi, utilizza
collect
Puoi scoprire di più sugli operatori dei terminali nel
documentazione ufficiale sul flusso.
Poiché collect
è una funzione di sospensione, deve essere eseguita all'interno
una coroutine. Prende un parametro lambda chiamato
ogni nuovo valore. Poiché è una funzione di sospensione, la coroutina che
Le chiamate collect
possono essere sospese fino alla chiusura del flusso.
Continuando con l'esempio precedente, di seguito è riportata una semplice implementazione
un'ViewModel
che utilizza i dati del livello di repository:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
La raccolta del flusso attiva il produttore che aggiorna le ultime notizie
ed emette il risultato della richiesta di rete in base a un intervallo fisso. Come
producer rimane sempre attivo con il loop while(true)
, lo stream
di dati verranno chiusi quando il ViewModel viene cancellato
viewModelScope
annullato.
La raccolta dei flussi può interrompersi per i seguenti motivi:
- La coroutina che viene raccolta viene annullata, come mostrato nell'esempio precedente. In questo modo viene interrotto anche il producer sottostante.
- Il producer ha finito di emettere articoli. In questo caso, il flusso di dati
è stata chiusa e la coroutine che ha chiamato
collect
riprende l'esecuzione.
I flussi sono freddi e lazy a meno che non siano specificati con altri intermedi
. Ciò significa che il codice producer viene eseguito ogni volta
l'operatore del terminale viene chiamato nel flusso. Nell'esempio precedente,
avere più collettori di flusso fa sì che l'origine dati recuperi
le ultime notizie più volte a intervalli fissi differenti. Per ottimizzare
per condividere un flusso quando più consumatori raccolgono contemporaneamente, usa
Operatore shareIn
.
Rilevamento di eccezioni impreviste
L'implementazione del produttore può provenire da una libreria di terze parti.
Ciò significa che può generare eccezioni impreviste. Per gestire questi aspetti,
utilizza
catch
operatore intermedio.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Nell'esempio precedente, quando si verifica un'eccezione, collect
lambda non viene chiamata perché non è stato ricevuto un nuovo elemento.
catch
può anche emit
elementi nel flusso. Il repository di esempio
livello potrebbe emit
i valori memorizzati nella cache invece:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
In questo esempio, quando si verifica un'eccezione, il parametro lambda collect
viene
chiamata, poiché un nuovo elemento è stato emesso nello stream a causa
.
Esecuzione in un contesto Coroutine diverso
Per impostazione predefinita, il producer di un builder flow
viene eseguito nel
CoroutineContext
della coroutina che la raccoglie e come
precedentemente accennato, non può emit
valori provenienti da un
CoroutineContext
. Questo comportamento potrebbe essere indesiderato in alcuni casi.
Ad esempio, negli esempi utilizzati in questo argomento, il repository
non deve eseguire operazioni su Dispatchers.Main
che
è utilizzato da viewModelScope
.
Per modificare CoroutineContext
di un flusso, utilizza l'operatore intermedio
flowOn
flowOn
cambia il CoroutineContext
del flusso upstream, il che significa
il produttore ed eventuali operatori intermedi applicati prima (o sopra)
flowOn
. Il flusso downstream (gli operatori intermedi dopo flowOn
insieme al consumatore) non è interessata e procede sulla
CoroutineContext
utilizzato su collect
dal flusso. Se ci sono
più operatori flowOn
, ciascuno modifica l'upstream rispetto al rispettivo
posizione corrente.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Con questo codice, gli operatori onEach
e map
utilizzano i valori defaultDispatcher
,
mentre l'operatore catch
e il consumatore vengono eseguiti
Dispatchers.Main
utilizzato da viewModelScope
.
Poiché il livello dell'origine dati esegue il lavoro di I/O, devi utilizzare un supervisore ottimizzato per le operazioni di I/O:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Flussi nelle librerie Jetpack
Flow è integrato in molte librerie Jetpack ed è popolare tra Librerie di terze parti di Android. Flow è un'ottima soluzione per gli aggiornamenti dei dati in tempo reale e infiniti flussi di dati.
Puoi utilizzare
Flusso con spazio
ricevere notifiche sulle modifiche apportate a un database. Quando si utilizza
oggetti di accesso ai dati (DAO),
restituisce un tipo Flow
per ricevere aggiornamenti in tempo reale.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Ogni volta che viene apportata una modifica alla tabella Example
, viene emesso un nuovo elenco
con i nuovi elementi nel database.
Converti le API basate su callback in flussi
callbackFlow
è un generatore di flussi che consente di convertire in flussi le API basate su callback.
Ad esempio, il database Firebase Firestore
Le API Android utilizzano i callback.
Per convertire queste API in flussi e ascoltare gli aggiornamenti del database Firestore, puoi usare il seguente codice:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
A differenza dello strumento per la creazione di flow
, callbackFlow
consente di emettere valori da un CoroutineContext
diverso con il valore
send
funzione o al di fuori di una coroutina con
trySend
personalizzata.
Internamente, callbackFlow
utilizza un
canale,
che è concettualmente molto simile a un blocco
coda.
Un canale è configurato con una capacità, ovvero il numero massimo di elementi
che possono essere memorizzate nel buffer. Il canale creato in callbackFlow
ha un'impostazione predefinita
con una capacità di 64 elementi. Quando provi ad aggiungere un nuovo elemento a un
canale, send
sospende il produttore finché non sarà disponibile spazio per il nuovo
mentre trySend
non aggiunge l'elemento al canale e restituisce
false
immediatamente.
trySend
aggiunge immediatamente l'elemento specificato al canale,
solo se non viola i limiti di capacità, quindi restituisce
un risultato positivo.
Risorse di flusso aggiuntive
- Testare i flussi Kotlin su Android
StateFlow
eSharedFlow
- Risorse aggiuntive per coroutine e flussi di Kotlin