تدفق Kotlin على نظام Android

في الكوروتينات، يكون التدفق هو نوع يمكنه إرسال قيم متعددة بشكل تسلسلي، على عكس دوال التعليق التي تعرض قيمة واحدة فقط. على سبيل المثال، يمكنك استخدام تدفق لتلقي تحديثات مباشرة من قاعدة بيانات.

تعتمد التدفقات على الكوروتينات ويمكن أن توفر قيمًا متعددة. من الناحية النظرية، إنّ التدفق هو مصدر بيانات يمكن احتسابه بشكل غير متزامن. يجب أن تكون القيم المنبعثة من النوع نفسه. على سبيل المثال، Flow<Int> هو تدفق يُصدر قيمًا صحيحة.

يتشابه التدفق إلى حد كبير مع Iterator الذي ينتج سلسلة من القيم، ولكنه يستخدم دوال التعليق لإنتاج القيم واستهلاكها بشكل غير متزامن. وهذا يعني أنّ المسار يمكنه مثلاً إرسال طلب شبكة بأمان لإنتاج القيمة التالية بدون حظر سلسلة التعليمات الرئيسية.

هناك ثلاثة كيانات متضمنة في تدفقات البيانات:

  • ينتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن للتدفقات أيضًا إنتاج البيانات بشكل غير متزامن.
  • (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في ساحة المشاركات أو البث نفسه.
  • يستهلك المستهلك القيم من مصدر البيانات.

الكيانات المشاركة في تدفقات البيانات، والمستهلك، والوسطاء الاختياريين، والمنتج
الشكل 1. والجهات المعنيّة بمصادر البيانات، وهي: المستهلك والوسطاء الاختياريين والمنتج.

في نظام Android، يكون المستودع عادةً منتجًا لبيانات واجهة المستخدم التي تحتوي على واجهة المستخدم (UI) التي تعرض البيانات في النهاية. وفي أحيان أخرى، تكون طبقة واجهة المستخدم منتجة لأحداث إدخال المستخدم، وطبقات أخرى من التسلسل الهرمي تستهلكها. تعمل الطبقات الواقعة بين المنتج والمستهلك عادةً كوسطاء يعدّلون تدفق البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.

إنشاء تدفق

لإنشاء التدفقات، استخدم واجهات برمجة التطبيقات لأداة إنشاء التدفق. تنشئ دالة إنشاء flow مسارًا جديدًا يمكنك من خلاله إرسال قيم جديدة يدويًا إلى تدفق البيانات باستخدام الدالة emit.

في المثال التالي، يجلب مصدر البيانات آخر الأخبار تلقائيًا على فترات زمنية ثابتة. نظرًا لأن دالة التعليق لا يمكن أن ترجع عدة قيم متتالية، ينشئ مصدر البيانات تدفقًا ويعيده لتحقيق هذا المطلب. في هذه الحالة، يعمل مصدر البيانات كمنتج.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

يتم تنفيذ أداة إنشاء flow داخل الكوروتين. وبالتالي، تستفيد من واجهات برمجة التطبيقات غير المتزامنة نفسها، ولكن تسري بعض القيود:

  • تكون التدفقات متسلسلة. بما أن المنتِج موجود في الكوروتين، عند استدعاء دالة تعليق، يعلّق المنتج مؤقتًا حتى تُرجع دالة التعليق. في المثال، يتم تعليق المنتج إلى أن يكتمل طلب الشبكة fetchLatestNews. وحينئذٍ فقط يتم إصدار النتيجة إلى ساحة المشاركات.
  • باستخدام أداة إنشاء flow، لا يمكن للناتج emit القيم من CoroutineContext مختلفة. لذلك، لا تطلب emit ضمن CoroutineContext مختلف من خلال إنشاء كوروتين جديد أو استخدام withContext كتلة من الرموز البرمجية. يمكنك في هذه الحالات استخدام أدوات إنشاء تدفقات أخرى مثل callbackFlow.

تعديل ساحة المشاركات

يمكن للوسطاء استخدام عوامل تشغيل متوسطة لتعديل تدفق البيانات بدون استهلاك القيم. هذه العوامل هي دوال، عند تطبيقها على تدفق من البيانات، تعد سلسلة من العمليات التي لا يتم تنفيذها حتى يتم استهلاك القيم في المستقبل. لمزيد من المعلومات حول المشغلات المتوسطة، يُرجى الاطّلاع على المستندات المرجعية للتدفق.

في المثال أدناه، تستخدم طبقة المستودع عامل التشغيل المتوسط map لتحويل البيانات التي سيتم عرضها على View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

يمكن تطبيق المشغلات المتوسطة واحدًا تلو الآخر، لتشكيل سلسلة من العمليات التي يتم تنفيذها ببطء عند إطلاق عنصر في التدفق. يُرجى العلم أنّ تطبيق عامل تشغيل متوسط على التدفق لا يؤدي إلى بدء جمع البيانات.

الجمع من التدفق

استخدِم عامل تشغيل طرفي لتشغيل التدفق لبدء الاستماع إلى القيم. للحصول على جميع القيم في ساحة المشاركات بالشكل المنبعث، استخدِم collect. يمكنك معرفة المزيد من المعلومات حول مشغّلي الوحدات الطرفية في المستندات الرسمية حول التدفق.

بما أنّ collect هي دالة تعليق، يجب تنفيذها داخل كوروتين. تأخذ lambda كمعلمة يتم استدعاؤها على كل قيمة جديدة. ونظرًا لأنها دالة تعليق، قد يتم تعليق الكوروتين الذي يستدعي collect إلى أن يتم إغلاق التدفق.

استكمالاً للمثال السابق، إليك تنفيذًا بسيطًا لـ ViewModel يستخدم البيانات من طبقة المستودع:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

وتؤدي عملية جمع البيانات إلى تشغيل المنتج الذي يحدّث آخر الأخبار ويصدر نتيجة طلب الشبكة على فاصل ثابت. وبما أنّ المنتج يظل نشطًا دائمًا في حلقة while(true)، سيتم إغلاق مصدر البيانات عند محو ViewModel ويتم إلغاء viewModelScope.

يمكن أن يتوقف جمع البيانات للأسباب التالية:

  • يتم إلغاء الكوروتين الذي يجمع، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
  • ينتهي المنتج من إطلاق العناصر. في هذه الحالة، يتم إغلاق تدفق البيانات ويستأنف الكوروتين الذي يسمى collect التنفيذ.

تكون التدفقات باردة وكسولة ما لم يتم تحديدها مع عوامل تشغيل وسيطة أخرى. هذا يعني أنه يتم تنفيذ كود المنتج في كل مرة يتم استدعاء عامل تشغيل نهائي في التدفق. في المثال السابق، يؤدي وجود عدّة أدوات تجميع للتدفق إلى استرجاع مصدر البيانات لآخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين التدفق ومشاركته عندما يجمع عدة مستهلكين في الوقت نفسه، استخدِم عامل التشغيل shareIn.

رصد الاستثناءات غير المتوقعة

يمكن أن تأتي عملية تنفيذ المنتج من مكتبة تابعة لجهة خارجية. وهذا يعني أنه قد ينتج عنه استثناءات غير متوقعة. للتعامل مع هذه الاستثناءات، استخدِم عامل التشغيل المتوسط catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

في المثال السابق، عند حدوث استثناء، لا يتم استدعاء lambda collect لعدم تلقّي عنصر جديد.

يمكن لـ "catch" أيضًا إضافة emit عنصر إلى المسار. يمكن لطبقة المستودع النموذجية emit القيم المخزنة مؤقتًا بدلاً من ذلك:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

في هذا المثال، عند حدوث استثناء، يتم استدعاء دالة lambda collect لأنّه تم إرسال عنصر جديد إلى البث بسبب الاستثناء.

التنفيذ في سياق CoroutineContext مختلف

بشكل تلقائي، ينفّذ المنتج الذي أنشأ أداة إنشاء flow في CoroutineContext من الكوروتين الذي يجمع منه، وكما ذكرنا سابقًا، لا يمكنه emit استخدام قيم من CoroutineContext مختلف. وقد يكون هذا السلوك غير مرغوب فيه في بعض الحالات. على سبيل المثال، في الأمثلة المستخدَمة في هذا الموضوع، يجب ألا تُجري طبقة المستودع أي عمليات على Dispatchers.Main التي تستخدمها viewModelScope.

لتغيير CoroutineContext للتدفق، استخدِم عامل التشغيل الوسيط flowOn. تؤدي السمة flowOn إلى تغيير CoroutineContext في المسار الرئيسي، ما يعني أنّ المنتج وأي عوامل تشغيل وسيطة تم تطبيقها قبل (أو أعلى) flowOn. لا يتأثر تدفق عملية نقل البيانات (عوامل التشغيل الوسيطة بعد flowOn بالإضافة إلى المستهلك) ويتم تنفيذه على CoroutineContext المستخدَم في collect من المسار. إذا كان هناك العديد من عوامل تشغيل flowOn، سيغيّر كل عامل تشغيل من عوامل تشغيل الصفحة الرئيسية من موقعها الحالي.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

من خلال هذا الرمز، يستخدم عاملا التشغيل onEach وmap العاملَين defaultDispatcher، في حين يتم تنفيذ عامل التشغيل catch والمستهلك على Dispatchers.Main ويتم استخدامه من خلال viewModelScope.

بما أنّ طبقة مصدر البيانات هي التي تنفّذ أعمال وحدات الإدخال والإخراج، عليك استخدام مُرسِل محسَّن لعمليات الإدخال والإخراج:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

مسارات في مكتبات Jetpack

تم دمج Floodlight في العديد من مكتبات Jetpack، وهو شائع بين مكتبات الجهات الخارجية على Android. يُعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة وتدفقات البيانات اللانهائية.

يمكنك استخدام التدفق مع الغرفة للإبلاغ بالتغييرات في قاعدة البيانات. عند استخدام كائنات الوصول إلى البيانات (DAO)، يمكنك إرجاع النوع Flow للحصول على تحديثات مباشرة.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

عندما يحدث تغيير في الجدول Example، يتم إصدار قائمة جديدة بالعناصر الجديدة في قاعدة البيانات.

تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات

callbackFlow هي أداة إنشاء مسارات تتيح لك تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات. على سبيل المثال، تستخدم واجهات برمجة تطبيقات Firebase Firestore في Android عمليات معاودة الاتصال.

لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك استخدام التعليمة البرمجية التالية:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

بخلاف أداة إنشاء flow، تسمح السمة callbackFlow بإطلاق القيم من CoroutineContext مختلفة باستخدام الدالة send أو خارج الكوروتين من خلال الدالة trySend.

داخليًا، يستخدم callbackFlow قناة، وهي تشبه إلى حدّ كبير قائمة انتظار الحظر. يتم إعداد القناة باستخدام السعة، وهي أقصى عدد من العناصر التي يمكن تخزينها مؤقتًا. إنّ القناة التي تم إنشاؤها في callbackFlow بسعة 64 عنصرًا بشكل تلقائي عندما تحاول إضافة عنصر جديد إلى قناة كاملة، تعلّق send المنتج إلى أن تتوفر مساحة للعنصر الجديد، في حين لا يضيف trySend العنصر إلى القناة ويعرض false على الفور.

تضيف السمة trySend العنصر المحدّد إلى القناة على الفور، وذلك فقط إذا لم يكن المحتوى ينتهك قيود الحدّ الأقصى، ثم يعرض النتيجة الناجحة.

موارد التدفق الإضافية