Flow (dòng dữ liệu) Kotlin trên Android

Trong coroutine, flow là một loại dữ liệu có thể phát ra nhiều giá trị tuần tự, khác với suspend function (hàm tạm ngưng) chỉ trả về một giá trị duy nhất. Ví dụ: bạn có thể sử dụng flow để nhận dữ liệu cập nhật trực tiếp từ cơ sở dữ liệu.

Flow được xây dựng dựa trên coroutine và có thể cung cấp nhiều giá trị. Về cơ bản, flow là một dòng dữ liệu có thể được tính toán không đồng bộ. Các giá trị trả về phải thuộc cùng một loại dữ liệu. Ví dụ: Flow<Int> là một flow trả về giá trị số nguyên.

Flow rất giống với Iterator có khả năng tạo ra một dãy giá trị, nhưng flow sử dụng hàm tạm ngưng để tạo và xử lý các giá trị một cách không đồng bộ. Ví dụ: flow có thể tạo yêu cầu mạng một cách an toàn để tạo ra giá trị tiếp theo mà không chặn luồng thực thi chính.

Có 3 thực thể tham gia vào dòng dữ liệu:

  • Thực thể tạo (producer) có vai trò tạo dữ liệu để thêm vào dòng dữ liệu. Nhờ coroutine, flow cũng có thể tạo ra dữ liệu một cách không đồng bộ.
  • Thực thể trung gian (intermediary, nếu có) có thể sửa đổi từng giá trị được phát vào dòng dữ liệu hoặc sửa đổi chính dòng dữ liệu.
  • Thực thể tiêu thụ (consumer) sử dụng các giá trị trong dòng dữ liệu.

các thực thể tham gia vào dòng dữ liệu; thực thể tiêu thụ, thực thể trung gian
              (nếu có) và thực thể tạo
Hình 1. Các thực thể tham gia vào dòng dữ liệu: thực thể tiêu thụ, thực thể trung gian (nếu có) và thực thể tạo.

Trong Android, kho lưu trữ (repository) thường là một thực thể tạo của dữ liệu giao diện người dùng (UI) và thực thể này có UI mà thực thể tiêu thụ sử dụng để hiện dữ liệu. Trong những trường hợp khác, lớp giao diện người dùng là thực thể tạo của dữ liệu đầu vào của người dùng và các lớp khác trong phân cấp sử dụng dữ liệu này. Lớp (layer) giữa thực thể tạo và thực thể tiêu thụ thường là thực thể trung gian có vai trò sửa đổi dòng dữ liệu để điều chỉnh sao cho phù hợp với yêu cầu của lớp sau.

Tạo flow

Để tạo flow, hãy sử dụng các API tạo flow. Hàm tạo flow sẽ tạo một dòng dữ liệu mới để bạn có thể phát giá trị mới vào dòng dữ liệu theo cách thủ công thông qua hàm emit.

Trong ví dụ sau, một nguồn dữ liệu sẽ tự động tìm nạp tin tức mới nhất theo một tần suất cố định. Do hàm tạm ngưng không thể trả về nhiều giá trị liên tiếp, nên nguồn dữ liệu sẽ tạo và trả về một flow để đáp ứng yêu cầu này. Trong trường hợp này, nguồn dữ liệu đóng vai trò là thực thể tạo.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Hàm tạo flow được thực thi trong một coroutine. Do đó, hàm này hưởng lợi từ cùng các API không đồng bộ, nhưng có một số hạn chế như sau:

  • Flow mang tính tuần tự. Vì thực thể tạo nằm trong coroutine nên khi gọi hàm tạm ngưng, thực thể tạo sẽ dừng hoạt động cho đến khi hàm tạm ngưng hoạt động trở lại. Trong ví dụ, thực thể tạo tạm ngưng cho đến khi yêu cầu của mạng fetchLatestNews hoàn tất. Chỉ khi đó, kết quả này mới được phát vào dòng dữ liệu này.
  • Với hàm tạo flow, thực thể tạo không thể emit (gửi) giá trị từ một CoroutineContext khác. Vì vậy, đừng gọi emit trong một CoroutineContext khác bằng cách tạo coroutine mới hoặc bằng cách sử dụng khối mã withContext. Bạn có thể sử dụng hàm tạo flow khác như callbackFlow trong những trường hợp như vậy.

