In coroutines, a flow is a type that can emit multiple values sequentially, as opposed to suspend functions that return only a single value. For example, you can use a flow to receive live updates from a database.
Flows are built on top of coroutines and can provide multiple values.
A flow is conceptually a stream of data that can be computed
asynchronously. The emitted values must be of the same type. For
example, a Flow<Int>
is a flow that emits integer values.
A flow is very similar to an Iterator
that produces a sequence of
values, but it uses suspend functions to produce and consume values
asynchronously. This means, for example, that the flow can safely make a
network request to produce the next value without blocking the main
thread.
There are three entities involved in streams of data:
- A producer produces data that is added to the stream. Thanks to coroutines, flows can also produce data asynchronously.
- (Optional) Intermediaries can modify each value emitted into the stream or the stream itself.
- A consumer consumes the values from the stream.
In Android, a repository is typically a producer of UI data that has the user interface (UI) as the consumer that ultimately displays the data. Other times, the UI layer is a producer of user input events and other layers of the hierarchy consume them. Layers in between the producer and consumer usually act as intermediaries that modify the stream of data to adjust it to the requirements of the following layer.
Creating a flow
To create flows, use the
flow builder
APIs. The flow
builder function creates a new flow where you can manually
emit new values into the stream of data using the
emit
function.
In the following example, a data source fetches the latest news automatically at a fixed interval. As a suspend function cannot return multiple consecutive values, the data source creates and returns a flow to fulfill this requirement. In this case, the data source acts as the producer.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
The flow
builder is executed within a coroutine. Thus, it benefits
from the same asynchronous APIs, but some restrictions apply:
- Flows are sequential. As the producer is in a coroutine, when calling
a suspend function, the producer suspends until the suspend function
returns. In the example, the producer suspends until the
fetchLatestNews
network request completes. Only then is the result emitted to the stream. - With the
flow
builder, the producer cannotemit
values from a differentCoroutineContext
. Therefore, don't callemit
in a differentCoroutineContext
by creating new coroutines or by usingwithContext
blocks of code. You can use other flow builders such ascallbackFlow
in these cases.
Modifying the stream
Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that aren't executed until the values are consumed in the future. Learn more about intermediate operators in the Flow reference documentation.
In the example below, the repository layer uses the intermediate operator
map
to transform the data to be displayed on the View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Intermediate operators can be applied one after the other, forming a chain of operations that are executed lazily when an item is emitted into the flow. Note that simply applying an intermediate operator to a stream does not start the flow collection.
Collecting from a flow
Use a terminal operator to trigger the flow to start listening for
values. To get all the values in the stream as they're emitted, use
collect
.
You can learn more about terminal operators in the
official flow documentation.
As collect
is a suspend function, it needs to be executed within
a coroutine. It takes a lambda as a parameter that is called on
every new value. Since it's a suspend function, the coroutine that
calls collect
may suspend until the flow is closed.
Continuing the previous example, here's a simple implementation of
a ViewModel
consuming the data from the repository layer:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Collecting the flow triggers the producer that refreshes the latest news
and emits the result of the network request on a fixed interval. As the
producer remains always active with the while(true)
loop, the stream
of data will be closed when the ViewModel is cleared and
viewModelScope
is cancelled.
Flow collection can stop for the following reasons:
- The coroutine that collects is cancelled, as shown in the previous example. This also stops the underlying producer.
- The producer finishes emitting items. In this case, the stream of data
is closed and the coroutine that called
collect
resumes execution.
Flows are cold and lazy unless specified with other intermediate
operators. This means that the producer code is executed each time a
terminal operator is called on the flow. In the previous example,
having multiple flow collectors causes the data source to fetch the
latest news multiple times on different fixed intervals. To optimize and
share a flow when multiple consumers collect at the same time, use the
shareIn
operator.
Catching unexpected exceptions
The implementation of the producer can come from a third party library.
This means that it can throw unexpected exceptions. To handle these
exceptions, use the
catch
intermediate operator.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
In the previous example, when an exception occurs, the collect
lambda isn't called, as a new item hasn't been received.
catch
can also emit
items to the flow. The example repository
layer could emit
the cached values instead:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
In this example, when an exception occurs, the collect
lambda is
called, as a new item has been emitted to the stream because of the
exception.
Executing in a different CoroutineContext
By default, the producer of a flow
builder executes in the
CoroutineContext
of the coroutine that collects from it, and as
previously mentioned, it cannot emit
values from a different
CoroutineContext
. This behavior might be undesirable in some cases.
For instance, in the examples used throughout this topic, the repository
layer shouldn't be performing operations on Dispatchers.Main
that
is used by viewModelScope
.
To change the CoroutineContext
of a flow, use the intermediate operator
flowOn
.
flowOn
changes the CoroutineContext
of the upstream flow, meaning
the producer and any intermediate operators applied before (or above)
flowOn
. The downstream flow (the intermediate operators after flowOn
along with the consumer) is not affected and executes on the
CoroutineContext
used to collect
from the flow. If there are
multiple flowOn
operators, each one changes the upstream from its
current location.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
With this code, the onEach
and map
operators use the defaultDispatcher
,
whereas the catch
operator and the consumer are executed on
Dispatchers.Main
used by viewModelScope
.
As the data source layer is doing I/O work, you should use a dispatcher that is optimized for I/O operations:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Flows in Jetpack libraries
Flow is integrated into many Jetpack libraries, and it's popular among Android third party libraries. Flow is a great fit for live data updates and endless streams of data.
You can use
Flow with Room
to be notified of changes in a database. When using
data access objects (DAO),
return a Flow
type to get live updates.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Every time there's a change in the Example
table, a new list is emitted
with the new items in the database.
Convert callback-based APIs to flows
callbackFlow
is a flow builder that lets you convert callback-based APIs into flows.
As an example, the Firebase Firestore
Android APIs use callbacks.
To convert these APIs to flows and listen for Firestore database updates, you can use the following code:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Unlike the flow
builder, callbackFlow
allows values to be emitted from a different CoroutineContext
with the
send
function or outside a coroutine with the
trySend
function.
Internally, callbackFlow
uses a
channel,
which is conceptually very similar to a blocking
queue.
A channel is configured with a capacity, the maximum number of elements
that can be buffered. The channel created in callbackFlow
has a default
capacity of 64 elements. When you try to add a new element to a full
channel, send
suspends the producer until there's space for the new
element, whereas trySend
does not add the element to the channel and returns
false
immediately.
trySend
immediately adds the specified element to the channel,
only if this doesn't violate its capacity restrictions, and then returns the
successful result.
Additional flow resources
- Testing Kotlin flows on Android
StateFlow
andSharedFlow
- Additional resources for Kotlin coroutines and flow