בתהליך של Kotlin ב-Android

בקורוטינים, זרימה הוא סוג שיכול לפלוט כמה ערכים ברצף, בניגוד לפונקציות השעיה שמחזירות רק בערך יחיד. לדוגמה, אפשר להשתמש בזרימה כדי לקבל נתונים בזמן אמת ממסד נתונים.

הזרימה מבוססת על קורוטינים והיא יכולה לספק ערכים מרובים. זרם הוא בעצם זרם נתונים שניתן לחשב באופן אסינכרוני. הערכים המותרים חייבים להיות מאותו סוג. עבור לדוגמה, Flow<Int> הוא זרם שפולט ערכים של מספרים שלמים.

יש רצף שדומה מאוד ל-Iterator שיוצר רצף של אבל נעשה בו שימוש בפונקציות השעיה כדי לייצר ולצרוך ערכים באופן אסינכרוני. זה אומר, לדוגמה, שהתהליך יכול ליצור לרשת המפיקה את הערך הבא בלי לחסום את של שרשור.

מקורות הנתונים מעורבים עם שלוש ישויות:

  • מפיק יוצר נתונים שמתווספים לשידור. תודות ל: ו-coroutines, תזרימים יכולים גם להפיק נתונים באופן אסינכרוני.
  • (אופציונלי) מתווכים יכולים לשנות כל ערך שמועבר אל או את השידור עצמו.
  • צרכן משתמש בערכים מהשידור.

ישויות שמעורבות במקורות נתונים; צרכן, אופציונלי
              מתווכים ומפיקים
איור 1. ישויות שמעורבות במקורות נתונים: הצרכן, מתווכים אופציונליים ומפיק.

ב-Android, מאגר הוא בדרך כלל הוא מפיק של נתוני ממשק המשתמש שהממשק המשתמש (UI) שלו משמש שבסופו של דבר מציג את הנתונים. במקרים אחרים, שכבת ממשק המשתמש היא מפיקה של אירועי קלט של משתמשים ושכבות אחרות בהיררכיה צורכות אותם. שכבות ב- בין המפיק לצרכן בדרך כלל משמשים כמתווכים המשנים את כדי להתאים אותו לדרישות של השכבה הבאה.

יצירת זרימה

כדי ליצור תהליכים, צריך להשתמש הכלי ליצירת פרחים ממשקי API. פונקציית ה-builder flow יוצרת תהליך חדש שבו אפשר לבצע ידנית פולט ערכים חדשים לזרם הנתונים באמצעות הפונקציה emit מותאמת אישית.

בדוגמה הבאה, מקור נתונים מאחזר את החדשות האחרונות באופן אוטומטי במרווחי זמן קבועים. כפונקציית השעיה לא ניתן מחזירה כמה ערכים עוקבים, מקור הנתונים יוצר ומחזיר זרימה קדימה כדי למלא את הדרישה הזו. במקרה הזה, מקור הנתונים פועל בתור המפיק.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

ה-builder flow מופעל בתוך קורוטין. לכן, הוא מועיל מאותם ממשקי API אסינכרוניים, אבל קיימות מגבלות מסוימות:

  • תהליכי העבודה הם רציפים. מאחר שהמפיק נמצא בקורוטין, כשהוא מתקשר פונקציית השעיה, המפיק מושעה עד שפונקציית ההשעיה החזרות. בדוגמה, המפיק מושעה עד fetchLatestNews בקשת הרשת הושלמה. רק כך התוצאה תופק לשידור.
  • באמצעות ה-builder flow, המפיק לא יכול emit ערכים מתוך CoroutineContext שונה. לכן, אין לקרוא ל-emit CoroutineContext על ידי יצירת קורוטין חדשים או באמצעות withContext בלוקים של קוד. אפשר להשתמש בכלים אחרים ליצירת תהליכים, כמו callbackFlow במקרים כאלה.

שינוי הזרם

