Google is committed to advancing racial equity for Black communities. See how.

Testing Kotlin flows

The way you test units or modules that communicate with flow depends on whether the subject under test uses the flow as input or output.

  • If the subject under test observes a flow, you can generate flows within fake dependencies that you can control from tests.
  • If the unit or module exposes a flow, you can read and verify one or multiple items emitted by a flow in the test.

Creating a fake producer

When the subject under test is a consumer of a flow, one common way to test it is by replacing the producer with a fake implementation. For example, given a class that observes a repository that takes data from two data sources in production:

the subject under test and the data layer
Figure 1. The subject under test and the data layer.

To make the test deterministic, you can replace the repository and its dependencies with a fake repository that always emits the same fake data:

dependencies are replaced with a fake implementation
Figure 2. Dependencies are replaced with a fake implementation.

To emit a predefined series of values in a flow, use the flow builder:

class MyFakeRepository : MyRepository {
    fun observeCount() = flow {
        emit(ITEM_1)
    }
}

In the test, this fake repository is injected, replacing the real implementation:

@Test
fun myTest() {
    // Given a class with fake dependencies:
    val sut = MyUnitUnderTest(MyFakeRepository())
    // Trigger and verify
    ...
}

Now that you have control over the outputs of the subject under test, you can verify that it works correctly by checking its output.

Asserting flow emissions in a test

If the subject under test is exposing a flow, the test needs to make assertions on the elements of the data stream.

Let's assume that the previous example's repository exposes a flow:

repository with fake dependencies that exposes a flow
Figure 3. A repository (the subject under test) with fake dependencies that exposes a flow.

Depending on the needs of the test, you'll typically check the first emission or a finite number of items coming from the flow.

You can consume the first emission to the flow by calling first(). This function waits until the first item is received and then sends the cancellation signal to the producer.

@Test
fun myRepositoryTest() = runBlocking {
    // Given a repository that combines values from two data sources:
    val repository = MyRepository(fakeSource1, fakeSource2)

    // When the repository emits a value
    val firstItem = repository.counter.first() // Returns the first item in the flow

    // Then check it's the expected item
    assertThat(firstItem, isEqualTo(ITEM_1) // Using AssertJ
}

If the test needs to check multiple values, calling toList() causes the flow to wait for the source to emit all its values and then returns those values as a list. Note that this works only for finite data streams.

@Test
fun myRepositoryTest() = runBlocking {
    // Given a repository with a fake data source that emits ALL_MESSAGES
    val messages = repository.observeChatMessages().toList()

    // When all messages are emitted then they should be ALL_MESSAGES
    assertThat(messages, isEqualTo(ALL_MESSAGES))
}

For data streams that require a more complex collection of items or that don't return a finite number of items, you can use the Flow API to pick and transform items. Here are some examples:

// Take the second item
outputFlow.drop(1).first()

// Take the first 5 items
outputFlow.take(5).toList()

// Take the first 5 distinct items
outputFlow.take(5).toSet()

// Take the first 2 items matching a predicate
outputFlow.takeWhile(predicate).take(2).toList()

// Take the first item that matches the predicate
outputFlow.firstWhile(predicate)

// Take 5 items and apply a transformation to each
outputFlow.map(transformation).take(5)

// Takes the first item verifying that the flow is closed after that
outputFlow.single()

// Finite data streams
// Verify that the flow emits exactly N elements (optional predicate)
outputFlow.count()
outputFlow.count(predicate)

CoroutineDispatcher as a dependency

If the subject under test takes a CoroutineDispatcher as a dependency, pass an instance of TestCoroutineDispatcher from the kotlinx-coroutines-test library, and execute the test body within the runBlockingTest method of that dispatcher:

private val coroutineDispatcher = TestCoroutineDispatcher()
private val uut = MyUnitUnderTest(coroutineDispatcher)

@Test
fun myTest() = coroutineDispatcher.runBlockingTest {
    // Test body
}