Alur Kotlin di Android

Dalam coroutine, alur adalah jenis yang dapat memunculkan beberapa nilai secara berurutan, bukan fungsi penangguhan yang hanya menampilkan satu nilai. Misalnya, Anda dapat menggunakan alur untuk menerima pembaruan langsung dari database.

Alur dibuat di atas coroutine dan dapat memberikan beberapa nilai. Secara konseptual, alur adalah aliran data yang dapat dikomputasi secara asinkron. Nilai yang dimunculkan harus memiliki jenis yang sama. Misalnya, Flow<Int> adalah alur yang memunculkan nilai bilangan bulat.

Alur sangat mirip dengan Iterator yang menghasilkan urutan nilai, tetapi menggunakan fungsi penangguhan untuk menghasilkan dan memakai nilai secara asinkron. Ini berarti, misalnya, alur dapat dengan aman membuat permintaan jaringan untuk menghasilkan nilai berikutnya tanpa memblokir thread utama.

Ada tiga entity yang terlibat dalam aliran data:

  • Produser menghasilkan data yang ditambahkan ke aliran data. Berkat coroutine, alur juga dapat menghasilkan data secara asinkron.
  • (Opsional) Perantara dapat mengubah setiap nilai yang dimunculkan ke dalam aliran data atau aliran data itu sendiri.
  • Konsumen memakai nilai dari aliran data.

entity yang terlibat dalam aliran data; konsumen, perantara
              opsional, dan produser
Gambar 1. Entity yang terlibat dalam aliran data: konsumen, perantara opsional, dan produser.

Di Android, sumber data atau repositori biasanya merupakan produser data UI yang memiliki View sebagai konsumen yang pada akhirnya menampilkan data. Di lain waktu, lapisan View adalah produser peristiwa input pengguna dan lapisan lain dari hierarkinya memakainya. Lapisan di antara produser dan konsumen biasanya berfungsi sebagai perantara yang mengubah aliran data untuk menyesuaikannya dengan persyaratan lapisan berikutnya.

Membuat alur

Untuk membuat alur, gunakan flow builder API. Fungsi builder flow membuat alur baru tempat Anda dapat memunculkan nilai baru ke dalam aliran data secara manual menggunakan fungsi emit.

Pada contoh berikut, sumber data mengambil berita terbaru secara otomatis pada interval tetap. Karena fungsi penangguhan tidak dapat menampilkan beberapa nilai berturut-turut, sumber data akan membuat dan menampilkan alur untuk memenuhi persyaratan ini. Dalam hal ini, sumber data berfungsi sebagai produser.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Builder flow dieksekusi dalam coroutine. Dengan demikian, menerima manfaat dari API asinkron yang sama, tetapi beberapa pembatasan berlaku:

  • Alur bersifat berurutan. Karena produser berada dalam coroutine, saat memanggil fungsi penangguhan, produser akan ditangguhkan hingga fungsi penangguhan ditampilkan. Pada contoh tersebut, produser ditangguhkan hingga permintaan jaringan fetchLatestNews selesai. Hanya setelah itu, hasilnya akan dimunculkan ke aliran data.
  • Dengan builder flow, produser tidak dapat memiliki nilai emit dari CoroutineContext berbeda. Karena itu, jangan panggil emit dalam CoroutineContext yang berbeda dengan membuat coroutine baru atau dengan menggunakan blok kode withContext. Dalam kasus ini, Anda dapat menggunakan builder alur lain seperti callbackFlow.

Mengubah aliran data

Perantara dapat menggunakan operator perantara untuk mengubah aliran data tanpa memakai nilainya. Operator ini adalah fungsi yang, saat diterapkan ke aliran data, menyiapkan rantai operasi yang tidak dieksekusi hingga nilai dipakai di masa mendatang. Pelajari operator perantara lebih lanjut dalam Dokumentasi referensi alur.

Pada contoh di bawah ini, lapisan repositori menggunakan operator perantara map untuk mengubah data yang akan ditampilkan di View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operator perantara dapat diterapkan satu per satu, dengan membentuk rantai operasi yang dieksekusi dengan santai saat item dimunculkan ke dalam alur. Perlu diperhatikan bahwa hanya menerapkan operator perantara ke aliran data tidak akan memulai pengumpulan alur.

Mengumpulkan dari alur

Gunakan operator terminal untuk memicu alur guna mulai mendeteksi nilai. Untuk mendapatkan semua nilai dalam aliran data saat dimunculkan, gunakan collect. Anda dapat mempelajari operator terminal lebih lanjut di dokumentasi alur resmi.

Sebagai fungsi penangguhan, collect perlu dieksekusi dalam coroutine. Diperlukan lambda sebagai parameter yang dipanggil pada setiap nilai baru. Karena merupakan fungsi penangguhan, coroutine yang memanggil collect dapat ditangguhkan hingga alur ditutup.