מתווכים יכולים להשתמש באופרטורים מתווכים כדי לשנות את הסטרימינג של מבלי לצרוך את הערכים. האופרטורים האלה הם פונקציות שכאשר מוחלת על זרם נתונים, להגדיר שרשרת פעולות ש עד שתצרוך את הערכים בעתיד. מידע נוסף על מפעילי ביניים חומרי עזר בנושא זרימה

בדוגמה הבאה, שכבת המאגר משתמשת באופרטור הביניים map כדי לשנות את הנתונים שיוצגו בView:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

ניתן להחיל אופרטורים מתווכים אחד אחרי השני, וכך ליצור שרשרת פעולות המבוצעות באופן מדורג כשפריט נפלט אל . שימו לב שפשוט החלת אופרטור ביניים על זרם לא להתחיל את איסוף הזרימה.

איסוף בתהליך

משתמשים באופרטור טרמינל כדי להפעיל את התהליך להתחלת ההאזנה ערכים. כדי לקבל את כל הערכים שבסטרימינג בזמן שהם נשלחים, משתמשים בפונקציה collect כדי לקבל מידע נוסף על אופרטורים של מסופים, אפשר לקרוא מסמכים רשמיים בנושא התהליך.

collect היא פונקציית השעיה, ולכן צריך להפעיל אותה בתוך קווטין. היא לוקחת את lambda כפרמטר שנקרא כל ערך חדש. מכיוון שזו פונקציית השעיה, הקווטין הקריאות collect עשויות להיות מושהות עד שהזרימה תיסגר.

בהמשך לדוגמה הקודמת, הנה הטמעה פשוטה של ViewModel שצורך את הנתונים משכבת המאגר:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

איסוף של נתוני התהליך גורם למפיק שמעדכן את החדשות האחרונות ופולט את התוצאה של בקשת הרשת במרווח זמן קבוע. כי המפיק תמיד פעיל בלולאת while(true), השידור של הנתונים ייסגרו לאחר מחיקת ה-ViewModel, viewModelScope בוטל.

איסוף הזרימה יכול להיפסק מהסיבות הבאות:

  • התרופה שאוספת מבוטלת, כפי שמוצג בדוגמה הקודמת. הפעולה הזו מפסיקה גם את המפיק המקורי.
  • המפיק מסיים לקלוט פריטים. במקרה כזה, זרם הנתונים נסגרה והקורוטין בשם collect ממשיכה בביצוע.

תהליכי העבודה הם קרים ומתקדמים, אלא אם הם צוינו בתהליכי ביניים אחרים אופרטורים. כלומר, קוד היצרן מופעל בכל פעם לאופרטור הטרמינל קוראים לזרימה. בדוגמה הקודמת, כשיש מספר אוספים של תהליכים, מקור הנתונים יאחזר את את החדשות האחרונות מספר פעמים במרווחי זמן קבועים שונים. כדי לבצע אופטימיזציה יכולים לבצע את התהליך שבו מספר צרכנים אוספים בו-זמנית, השתמשו אופרטור shareIn.

תפיסת חריגים בלתי צפויים

ההטמעה של המפיק יכולה להגיע מספרייה של צד שלישי. המשמעות היא שהדבר עלול לגרום חריגות בלתי צפויות. כדי לטפל בתרחישים האלה חריגים, השתמשו catch מפעיל ביניים.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

בדוגמה הקודמת, כשמתרחש חריגה, הפונקציה collect לא מתבצעת קריאה ל-lambda כי לא התקבל פריט חדש.

catch יכול גם להוסיף emit פריטים לזרימה. המאגר לדוגמה במקום זאת, השכבה יכולה emit את הערכים שנשמרו במטמון:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

בדוגמה הזו, כשמתרחש יוצא מן הכלל, ה-lambda collect הוא נקרא, כי פריט חדש הופץ לשידור בגלל חריג.