Sửa đổi dòng dữ liệu

Thực thể trung gian có thể sử dụng toán tử trung gian để sửa đổi dòng dữ liệu mà không cần xử lý các giá trị trong đó. Khi áp dụng cho một dòng dữ liệu, các toán tử này là các hàm sẽ thiết lập một chuỗi thao tác, các thao tác này không được thực thi cho đến khi các giá trị đó được sử dụng trong tương lai. Hãy tìm hiểu thêm về các toán tử trung gian trong tài liệu tham khảo về flow.

Trong ví dụ bên dưới, lớp kho lưu trữ sử dụng toán tử trung gian map để chuyển đổi dữ liệu cần hiện trên View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Bạn có thể áp dụng lần lượt các toán tử trung gian, tạo thành chuỗi thao tác được thực thi chậm (lazy) khi một mục giá trị được phát vào flow. Xin lưu ý rằng việc chỉ áp dụng toán tử trung gian cho một dòng dữ liệu sẽ không kích hoạt quá trình thu thập dữ liệu từ flow.

Thu thập dữ liệu từ flow

Sử dụng một toán tử đầu cuối để kích hoạt flow bắt đầu theo dõi các giá trị. Để nhận tất cả giá trị ngay khi được phát vào dòng dữ liệu, hãy sử dụng collect. Bạn có thể tìm hiểu thêm về các toán tử đầu cuối trong tài liệu chính thức về flow.

collect là một hàm tạm ngưng, nên bạn cần thực thi hàm này trong một coroutine. Hàm này sẽ lấy lambda làm tham số được gọi cho mỗi giá trị mới. Vì đây là một hàm tạm ngưng, nên coroutine gọi collect có thể tạm ngưng cho đến khi flow đóng lại.

Tiếp tục ví dụ trước, sau đây là một cách triển khai đơn giản cho ViewModel có vai trò xử lý dữ liệu trong lớp kho lưu trữ:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Việc thu thập dữ liệu từ flow sẽ kích hoạt thực thể tạo làm mới tin tức mới nhất và phát ra kết quả của yêu cầu mạng theo một tần suất cố định. Vì thực thể tạo vẫn luôn hoạt động với vòng lặp while(true), nên dòng dữ liệu sẽ đóng khi ViewModel bị xoá và viewModelScope bị huỷ.

Quá trình thu thập dữ liệu flow có thể ngừng vì những lý do sau:

  • Coroutine đảm nhận việc thu thập dữ liệu bị huỷ, như ví dụ trước đã thể hiện. Việc này cũng sẽ làm tạm ngưng thực thể tạo phía sau.
  • Thực thể tạo hoàn thành quá trình phát các mục giá trị. Trong trường hợp này, dòng dữ liệu sẽ đóng và coroutine đã gọi collect sẽ tiếp tục thực thi.

Flow có trạng thái là cold (bị động) và lazy (trì hoãn) trừ phi được chỉ định cụ thể bằng các toán tử trung gian khác. Điều này có nghĩa là mã nguồn cho thực thể tạo được thực thi mỗi khi một toán tử đầu cuối được gọi trên flow. Trong ví dụ trước, việc có nhiều trình thu thập dữ liệu flow sẽ khiến nguồn dữ liệu tìm nạp tin tức mới nhất nhiều lần theo tần suất cố định khác nhau. Để tối ưu hoá và chia sẻ flow khi nhiều thực thể tiêu thụ thu thập cùng một lúc, hãy sử dụng toán tử shareIn.

Phát hiện ngoại lệ không mong muốn

Mã triển khai thực thể tạo có thể đến từ một thư viện của bên thứ ba. Như vậy có nghĩa là thực thể này có thể gửi những ngoại lệ không mong muốn. Để xử lý các ngoại lệ này, hãy sử dụng toán tử trung gian catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Trong ví dụ trước, khi ngoại lệ xảy ra thì lambda collect sẽ không được gọi do chưa nhận được mục mới.

