Kotlin è disponibile su Android

Nelle coroutine, un flusso è un tipo che può emettere più valori in sequenza, contrariamente a sospendere le funzioni che restituiscono un solo valore. Ad esempio, puoi usare un flusso per ricevere aggiornamenti in tempo reale da un database.

I flussi vengono creati sulle coroutine e possono fornire più valori. Concettualmente, un flusso è un flusso di dati che può essere calcolato in modo asincrono. I valori emessi devono essere dello stesso tipo. Ad esempio, Flow<Int> è un flusso che emette valori interi.

Un flusso è molto simile a un Iterator che produce una sequenza di valori, ma utilizza le funzioni di sospensione per produrre e consumare valori in modo asincrono. Ciò significa, ad esempio, che il flusso può effettuare in modo sicuro una richiesta di rete per produrre il valore successivo senza bloccare il thread principale.

Nei flussi di dati sono coinvolte tre entità:

  • Un produttore produce dati che vengono aggiunti allo stream. Grazie alle coroutine, i flussi possono anche produrre dati in modo asincrono.
  • (Facoltativo) Gli intermediari possono modificare ogni valore emesso nel stream o nello stream stesso.
  • Un consumatore consuma i valori dello stream.

entità coinvolte nei flussi di dati: consumatori, intermediari facoltativi e produttori
Figura 1. Persone giuridiche coinvolte nei flussi di dati: consumatore, intermediari facoltativi e produttore.

In Android, un repository è in genere un produttore di dati dell'interfaccia utente che ha l'interfaccia utente (UI) come consumatore che visualizza i dati. Altre volte, il livello UI è un produttore di eventi di input utente e gli altri livelli della gerarchia li utilizzano. I livelli tra il produttore e il consumatore di solito agiscono da intermediari che modificano il flusso di dati per adeguarlo ai requisiti del livello successivo.

Creazione di un flusso

Per creare i flussi, utilizza le API del generatore di flussi. La funzione del generatore flow crea un nuovo flusso in cui puoi emettere manualmente nuovi valori nel flusso di dati utilizzando la funzione emit.

Nell'esempio seguente, un'origine dati recupera automaticamente le ultime notizie a intervalli fissi. Poiché una funzione di sospensione non può restituire più valori consecutivi, l'origine dati crea e restituisce un flusso per soddisfare questo requisito. In questo caso, l'origine dati agisce come producer.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Il builder flow viene eseguito all'interno di una coroutine. Di conseguenza, trae vantaggio dalle stesse API asincrone, ma vengono applicate alcune limitazioni:

  • I flussi sono sequenziali. Poiché il producer si trova in una coroutine, quando chiama una funzione di sospensione, il producer esegue la sospensione finché non viene restituita la funzione di sospensione. Nell'esempio, il producer esegue la sospensione fino al completamento della richiesta di rete fetchLatestNews. Solo allora il risultato viene emesso nello stream.
  • Con il builder flow, il producer non può emit valori di CoroutineContext diversi. Pertanto, non chiamare emit in un altro CoroutineContext creando nuove coroutine o utilizzando withContext blocchi di codice. In questi casi, puoi utilizzare altri generatori di flussi, come callbackFlow.

Modifica del flusso

Gli intermediari possono utilizzare operatori intermedi per modificare il flusso di dati senza consumare i valori. Questi operatori sono funzioni che, se applicati a un flusso di dati, configurano una catena di operazioni che non vengono eseguite finché i valori non vengono consumati in futuro. Scopri di più sugli operatori intermedi nella documentazione di riferimento di Flow.

Nell'esempio riportato di seguito, il livello di repository utilizza l'operatore intermedio map per trasformare i dati da visualizzare sul View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Gli operatori intermedi possono essere applicati uno dopo l'altro, formando una catena di operazioni che vengono eseguite pigramente quando un elemento viene emesso nel flusso. Tieni presente che la semplice applicazione di un operatore intermedio a un flusso non avvia la raccolta dei flussi.

Raccolta da un flusso

Utilizza un operatore del terminale per attivare il flusso per iniziare l'ascolto dei valori. Per visualizzare tutti i valori nello stream man mano che vengono emessi, utilizza collect. Puoi scoprire di più sugli operatori di terminal nella documentazione ufficiale sui flussi.

Poiché collect è una funzione di sospensione, deve essere eseguita all'interno di una coroutine. Prende come parametro lambda che viene richiamato per ogni nuovo valore. Poiché si tratta di una funzione di sospensione, la coroutina che chiama collect potrebbe essere sospesa fino alla chiusura del flusso.

Proseguendo con l'esempio precedente, ecco una semplice implementazione di ViewModel che utilizza i dati dal livello di repository:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