הפעולה מתבצעת בקובץ CoroutineContext אחר

כברירת מחדל, הבעלים של כלי הבנייה של flow מבצע CoroutineContext של הקורוטינים שאוספים ממנו, וכמו צוין קודם לכן, אי אפשר emit ערכים CoroutineContext. במקרים מסוימים, התנהגות כזו עשויה להיות לא רצויה. למשל, בדוגמאות שמופיעות בנושא הזה, המאגר השכבה לא אמורה לבצע פעולות על Dispatchers.Main. נמצא בשימוש על ידי viewModelScope.

כדי לשנות את CoroutineContext של זרימה, צריך להשתמש באופרטור הביניים flowOn. הפונקציה flowOn משנה את הערך CoroutineContext של הזרימה ב-upstream, כלומר היצרן וכל מפעילי הביניים שהוחלו לפני (או יותר) flowOn התהליך במורד הזרם (האופרטורים ברמת הביניים אחרי flowOn וגם הצרכן) CoroutineContext משמש ל-collect מהזרימה. אם יש מספר אופרטורים של flowOn, כל אחד מהם משנה את ה-upstream המיקום הנוכחי.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

עם הקוד הזה, האופרטורים onEach ו-map משתמשים ב-defaultDispatcher, אבל המפעיל catch והצרכן מבוצעים Dispatchers.Main נמצא בשימוש על ידי viewModelScope.

מכיוון ששכבת מקור הנתונים עושה עבודת קלט/פלט (I/O), צריך להשתמש בסדר שמותאמת במיוחד לפעולות קלט/פלט (I/O):

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

תהליכים בספריות Jetpack

הזרימה משולבת בספריות רבות של Jetpack, והיא פופולרית בקרב ספריות של צדדים שלישיים ל-Android. התכונה 'זרימה' מתאימה במיוחד לעדכונים בזמן אמת ומקורות נתונים אינסופיים.

אפשר להשתמש לזרום עם החדר לקבל התראות על שינויים במסד נתונים. בזמן השימוש אובייקטים של גישה לנתונים (DAO), להחזיר סוג Flow כדי לקבל עדכונים בזמן אמת.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

בכל פעם שיש שינוי בטבלה Example, נוצרת רשימה חדשה עם הפריטים החדשים במסד הנתונים.

המרת ממשקי API שמבוססים על קריאה חוזרת לתהליכים

callbackFlow הוא כלי ליצירת תהליכים שמאפשר להמיר ממשקי API שמבוססים על קריאה חוזרת לתהליכים. לדוגמה, Firebase Firestore ממשקי API של Android משתמשים בקריאות חוזרות (callback).

כדי להמיר את ממשקי ה-API האלה לזרימה ולהאזין לעדכונים במסד הנתונים של Firestore, אתם צריכים: יכול להשתמש בקוד הבא:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

בניגוד ל-builder flow, callbackFlow מאפשרת להפיץ ערכים מ-CoroutineContext שונה עם פונקציית send או מחוץ לקורוטין עם הפונקציה trySend מותאמת אישית.

באופן פנימי, callbackFlow משתמש channel, דומה מאוד מבחינה מושגית לחסימה להבא. ערוץ מוגדר עם קיבולת, מספר הרכיבים המקסימלי שאפשר לבצע אגירת נתונים. בערוץ שנוצר ב-callbackFlow מוגדרת ברירת מחדל קיבולת של 64 רכיבים. כשמנסים להוסיף רכיב חדש לקובץ מלא הערוץ, send משעה את המפיק עד שיתפנה מקום ואילו trySend לא מוסיף את הרכיב לערוץ ומחזיר false באופן מיידי.

trySend מוסיף באופן מיידי את הרכיב שצוין לערוץ, רק אם הדבר לא מפר את הגבלות הקיבולת שלו, ולאחר מכן מחזירה את תוצאה מוצלחת.

מקורות מידע נוספים בנושא תהליכי