catch cũng có thể emit (gửi) các mục vào flow. Ví dụ: lớp lưu trữ mẫu có thể emit (gửi) các giá trị đã lưu vào bộ nhớ đệm:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Trong ví dụ này, khi ngoại lệ xảy ra thì lambda collect sẽ được gọi vì mục mới đã được phát vào dòng dữ liệu do có ngoại lệ đó.

Thực thị trong một CoroutineContext khác

Theo mặc định, thực thể tạo của hàm tạo flow sẽ thực thi trong CoroutineContext của coroutine đảm nhiệm việc thu thập dữ liệu từ flow đó, và như đã đề cập trước đó, thực thể này không thể emit (gửi) giá trị từ một CoroutineContext khác. Đây có thể là hành vi không mong muốn trong một số trường hợp. Ví dụ: trong các ví dụ của chủ đề này, lớp lưu trữ không được thực hiện các thao tác trên Dispatchers.MainviewModelScope sử dụng.

Để thay đổi CoroutineContext của một flow, hãy sử dụng toán tử trung gian flowOn. flowOn thay đổi CoroutineContext của upstream flow (dòng dữ liệu ngược), nghĩa là thực thể tạo và mọi toán tử trung gian đã áp dụng trước (hoặc trên) flowOn. Downstream flow (dòng dữ liệu xuôi) (nghĩa là các toán tử trung gian đứng sau flowOn cùng với thực thể tiêu thụ) không bị ảnh hưởng và sẽ thực thi trên CoroutineContext dùng để collect (thu thập) flow đó. Nếu có nhiều toán tử flowOn, mỗi toán tử sẽ thay đổi dòng dữ liệu ngược so với vị trí hiện tại.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Với mã này, hai toán tử onEachmap sử dụng defaultDispatcher , còn toán tử catch thực thể tiêu thụ được thực thi trên Dispatchers.Main thì sử dụng viewModelScope.

Khi lớp nguồn dữ liệu đang thực hiện tác vụ I/O, bạn nên sử dụng trình điều phối (dispatcher) được tối ưu hoá cho hoạt động I/O:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Flow trong thư viện Jetpack

Flow được tích hợp vào nhiều thư viện Jetpack và rất phổ biến trong các thư viện Android bên thứ ba. Flow là lựa chọn tuyệt vời để cập nhật dữ liệu trực tiếp và những dòng dữ liệu bất tận.

Bạn có thể sử dụng Flow cùng với Room để được thông báo về thay đổi trong cơ sở dữ liệu. Khi sử dụng đối tượng truy cập dữ liệu (DAO), hãy trả về loại Flow để nhận dữ liệu cập nhật trực tiếp.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Mỗi lần có thay đổi trong bảng Example, một danh sách mới sẽ được phát cùng với các mục mới trong cơ sở dữ liệu.

Chuyển API sử dụng hàm callback thành flow

callbackFlow là một hàm tạo flow cho phép bạn chuyển API sử dụng hàm callback thành flow. Ví dụ: Các API Firebase Firestore của Android dùng hàm callback.

Để chuyển những API nêu trên thành flow (luồng) và theo dõi nội dung cập nhật từ cơ sở dữ liệu Firestore, bạn có thể dùng mã sau đây:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Không giống như trình tạo flow, callbackFlow cho phép phát các giá trị từ một CoroutineContext khác bằng hàm send hoặc gửi ra bên ngoài một coroutine bằng hàm trySend.

Còn trong phạm vi một coroutine, hãy callbackFlow sử dụng channel (kênh), về mặt lý thuyết thì channel sẽ tương tự như blocking queue (hàng đợi có khả năng chặn luồng thực thi). Cấu hình của một channel sẽ xác định dung lượng, nghĩa là số lượng tối đa các phần tử có thể lưu vào bộ đệm. Channel được tạo trong callbackFlow sẽ có dung lượng mặc định là 64 phần tử. Khi bạn cố gắng thêm phần tử mới vào một channel đã hết dung lượng, send sẽ tạm ngưng thực thể tạo cho đến khi có không gian cho phần tử mới, còn trySend thì sẽ không thêm phần tử vào channel và trả lại false ngay lập tức.

trySend ngay lập tức thêm phần tử được chỉ định vào kênh, chỉ khi hoạt động này không vi phạm giới hạn hạn mức về dung lượng, sau đó trả về kết quả thành công.

Các tài nguyên khác về flow