The way you test units or modules that communicate with flow depends on whether the subject under test uses the flow as input or output.
- If the subject under test observes a flow, you can generate flows within fake dependencies that you can control from tests.
- If the unit or module exposes a flow, you can read and verify one or multiple items emitted by a flow in the test.
Creating a fake producer
When the subject under test is a consumer of a flow, one common way to test it is by replacing the producer with a fake implementation. For example, given a class that observes a repository that takes data from two data sources in production:

To make the test deterministic, you can replace the repository and its dependencies with a fake repository that always emits the same fake data:

To emit a predefined series of values in a flow, use the flow
builder:
class MyFakeRepository : MyRepository {
fun observeCount() = flow {
emit(ITEM_1)
}
}
In the test, this fake repository is injected, replacing the real implementation:
@Test
fun myTest() {
// Given a class with fake dependencies:
val sut = MyUnitUnderTest(MyFakeRepository())
// Trigger and verify
...
}
Now that you have control over the outputs of the subject under test, you can verify that it works correctly by checking its output.
Asserting flow emissions in a test
If the subject under test is exposing a flow, the test needs to make assertions on the elements of the data stream.
Let's assume that the previous example's repository exposes a flow:

Depending on the needs of the test, you'll typically check the first emission or a finite number of items coming from the flow.
You can consume the first emission to the flow by calling first()
. This
function waits until the first item is received and then sends the
cancellation signal to the producer.
@Test
fun myRepositoryTest() = runBlocking {
// Given a repository that combines values from two data sources:
val repository = MyRepository(fakeSource1, fakeSource2)
// When the repository emits a value
val firstItem = repository.counter.first() // Returns the first item in the flow
// Then check it's the expected item
assertThat(firstItem, isEqualTo(ITEM_1) // Using AssertJ
}
If the test needs to check multiple values, calling toList()
causes the
flow to wait for the source to emit all its values and then returns those
values as a list. Note that this works only for finite data streams.
@Test
fun myRepositoryTest() = runBlocking {
// Given a repository with a fake data source that emits ALL_MESSAGES
val messages = repository.observeChatMessages().toList()
// When all messages are emitted then they should be ALL_MESSAGES
assertThat(messages, isEqualTo(ALL_MESSAGES))
}
For data streams that require a more complex collection of items or
that don't return a finite number of items, you can use the Flow
API
to pick and transform items. Here are some examples:
// Take the second item
outputFlow.drop(1).first()
// Take the first 5 items
outputFlow.take(5).toList()
// Take the first 5 distinct items
outputFlow.take(5).toSet()
// Take the first 2 items matching a predicate
outputFlow.takeWhile(predicate).take(2).toList()
// Take the first item that matches the predicate
outputFlow.firstWhile(predicate)
// Take 5 items and apply a transformation to each
outputFlow.map(transformation).take(5)
// Takes the first item verifying that the flow is closed after that
outputFlow.single()
// Finite data streams
// Verify that the flow emits exactly N elements (optional predicate)
outputFlow.count()
outputFlow.count(predicate)
CoroutineDispatcher as a dependency
If the subject under test takes a CoroutineDispatcher
as a
dependency, pass an instance of
TestCoroutineDispatcher
from the
kotlinx-coroutines-test
library,
and execute the test body within the runBlockingTest
method of
that dispatcher:
private val coroutineDispatcher = TestCoroutineDispatcher()
private val uut = MyUnitUnderTest(coroutineDispatcher)
@Test
fun myTest() = coroutineDispatcher.runBlockingTest {
// Test body
}