Flow Kotlin di Android

Dalam coroutine, flow adalah jenis yang dapat memunculkan beberapa nilai secara berurutan, bukan fungsi penangguhan yang hanya menampilkan satu nilai. Misalnya, Anda dapat menggunakan flow untuk menerima update langsung dari database.

Flow dibuat berdasarkan coroutine dan dapat memberikan beberapa nilai. Secara konseptual, flow adalah aliran data yang dapat dikomputasi secara asinkron. Nilai yang dimunculkan harus memiliki jenis yang sama. Misalnya, Flow<Int> adalah flow yang memunculkan nilai bilangan bulat.

Flow sangat mirip dengan Iterator yang menghasilkan urutan nilai, tetapi menggunakan fungsi penangguhan untuk menghasilkan dan memakai nilai secara asinkron. Ini berarti, misalnya, flow dapat dengan aman membuat permintaan jaringan untuk menghasilkan nilai berikutnya tanpa memblokir thread utama.

Ada tiga entity yang terlibat dalam aliran data:

  • Produser menghasilkan data yang ditambahkan ke aliran data. Berkat coroutine, flow juga dapat menghasilkan data secara asinkron.
  • (Opsional) Perantara dapat mengubah setiap nilai yang dimunculkan ke dalam aliran data atau aliran data itu sendiri.
  • Konsumen memakai nilai dari aliran data.

entity yang terlibat dalam aliran data; konsumen, perantara
              opsional, dan produser
Gambar 1. Entity yang terlibat dalam aliran data: konsumen, perantara opsional, dan produser.

Di Android, repositori biasanya merupakan produser data UI yang memiliki antarmuka pengguna (UI) sebagai konsumen yang pada akhirnya menampilkan data. Pada waktu lain, lapisan UI merupakan produser peristiwa input pengguna dan lapisan lain dari hierarki memakainya. Lapisan di antara produser dan konsumen biasanya berfungsi sebagai perantara yang mengubah aliran data untuk menyesuaikannya dengan persyaratan lapisan berikutnya.

Membuat flow

Untuk membuat flow, gunakan API flow builder Fungsi builder flow membuat flow baru tempat Anda dapat memunculkan nilai baru ke dalam aliran data secara manual menggunakan fungsi emit.

Pada contoh berikut, sumber data mengambil berita terbaru secara otomatis pada interval tetap. Karena fungsi penangguhan tidak dapat menampilkan beberapa nilai berturut-turut, sumber data akan membuat dan menampilkan flow untuk memenuhi persyaratan ini. Dalam hal ini, sumber data berfungsi sebagai produser.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Builder flow dieksekusi dalam coroutine. Dengan demikian, builder tersebut menerima manfaat dari API asinkron yang sama, tetapi beberapa pembatasan berlaku:

  • Flow bersifat berurutan. Karena produser berada dalam coroutine, saat memanggil fungsi penangguhan, produser akan ditangguhkan hingga fungsi penangguhan ditampilkan. Pada contoh tersebut, produser ditangguhkan hingga permintaan jaringan fetchLatestNews selesai. Baru setelah itu hasilnya akan dimunculkan ke aliran data.
  • Dengan builder flow, produser tidak dapat melakukan emit nilai dari CoroutineContext berbeda. Karena itu, jangan panggil emit dalam CoroutineContext yang berbeda dengan membuat coroutine baru atau dengan menggunakan blok kode withContext. Dalam kasus ini, Anda dapat menggunakan builder flow lain seperti callbackFlow.

Mengubah aliran data

Perantara dapat menggunakan operator perantara untuk mengubah aliran data tanpa memakai nilainya. Operator ini adalah fungsi yang, saat diterapkan ke aliran data, menyiapkan rantai operasi yang tidak dieksekusi hingga nilai dipakai di masa mendatang. Pelajari lebih lanjut operator perantara dalam Dokumentasi referensi flow.

Pada contoh di bawah ini, lapisan repositori menggunakan operator perantara map untuk mengubah data yang akan ditampilkan di View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operator perantara dapat diterapkan satu per satu, dengan membentuk rantai operasi yang dieksekusi dengan lambat saat item dimunculkan ke dalam flow. Perlu diperhatikan bahwa hanya menerapkan operator perantara ke aliran data tidak akan memulai pengumpulan flow.

Mengumpulkan dari flow

Gunakan operator terminal untuk memicu flow guna mulai memproses nilai. Untuk mendapatkan semua nilai dalam aliran data saat dimunculkan, gunakan collect. Anda dapat mempelajari operator terminal lebih lanjut di dokumentasi flow resmi.

Sebagai fungsi penangguhan, collect perlu dieksekusi dalam coroutine. Diperlukan lambda sebagai parameter yang dipanggil pada setiap nilai baru. Karena merupakan fungsi penangguhan, coroutine yang memanggil collect dapat ditangguhkan hingga flow ditutup.

