Trong coroutine, flow là một loại dữ liệu có thể phát ra nhiều giá trị tuần tự, khác với suspend function (hàm tạm ngưng) chỉ trả về một giá trị duy nhất. Ví dụ: bạn có thể sử dụng flow để nhận dữ liệu cập nhật trực tiếp từ cơ sở dữ liệu.
Flow được xây dựng dựa trên coroutine và có thể cung cấp nhiều giá trị.
Về cơ bản, flow là một dòng dữ liệu có thể được tính toán không đồng bộ. Các giá trị trả về phải thuộc cùng một loại dữ liệu. Ví
dụ: Flow<Int>
là một flow trả về giá trị số nguyên.
Flow rất giống với Iterator
có khả năng tạo ra một dãy
giá trị, nhưng flow sử dụng hàm tạm ngưng để tạo và xử lý các giá trị
một cách không đồng bộ. Ví dụ: flow có thể tạo yêu cầu mạng một cách an toàn để tạo ra giá trị tiếp theo mà không chặn luồng thực thi chính.
Có 3 thực thể tham gia vào dòng dữ liệu:
- Thực thể tạo (producer) có vai trò tạo dữ liệu để thêm vào dòng dữ liệu. Nhờ coroutine, flow cũng có thể tạo ra dữ liệu một cách không đồng bộ.
- Thực thể trung gian (intermediary, nếu có) có thể sửa đổi từng giá trị được phát vào dòng dữ liệu hoặc sửa đổi chính dòng dữ liệu.
- Thực thể tiêu thụ (consumer) sử dụng các giá trị trong dòng dữ liệu.
Trong Android, kho lưu trữ (repository) thường là một thực thể tạo của dữ liệu giao diện người dùng (UI) và thực thể này có UI mà thực thể tiêu thụ sử dụng để hiện dữ liệu. Trong những trường hợp khác, lớp giao diện người dùng là thực thể tạo của dữ liệu đầu vào của người dùng và các lớp khác trong phân cấp sử dụng dữ liệu này. Lớp (layer) giữa thực thể tạo và thực thể tiêu thụ thường là thực thể trung gian có vai trò sửa đổi dòng dữ liệu để điều chỉnh sao cho phù hợp với yêu cầu của lớp sau.
Tạo flow
Để tạo flow, hãy sử dụng các API
tạo flow. Hàm tạo flow
sẽ tạo một dòng dữ liệu mới để bạn có thể
phát giá trị mới vào dòng dữ liệu theo cách thủ công thông qua hàm
emit
.
Trong ví dụ sau, một nguồn dữ liệu sẽ tự động tìm nạp tin tức mới nhất theo một tần suất cố định. Do hàm tạm ngưng không thể trả về nhiều giá trị liên tiếp, nên nguồn dữ liệu sẽ tạo và trả về một flow để đáp ứng yêu cầu này. Trong trường hợp này, nguồn dữ liệu đóng vai trò là thực thể tạo.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Hàm tạo flow
được thực thi trong một coroutine. Do đó, hàm này hưởng lợi từ cùng các API không đồng bộ, nhưng có một số hạn chế như sau:
- Flow mang tính tuần tự. Vì thực thể tạo nằm trong coroutine nên khi gọi hàm
tạm ngưng, thực thể tạo sẽ dừng hoạt động cho đến khi hàm tạm ngưng hoạt động trở lại. Trong ví dụ, thực thể tạo tạm ngưng cho đến khi yêu cầu của mạng
fetchLatestNews
hoàn tất. Chỉ khi đó, kết quả này mới được phát vào dòng dữ liệu này. - Với hàm tạo
flow
, thực thể tạo không thểemit
(gửi) giá trị từ mộtCoroutineContext
khác. Vì vậy, đừng gọiemit
trong mộtCoroutineContext
khác bằng cách tạo coroutine mới hoặc bằng cách sử dụng khối mãwithContext
. Bạn có thể sử dụng hàm tạo flow khác nhưcallbackFlow
trong những trường hợp như vậy.
Sửa đổi dòng dữ liệu
Thực thể trung gian có thể sử dụng toán tử trung gian để sửa đổi dòng dữ liệu mà không cần xử lý các giá trị trong đó. Khi áp dụng cho một dòng dữ liệu, các toán tử này là các hàm sẽ thiết lập một chuỗi thao tác, các thao tác này không được thực thi cho đến khi các giá trị đó được sử dụng trong tương lai. Hãy tìm hiểu thêm về các toán tử trung gian trong tài liệu tham khảo về flow.
Trong ví dụ bên dưới, lớp kho lưu trữ sử dụng toán tử trung gian
map
để chuyển đổi dữ liệu cần hiện trên View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Bạn có thể áp dụng lần lượt các toán tử trung gian, tạo thành chuỗi thao tác được thực thi chậm (lazy) khi một mục giá trị được phát vào flow. Xin lưu ý rằng việc chỉ áp dụng toán tử trung gian cho một dòng dữ liệu sẽ không kích hoạt quá trình thu thập dữ liệu từ flow.
Thu thập dữ liệu từ flow
Sử dụng một toán tử đầu cuối để kích hoạt flow bắt đầu theo dõi các giá trị. Để nhận tất cả giá trị ngay khi được phát vào dòng dữ liệu, hãy sử dụng
collect
.
Bạn có thể tìm hiểu thêm về các toán tử đầu cuối trong tài liệu chính thức về flow.
Vì collect
là một hàm tạm ngưng, nên bạn cần thực thi hàm này trong
một coroutine. Hàm này sẽ lấy lambda làm tham số được gọi cho mỗi giá trị mới. Vì đây là một hàm tạm ngưng, nên coroutine gọi collect
có thể tạm ngưng cho đến khi flow đóng lại.
Tiếp tục ví dụ trước, sau đây là một cách triển khai đơn giản cho
ViewModel
có vai trò xử lý dữ liệu trong lớp kho lưu trữ:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Việc thu thập dữ liệu từ flow sẽ kích hoạt thực thể tạo làm mới tin tức mới nhất
và phát ra kết quả của yêu cầu mạng theo một tần suất cố định. Vì
thực thể tạo vẫn luôn hoạt động với vòng lặp while(true)
, nên dòng
dữ liệu sẽ đóng khi ViewModel bị xoá và
viewModelScope
bị huỷ.
Quá trình thu thập dữ liệu flow có thể ngừng vì những lý do sau:
- Coroutine đảm nhận việc thu thập dữ liệu bị huỷ, như ví dụ trước đã thể hiện. Việc này cũng sẽ làm tạm ngưng thực thể tạo phía sau.
- Thực thể tạo hoàn thành quá trình phát các mục giá trị. Trong trường hợp này, dòng dữ liệu
sẽ đóng và coroutine đã gọi
collect
sẽ tiếp tục thực thi.
Flow có trạng thái là cold (bị động) và lazy (trì hoãn) trừ phi được chỉ định cụ thể bằng các toán tử trung gian khác. Điều này có nghĩa là mã nguồn cho thực thể tạo được thực thi mỗi khi
một toán tử đầu cuối được gọi trên flow. Trong ví dụ trước,
việc có nhiều trình thu thập dữ liệu flow sẽ khiến nguồn dữ liệu tìm nạp tin tức mới nhất nhiều lần theo tần suất cố định khác nhau. Để tối ưu hoá và
chia sẻ flow khi nhiều thực thể tiêu thụ thu thập cùng một lúc, hãy sử dụng toán tử
shareIn
.
Phát hiện ngoại lệ không mong muốn
Mã triển khai thực thể tạo có thể đến từ một thư viện của bên thứ ba.
Như vậy có nghĩa là thực thể này có thể gửi những ngoại lệ không mong muốn. Để xử lý các ngoại lệ này, hãy sử dụng toán tử trung gian
catch
.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Trong ví dụ trước, khi ngoại lệ xảy ra thì lambda collect
sẽ không được gọi do chưa nhận được mục mới.
catch
cũng có thể emit
(gửi) các mục vào flow. Ví dụ: lớp lưu trữ mẫu có thể emit
(gửi) các giá trị đã lưu vào bộ nhớ đệm:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Trong ví dụ này, khi ngoại lệ xảy ra thì lambda collect
sẽ được gọi vì mục mới đã được phát vào dòng dữ liệu do có ngoại lệ đó.
Thực thị trong một CoroutineContext khác
Theo mặc định, thực thể tạo của hàm tạo flow
sẽ thực thi trong CoroutineContext
của coroutine đảm nhiệm việc thu thập dữ liệu từ flow đó, và như đã đề cập
trước đó, thực thể này không thể emit
(gửi) giá trị từ một
CoroutineContext
khác. Đây có thể là hành vi không mong muốn trong một số trường hợp.
Ví dụ: trong các ví dụ của chủ đề này, lớp lưu trữ
không được thực hiện các thao tác trên Dispatchers.Main
mà viewModelScope
sử dụng.
Để thay đổi CoroutineContext
của một flow, hãy sử dụng toán tử trung gian
flowOn
.
flowOn
thay đổi CoroutineContext
của upstream flow (dòng dữ liệu ngược), nghĩa là
thực thể tạo và mọi toán tử trung gian đã áp dụng trước (hoặc trên)
flowOn
. Downstream flow (dòng dữ liệu xuôi) (nghĩa là các toán tử trung gian đứng sau flowOn
cùng với thực thể tiêu thụ) không bị ảnh hưởng và sẽ thực thi trên
CoroutineContext
dùng để collect
(thu thập) flow đó. Nếu có nhiều toán tử flowOn
, mỗi toán tử sẽ thay đổi dòng dữ liệu ngược so với vị trí hiện tại.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Với mã này, hai toán tử onEach
và map
sử dụng defaultDispatcher
,
còn toán tử catch
thực thể tiêu thụ được thực thi trên Dispatchers.Main
thì sử dụng viewModelScope
.
Khi lớp nguồn dữ liệu đang thực hiện tác vụ I/O, bạn nên sử dụng trình điều phối (dispatcher) được tối ưu hoá cho hoạt động I/O:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Flow trong thư viện Jetpack
Flow được tích hợp vào nhiều thư viện Jetpack và rất phổ biến trong các thư viện Android bên thứ ba. Flow là lựa chọn tuyệt vời để cập nhật dữ liệu trực tiếp và những dòng dữ liệu bất tận.
Bạn có thể sử dụng
Flow cùng với Room
để được thông báo về thay đổi trong cơ sở dữ liệu. Khi sử dụng
đối tượng truy cập dữ liệu (DAO),
hãy trả về loại Flow
để nhận dữ liệu cập nhật trực tiếp.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Mỗi lần có thay đổi trong bảng Example
, một danh sách mới sẽ được phát cùng với các mục mới trong cơ sở dữ liệu.
Chuyển API sử dụng hàm callback thành flow
callbackFlow
là một hàm tạo flow cho phép bạn chuyển API sử dụng hàm callback thành flow.
Ví dụ: Các API Firebase Firestore của Android dùng hàm callback.
Để chuyển những API nêu trên thành flow (luồng) và theo dõi nội dung cập nhật từ cơ sở dữ liệu Firestore, bạn có thể dùng mã sau đây:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Không giống như trình tạo flow
, callbackFlow
cho phép phát các giá trị từ một CoroutineContext
khác bằng hàm send
hoặc gửi ra bên ngoài một coroutine bằng hàm trySend
.
Còn trong phạm vi một coroutine, hãy callbackFlow
sử dụng
channel (kênh),
về mặt lý thuyết thì channel sẽ tương tự như blocking
queue (hàng đợi có khả năng chặn luồng thực thi).
Cấu hình của một channel sẽ xác định dung lượng, nghĩa là số lượng tối đa các phần tử
có thể lưu vào bộ đệm. Channel được tạo trong callbackFlow
sẽ có dung lượng mặc định là 64 phần tử. Khi bạn cố gắng thêm phần tử mới vào
một channel đã hết dung lượng, send
sẽ tạm ngưng thực thể tạo cho đến khi có không gian cho phần tử mới, còn trySend
thì sẽ không thêm phần tử vào channel và trả lại
false
ngay lập tức.
trySend
ngay lập tức thêm phần tử được chỉ định vào kênh,
chỉ khi hoạt động này không vi phạm giới hạn hạn mức về dung lượng, sau đó trả về
kết quả thành công.
Các tài nguyên khác về flow
- Kiểm thử flow của Kotlin trên Android
StateFlow
vàSharedFlow
- Tài nguyên khác về coroutine và luồng trong Kotlin