La raccolta del flusso attiva il produttore che aggiorna le ultime notizie ed emette il risultato della richiesta di rete a intervalli fissi. Poiché il producer rimane sempre attivo con il loop while(true), il flusso di dati viene chiuso quando il ViewModel viene cancellato e viewModelScope viene annullato.

La raccolta dei flussi può interrompersi per i seguenti motivi:

  • La coroutine raccolta viene annullata, come mostrato nell'esempio precedente. Questa operazione interrompe anche il producer sottostante.
  • Il produttore finisce di emettere oggetti. In questo caso, il flusso di dati viene chiuso e la coroutine che ha chiamato collect riprende l'esecuzione.

I flussi sono freddi e pigri a meno che non siano specificati con altri operatori intermedi. Ciò significa che il codice producer viene eseguito ogni volta che viene richiamato un operatore del terminale sul flusso. Nell'esempio precedente, la presenza di più raccoglitori di flussi fa sì che l'origine dati recuperi più volte le notizie più recenti a intervalli fissi diversi. Per ottimizzare e condividere un flusso quando più consumatori raccolgono contemporaneamente, utilizza l'operatore shareIn.

Rilevamento di eccezioni impreviste

L'implementazione del producer può provenire da una libreria di terze parti. Ciò significa che potrebbero essere generate eccezioni impreviste. Per gestire queste eccezioni, utilizza l'operatore intermedio catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Nell'esempio precedente, quando si verifica un'eccezione, la funzione lambda collect non viene chiamata perché non è stato ricevuto un nuovo elemento.

catch può anche emit elementi nel flusso. Il livello repository di esempio potrebbe emit invece i valori memorizzati nella cache:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

In questo esempio, quando si verifica un'eccezione, viene richiamata la funzione lambda collect, poiché un nuovo elemento è stato emesso nel flusso a causa dell'eccezione.

Esecuzione in un CoroutineContext diverso

Per impostazione predefinita, il produttore di un generatore flow esegue l'esecuzione nella CoroutineContext della coroutine che lo raccoglie e, come accennato in precedenza, non può emit valori di CoroutineContext diversi. In alcuni casi, questo comportamento potrebbe essere indesiderato. Ad esempio, negli esempi utilizzati in questo argomento, il livello repository non dovrebbe eseguire operazioni su Dispatchers.Main utilizzato da viewModelScope.

Per modificare il CoroutineContext di un flusso, utilizza l'operatore intermedio flowOn. flowOn modifica il valore CoroutineContext del flusso a monte, ovvero il produttore e gli eventuali operatori intermedi applicati prima (o superiore) flowOn. Il flusso a valle (gli operatori intermedi dopo flowOn insieme al consumer) non è interessato ed viene eseguito sul CoroutineContext utilizzato per collect dal flusso. Se sono presenti più operatori flowOn, ognuno modifica l'upstream rispetto alla posizione corrente.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Con questo codice, gli operatori onEach e map utilizzano defaultDispatcher, mentre l'operatore catch e il consumer vengono eseguiti su Dispatchers.Main utilizzato da viewModelScope.

Poiché il livello dell'origine dati si occupa di I/O, devi utilizzare un supervisore ottimizzato per le operazioni di I/O:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Flussi nelle librerie Jetpack

Flow è integrato in molte librerie Jetpack ed è popolare tra le librerie Android di terze parti. Flow è perfetto per gli aggiornamenti dei dati in tempo reale e per flussi infiniti di dati.

Puoi utilizzare Flusso con la stanza per ricevere notifiche delle modifiche in un database. Quando utilizzi oggetti di accesso ai dati (DAO), restituisci un tipo Flow per ricevere aggiornamenti in tempo reale.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Ogni volta che viene apportata una modifica alla tabella Example, viene generato un nuovo elenco con i nuovi elementi nel database.

Converti le API basate su callback in flussi

callbackFlow è un generatore di flussi che consente di convertire le API basate su callback in flussi. Ad esempio, le API Android di Firebase Firestore utilizzano i callback.

Per convertire queste API in flussi e ascoltare gli aggiornamenti del database Firestore, puoi utilizzare il codice seguente:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

A differenza del builder di flow, callbackFlow consente l'emissione di valori da un CoroutineContext diverso con la funzione send o al di fuori di una coroutine con la funzione trySend.

Internamente, callbackFlow utilizza un canale, concettualmente molto simile a una coda di blocco. Un canale viene configurato con una capacità, ossia il numero massimo di elementi che possono essere inseriti nel buffer. Il canale creato in callbackFlow ha una capacità predefinita di 64 elementi. Quando provi ad aggiungere un nuovo elemento a un canale completo, send sospende il producer finché non è disponibile spazio per il nuovo elemento, mentre offer non aggiunge l'elemento al canale e torna false immediatamente.

Risorse di flusso aggiuntive