Melanjutkan contoh sebelumnya, berikut implementasi sederhana ViewModel yang memakai data dari lapisan repositori:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Pengumpulan alur akan memicu produser yang memperbarui berita terkini dan memunculkan hasil permintaan jaringan pada interval tetap. Karena produser selalu aktif dengan loop while(true), aliran data akan ditutup saat ViewModel dihapus dan viewModelScope dibatalkan.

Pengumpulan alur dapat dihentikan karena alasan berikut:

  • Coroutine yang mengumpulkan dibatalkan, seperti ditunjukkan pada contoh sebelumnya. Kondisi ini juga akan menghentikan produser yang mendasari.
  • Produser selesai memunculkan item. Dalam hal ini, aliran data ditutup dan coroutine yang memanggil collect melanjutkan eksekusi.

Alur bersifat dingin dan lambat kecuali jika ditentukan dengan operator perantara lainnya. Ini berarti kode produser dieksekusi setiap kali operator terminal dipanggil pada alur. Pada contoh sebelumnya, memiliki beberapa pengumpul alur menyebabkan sumber data mengambil berita terkini beberapa kali pada interval tetap yang berbeda. Untuk mengoptimalkan dan berbagi alur jika beberapa konsumen mengumpulkan secara bersamaan, gunakan operator shareIn.

Menangkap pengecualian yang tidak diharapkan

Implementasi produser dapat berasal dari library pihak ketiga. Ini berarti implementasi dapat melemparkan pengecualian yang tidak diharapkan. Untuk menangani pengecualian ini, gunakan operator perantara catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Pada contoh sebelumnya, saat pengecualian terjadi, lambda collect tidak dipanggil karena item baru belum diterima.

catch juga dapat emit item ke alur. Lapisan repositori contoh dapat emit nilai yang di-cache:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Dalam contoh ini, bila pengecualian terjadi, lambda collect akan dipanggil saat item baru telah dimunculkan ke aliran data karena pengecualian.

Mengeksekusi dalam CoroutineContext yang berbeda

Secara default, produser builder flow dieksekusi dalam CoroutineContext coroutine yang mengumpulkan darinya, dan seperti disebutkan sebelumnya, produser ini tidak dapat emit nilai dari CoroutineContext yang berbeda. Perilaku ini mungkin tidak diinginkan dalam beberapa kasus. Misalnya, dalam contoh yang digunakan di seluruh topik ini, lapisan repositori tidak boleh menjalankan operasi pada Dispatchers.Main yang digunakan oleh viewModelScope.

Untuk mengubah CoroutineContext alur, gunakan operator perantara flowOn. flowOn mengubah CoroutineContext alur upstream, yang berarti produser dan operator perantara mana pun diterapkan sebelum (atau di atas) flowOn. Alur downstream (operator perantara setelah flowOn bersama konsumen) tidak terpengaruh dan dieksekusi di CoroutineContext yang digunakan untuk collect dari alur. Jika ada beberapa operator flowOn, masing-masing operator akan mengubah upstream dari lokasinya saat ini.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Dengan kode ini, operator onEach dan map menggunakan defaultDispatcher, sedangkan operator catch dan konsumen dieksekusi pada Dispatchers.Main digunakan oleh viewModelScope.

Karena lapisan sumber data menjalankan fungsi I/O, Anda harus menggunakan dispatcher yang dioptimalkan untuk operasi I/O:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Alur di library Jetpack

Alur diintegrasikan ke banyak library Jetpack, dan populer di antara library pihak ketiga Android. Alur sangat cocok untuk update data live dan aliran data tanpa batas.

Anda dapat menggunakan Alur dengan Ruang untuk menerima notifikasi perubahan dalam database. Saat menggunakan objek akses data (DAO), tampilkan jenis Flow untuk mendapatkan update live.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Setiap kali ada perubahan di tabel Example, daftar baru akan dimunculkan dengan item baru dalam database.

Mengonversi API berbasis callback menjadi alur

callbackFlow adalah builder alur yang memungkinkan Anda mengonversi API berbasis callback menjadi alur. Sebagai contoh, Firebase Firestore Android API menggunakan callback. Untuk mengonversi API ini menjadi alur dan mendeteksi update database Firestore, Anda dapat menggunakan kode berikut:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Berbeda dengan builder flow, callbackFlow memungkinkan nilai dimunculkan dari CoroutineContext yang berbeda dengan fungsi send atau di luar coroutine dengan fungsi offer.

Secara internal, callbackFlow menggunakan saluran, yang secara konseptual sangat mirip dengan antrean pemblokiran. Saluran dikonfigurasi dengan kapasitas, yaitu jumlah maksimum elemen yang dapat di-buffer. Saluran yang dibuat di callbackFlow memiliki kapasitas default 64 elemen. Saat mencoba menambahkan elemen baru ke saluran lengkap, send akan menangguhkan produser hingga ada ruang untuk elemen baru, sementara offer tidak menambahkan elemen ke saluran dan segera menampilkan false.

Referensi alur lainnya