Melanjutkan contoh sebelumnya, berikut implementasi sederhana ViewModel yang memakai data dari lapisan repositori:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Pengumpulan flow akan memicu produser yang memperbarui berita terkini dan memunculkan hasil permintaan jaringan pada interval tetap. Karena produser selalu aktif dengan loop while(true), aliran data akan ditutup saat ViewModel dihapus dan viewModelScope dibatalkan.

Pengumpulan flow dapat dihentikan karena alasan berikut:

  • Coroutine yang mengumpulkan dibatalkan, seperti ditunjukkan pada contoh sebelumnya. Kondisi ini juga akan menghentikan produser yang mendasari.
  • Produser selesai memunculkan item. Dalam hal ini, aliran data ditutup dan coroutine yang memanggil collect melanjutkan eksekusi.

Flow bersifat cold dan lambat kecuali jika ditentukan dengan operator perantara lainnya. Ini berarti kode produser dieksekusi setiap kali operator terminal dipanggil pada flow. Pada contoh sebelumnya, memiliki beberapa pengumpul flow menyebabkan sumber data mengambil berita terkini beberapa kali pada interval tetap yang berbeda. Untuk mengoptimalkan dan berbagi flow jika beberapa konsumen mengumpulkan secara bersamaan, gunakan operator shareIn.

Menangkap pengecualian yang tidak diharapkan

Implementasi produser dapat berasal dari library pihak ketiga. Hal ini berarti implementasi dapat menampilkan pengecualian yang tidak diharapkan. Untuk menangani pengecualian ini, gunakan operator perantara catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Pada contoh sebelumnya, saat pengecualian terjadi, lambda collect tidak dipanggil karena item baru belum diterima.

catch juga dapat melakukan emit item ke flow. Lapisan repositori contoh dapat melakukan emit nilai yang di-cache:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Dalam contoh ini, jika pengecualian terjadi, lambda collect akan dipanggil saat item baru telah dimunculkan ke aliran data karena pengecualian.

Mengeksekusi dalam CoroutineContext yang berbeda

Secara default, produser builder flow dieksekusi dalam CoroutineContext coroutine yang mengumpulkan darinya, dan seperti disebutkan sebelumnya, produser ini tidak dapat melakukan emit nilai dari CoroutineContext yang berbeda. Perilaku ini mungkin tidak diinginkan dalam beberapa kasus. Misalnya, dalam contoh yang digunakan di seluruh topik ini, lapisan repositori tidak boleh menjalankan operasi pada Dispatchers.Main yang digunakan oleh viewModelScope.

Untuk mengubah CoroutineContext flow, gunakan operator perantara flowOn. flowOn mengubah CoroutineContext flow upstream, yang berarti produser dan operator perantara mana pun yang diterapkan sebelum (atau di atas) flowOn. Flow downstream (operator perantara setelah flowOn bersama konsumen) tidak terpengaruh dan dieksekusi di CoroutineContext yang digunakan untuk collect dari flow. Jika ada beberapa operator flowOn, masing-masing operator akan mengubah upstream dari lokasinya saat ini.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Dengan kode ini, operator onEach dan map menggunakan defaultDispatcher, sedangkan operator catch dan konsumen dieksekusi pada Dispatchers.Main yang digunakan oleh viewModelScope.

Karena lapisan sumber data menjalankan fungsi I/O, Anda harus menggunakan dispatcher yang dioptimalkan untuk operasi I/O:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Flow di library Jetpack

Flow diintegrasikan ke banyak library Jetpack, dan populer di antara library pihak ketiga Android. Flow sangat cocok untuk update data langsung dan aliran data tanpa batas.

Anda dapat menggunakan Flow dengan Room untuk menerima notifikasi perubahan dalam database. Saat menggunakan objek akses data (DAO), tampilkan jenis Flow untuk mendapatkan update langsung.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Setiap kali ada perubahan di tabel Example, daftar baru akan dimunculkan dengan item baru dalam database.

Mengonversi API berbasis callback menjadi flow

callbackFlow adalah flow builder yang memungkinkan Anda mengonversi API berbasis callback menjadi flow. Sebagai contoh, Firebase Firestore Android API menggunakan callback.

Untuk mengonversi API ini menjadi flow dan memproses update database Firestore, Anda dapat menggunakan kode berikut:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Berbeda dengan builder flow, callbackFlow memungkinkan nilai dimunculkan dari CoroutineContext yang berbeda dengan fungsi send atau di luar coroutine dengan fungsi trySend.

Secara internal, callbackFlow menggunakan saluran, yang secara konseptual sangat mirip dengan antrean pemblokiran. Saluran dikonfigurasi dengan kapasitas, yaitu jumlah maksimum elemen yang dapat di-buffer. Saluran yang dibuat di callbackFlow memiliki kapasitas default 64 elemen. Saat mencoba menambahkan elemen baru ke saluran lengkap, send akan menangguhkan produser hingga ada ruang untuk elemen baru, sementara offer tidak menambahkan elemen ke saluran dan segera menampilkan false.

Referensi flow lainnya