בקורוטינים, זרימה הוא סוג שיכול לפלוט כמה ערכים ברצף, בניגוד לפונקציות השעיה שמחזירות רק בערך יחיד. לדוגמה, אפשר להשתמש בזרימה כדי לקבל נתונים בזמן אמת ממסד נתונים.
הזרימה מבוססת על קורוטינים והיא יכולה לספק ערכים מרובים.
זרם הוא בעצם זרם נתונים שניתן לחשב
באופן אסינכרוני. הערכים המותרים חייבים להיות מאותו סוג. עבור
לדוגמה, Flow<Int>
הוא זרם שפולט ערכים של מספרים שלמים.
יש רצף שדומה מאוד ל-Iterator
שיוצר רצף של
אבל נעשה בו שימוש בפונקציות השעיה כדי לייצר ולצרוך ערכים
באופן אסינכרוני. זה אומר, לדוגמה, שהתהליך יכול ליצור
לרשת המפיקה את הערך הבא בלי לחסום את
של שרשור.
מקורות הנתונים מעורבים עם שלוש ישויות:
- מפיק יוצר נתונים שמתווספים לשידור. תודות ל: ו-coroutines, תזרימים יכולים גם להפיק נתונים באופן אסינכרוני.
- (אופציונלי) מתווכים יכולים לשנות כל ערך שמועבר אל או את השידור עצמו.
- צרכן משתמש בערכים מהשידור.
ב-Android, מאגר הוא בדרך כלל הוא מפיק של נתוני ממשק המשתמש שהממשק המשתמש (UI) שלו משמש שבסופו של דבר מציג את הנתונים. במקרים אחרים, שכבת ממשק המשתמש היא מפיקה של אירועי קלט של משתמשים ושכבות אחרות בהיררכיה צורכות אותם. שכבות ב- בין המפיק לצרכן בדרך כלל משמשים כמתווכים המשנים את כדי להתאים אותו לדרישות של השכבה הבאה.
יצירת זרימה
כדי ליצור תהליכים, צריך להשתמש
הכלי ליצירת פרחים
ממשקי API. פונקציית ה-builder flow
יוצרת תהליך חדש שבו אפשר לבצע ידנית
פולט ערכים חדשים לזרם הנתונים באמצעות הפונקציה
emit
מותאמת אישית.
בדוגמה הבאה, מקור נתונים מאחזר את החדשות האחרונות באופן אוטומטי במרווחי זמן קבועים. כפונקציית השעיה לא ניתן מחזירה כמה ערכים עוקבים, מקור הנתונים יוצר ומחזיר זרימה קדימה כדי למלא את הדרישה הזו. במקרה הזה, מקור הנתונים פועל בתור המפיק.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
ה-builder flow
מופעל בתוך קורוטין. לכן, הוא מועיל
מאותם ממשקי API אסינכרוניים, אבל קיימות מגבלות מסוימות:
- תהליכי העבודה הם רציפים. מאחר שהמפיק נמצא בקורוטין, כשהוא מתקשר
פונקציית השעיה, המפיק מושעה עד שפונקציית ההשעיה
החזרות. בדוגמה, המפיק מושעה עד
fetchLatestNews
בקשת הרשת הושלמה. רק כך התוצאה תופק לשידור. - באמצעות ה-builder
flow
, המפיק לא יכולemit
ערכים מתוךCoroutineContext
שונה. לכן, אין לקרוא ל-emit
CoroutineContext
על ידי יצירת קורוטין חדשים או באמצעותwithContext
בלוקים של קוד. אפשר להשתמש בכלים אחרים ליצירת תהליכים, כמוcallbackFlow
במקרים כאלה.
שינוי הזרם
מתווכים יכולים להשתמש באופרטורים מתווכים כדי לשנות את הסטרימינג של מבלי לצרוך את הערכים. האופרטורים האלה הם פונקציות שכאשר מוחלת על זרם נתונים, להגדיר שרשרת פעולות ש עד שתצרוך את הערכים בעתיד. מידע נוסף על מפעילי ביניים חומרי עזר בנושא זרימה
בדוגמה הבאה, שכבת המאגר משתמשת באופרטור הביניים
map
כדי לשנות את הנתונים שיוצגו בView
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
ניתן להחיל אופרטורים מתווכים אחד אחרי השני, וכך ליצור שרשרת פעולות המבוצעות באופן מדורג כשפריט נפלט אל . שימו לב שפשוט החלת אופרטור ביניים על זרם לא להתחיל את איסוף הזרימה.
איסוף בתהליך
משתמשים באופרטור טרמינל כדי להפעיל את התהליך להתחלת ההאזנה
ערכים. כדי לקבל את כל הערכים שבסטרימינג בזמן שהם נשלחים, משתמשים בפונקציה
collect
כדי לקבל מידע נוסף על אופרטורים של מסופים, אפשר לקרוא
מסמכים רשמיים בנושא התהליך.
collect
היא פונקציית השעיה, ולכן צריך להפעיל אותה בתוך
קווטין. היא לוקחת את lambda כפרמטר שנקרא
כל ערך חדש. מכיוון שזו פונקציית השעיה, הקווטין
הקריאות collect
עשויות להיות מושהות עד שהזרימה תיסגר.
בהמשך לדוגמה הקודמת, הנה הטמעה פשוטה של
ViewModel
שצורך את הנתונים משכבת המאגר:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
איסוף של נתוני התהליך גורם למפיק שמעדכן את החדשות האחרונות
ופולט את התוצאה של בקשת הרשת במרווח זמן קבוע. כי
המפיק תמיד פעיל בלולאת while(true)
, השידור
של הנתונים ייסגרו לאחר מחיקת ה-ViewModel,
viewModelScope
בוטל.
איסוף הזרימה יכול להיפסק מהסיבות הבאות:
- התרופה שאוספת מבוטלת, כפי שמוצג בדוגמה הקודמת. הפעולה הזו מפסיקה גם את המפיק המקורי.
- המפיק מסיים לקלוט פריטים. במקרה כזה, זרם הנתונים
נסגרה והקורוטין בשם
collect
ממשיכה בביצוע.
תהליכי העבודה הם קרים ומתקדמים, אלא אם הם צוינו בתהליכי ביניים אחרים
אופרטורים. כלומר, קוד היצרן מופעל בכל פעם
לאופרטור הטרמינל קוראים לזרימה. בדוגמה הקודמת,
כשיש מספר אוספים של תהליכים, מקור הנתונים יאחזר את
את החדשות האחרונות מספר פעמים במרווחי זמן קבועים שונים. כדי לבצע אופטימיזציה
יכולים לבצע את התהליך שבו מספר צרכנים אוספים בו-זמנית, השתמשו
אופרטור shareIn
.
תפיסת חריגים בלתי צפויים
ההטמעה של המפיק יכולה להגיע מספרייה של צד שלישי.
המשמעות היא שהדבר עלול לגרום חריגות בלתי צפויות. כדי לטפל בתרחישים האלה
חריגים, השתמשו
catch
מפעיל ביניים.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
בדוגמה הקודמת, כשמתרחש חריגה, הפונקציה collect
לא מתבצעת קריאה ל-lambda כי לא התקבל פריט חדש.
catch
יכול גם להוסיף emit
פריטים לזרימה. המאגר לדוגמה
במקום זאת, השכבה יכולה emit
את הערכים שנשמרו במטמון:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
בדוגמה הזו, כשמתרחש יוצא מן הכלל, ה-lambda collect
הוא
נקרא, כי פריט חדש הופץ לשידור בגלל
חריג.
הפעולה מתבצעת בקובץ CoroutineContext אחר
כברירת מחדל, הבעלים של כלי הבנייה של flow
מבצע
CoroutineContext
של הקורוטינים שאוספים ממנו, וכמו
צוין קודם לכן, אי אפשר emit
ערכים
CoroutineContext
. במקרים מסוימים, התנהגות כזו עשויה להיות לא רצויה.
למשל, בדוגמאות שמופיעות בנושא הזה, המאגר
השכבה לא אמורה לבצע פעולות על Dispatchers.Main
.
נמצא בשימוש על ידי viewModelScope
.
כדי לשנות את CoroutineContext
של זרימה, צריך להשתמש באופרטור הביניים
flowOn
.
הפונקציה flowOn
משנה את הערך CoroutineContext
של הזרימה ב-upstream, כלומר
היצרן וכל מפעילי הביניים שהוחלו לפני (או יותר)
flowOn
התהליך במורד הזרם (האופרטורים ברמת הביניים אחרי flowOn
וגם הצרכן)
CoroutineContext
משמש ל-collect
מהזרימה. אם יש
מספר אופרטורים של flowOn
, כל אחד מהם משנה את ה-upstream
המיקום הנוכחי.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
עם הקוד הזה, האופרטורים onEach
ו-map
משתמשים ב-defaultDispatcher
,
אבל המפעיל catch
והצרכן מבוצעים
Dispatchers.Main
נמצא בשימוש על ידי viewModelScope
.
מכיוון ששכבת מקור הנתונים עושה עבודת קלט/פלט (I/O), צריך להשתמש בסדר שמותאמת במיוחד לפעולות קלט/פלט (I/O):
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
תהליכים בספריות Jetpack
הזרימה משולבת בספריות רבות של Jetpack, והיא פופולרית בקרב ספריות של צדדים שלישיים ל-Android. התכונה 'זרימה' מתאימה במיוחד לעדכונים בזמן אמת ומקורות נתונים אינסופיים.
אפשר להשתמש
לזרום עם החדר
לקבל התראות על שינויים במסד נתונים. בזמן השימוש
אובייקטים של גישה לנתונים (DAO),
להחזיר סוג Flow
כדי לקבל עדכונים בזמן אמת.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
בכל פעם שיש שינוי בטבלה Example
, נוצרת רשימה חדשה
עם הפריטים החדשים במסד הנתונים.
המרת ממשקי API שמבוססים על קריאה חוזרת לתהליכים
callbackFlow
הוא כלי ליצירת תהליכים שמאפשר להמיר ממשקי API שמבוססים על קריאה חוזרת לתהליכים.
לדוגמה, Firebase Firestore
ממשקי API של Android משתמשים בקריאות חוזרות (callback).
כדי להמיר את ממשקי ה-API האלה לזרימה ולהאזין לעדכונים במסד הנתונים של Firestore, אתם צריכים: יכול להשתמש בקוד הבא:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
בניגוד ל-builder flow
, callbackFlow
מאפשרת להפיץ ערכים מ-CoroutineContext
שונה עם פונקציית
send
או מחוץ לקורוטין עם הפונקציה
trySend
מותאמת אישית.
באופן פנימי, callbackFlow
משתמש
channel,
דומה מאוד מבחינה מושגית לחסימה
להבא.
ערוץ מוגדר עם קיבולת, מספר הרכיבים המקסימלי
שאפשר לבצע אגירת נתונים. בערוץ שנוצר ב-callbackFlow
מוגדרת ברירת מחדל
קיבולת של 64 רכיבים. כשמנסים להוסיף רכיב חדש לקובץ מלא
הערוץ, send
משעה את המפיק עד שיתפנה מקום
ואילו trySend
לא מוסיף את הרכיב לערוץ ומחזיר
false
באופן מיידי.
trySend
מוסיף באופן מיידי את הרכיב שצוין לערוץ,
רק אם הדבר לא מפר את הגבלות הקיבולת שלו, ולאחר מכן מחזירה את
תוצאה מוצלחת.
מקורות מידע נוספים בנושא תהליכי
- בדיקת תהליכים של Kotlin ב-Android
StateFlow
ו-SharedFlow
- מקורות מידע נוספים בנושא קורוטין וזרימה ב-Kotlin