تدفق Kotlin على نظام Android

في الكوروتينات، نوع التدفق هو نوع يمكنه إصدار قيم متعددة بالتتابع، على عكس دوال التعليق التي تعرض فقط قيمة واحدة. على سبيل المثال، يمكنك استخدام مسار لتلقّي محتوى تحديثات من قاعدة بيانات.

تعتمد التدفقات على الكوروتينات ويمكن أن توفر قيمًا متعددة. التدفق من الناحية النظرية هو مصدر بيانات يمكن احتسابه. بشكل غير متزامن. يجب أن تكون القيم المنبعثة من النوع نفسه. بالنسبة Flow<Int> هو تدفق يُصدر عددًا صحيحًا.

يتشابه التدفق إلى حد كبير مع Iterator الذي يُنتج سلسلة من ولكنها تستخدم دوال التعليق لإنتاج القيم واستهلاكها بشكل غير متزامن. وهذا يعني، على سبيل المثال، أن التدفق يمكن أن يجعل لإنتاج القيمة التالية دون حظر الصفحة الرئيسية .

هناك ثلاثة كيانات متضمنة في تدفقات البيانات:

  • ينتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن أن تؤدي التدفقات أيضًا إلى إنتاج البيانات بشكل غير متزامن.
  • (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في البث أو البث نفسه
  • يستهلك المستهلك القيم من مصدر البيانات.

والجهات المشتركة في تدفقات البيانات المستهلك، اختياري
              والوسطاء والمنتج
الشكل 1. الكيانات المشاركة في تدفقات البيانات: المستهلك والوسطاء الاختياريين والمنتج.

في نظام Android، يتم إنشاء المستودع منتج عادةً لبيانات واجهة المستخدم التي تحتوي على واجهة المستخدم (UI) تعرض البيانات في النهاية. في أحيان أخرى، تكون طبقة واجهة المستخدم هي منتج وتستهلكها الأحداث التي يُدخلها المستخدم والطبقات الأخرى في التسلسل الهرمي. طبقات في بين المنتج والمستهلك عادةً كوسطاء يقومون بتعديل تدفق البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.

إنشاء تدفق

لإنشاء تدفقات، استخدم أداة إنشاء التدفق واجهات برمجة التطبيقات. تنشئ دالة إنشاء flow مسارًا جديدًا يمكنك فيه يدويًا تنبعث قيمًا جديدة في تدفق البيانات باستخدام emit الأخرى.

في المثال التالي، يسترجع مصدر البيانات آخر الأخبار تلقائيًا على فترات زمنية ثابتة. كدالة تعليق، لا يمكن بإرجاع عدة قيم متتالية، فإن مصدر البيانات ينشئ ويرجع تدفقًا لتحقيق هذا المطلب. في هذه الحالة، يعمل مصدر البيانات كمنتج.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

يتم تنفيذ أداة إنشاء flow داخل الكوروتين. وبالتالي، فإنه يستفيد من واجهات برمجة التطبيقات غير المتزامنة نفسها، ولكن تنطبق بعض القيود:

  • تكون التدفقات متسلسلة. بما أن المنتِج موجود في الكوروتين، فعند استدعاء دالة التعليق، يتم تعليق المنتج حتى دالة التعليق وإرجاعه. في المثال، يعلّق المنتج المنتج حتى fetchLatestNews يكتمل طلب الشبكة. وحينئذٍ فقط يتم إصدار النتيجة إلى ساحة المشاركات.
  • باستخدام أداة إنشاء flow، لا يمكن للناتج emit قيمة من مختلفة CoroutineContext. لذلك، لا تستدعي emit بعد CoroutineContext عن طريق إنشاء كوروتين جديد أو باستخدام withContext مجموعات الرموز البرمجية. يمكنك استخدام منصات إنشاء التدفق الأخرى مثل callbackFlow في هذه الحالات.

تعديل ساحة المشاركات

يمكن للوسطاء استخدام عوامل تشغيل متوسطة لتعديل تدفق البيانات دون استهلاك القيم. هذه العوامل هي دوال، عندما تطبيقها على تدفق من البيانات، وإعداد سلسلة من العمليات غير حتى يتم استهلاك القيم في المستقبل. مزيد من المعلومات حول المشغلات الوسيطة في المستندات المرجعية للتدفق

في المثال أدناه، تستخدم طبقة المستودع المشغل المتوسط map لتحويل البيانات التي سيتم عرضها على View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

يمكن تطبيق العوامل المتوسطة واحدًا تلو الآخر، لتكوين سلسلة العمليات التي يتم تنفيذها ببطء عند إطلاق عنصر إلى التدفق. تجدر الإشارة إلى أنّ تطبيق عامل تشغيل متوسط على البث المباشر عدم بدء جمع التدفق.

الجمع من التدفق

استخدِم عامل تشغيل طرفي لتشغيل المسار لبدء الاستماع القيم. للحصول على جميع القيم في ساحة المشاركات بالشكل المنبعث، استخدم collect يمكنك التعرف على مزيد من المعلومات حول مشغلي الوحدات الطرفية في المستندات الرسمية حول التدفق

بما أنّ collect هي دالة تعليق، يجب تنفيذها ضمن كوروتين. تستخدم lambda كمعلمة يتم استدعاؤها على كل قيمة جديدة. ونظرًا لأنها دالة تعليق، فإن الكوروتين قد يتم تعليق المكالمات collect حتى يتم إغلاق التدفق.

استكمالاً للمثال السابق، إليك تنفيذًا بسيطًا ViewModel يستخدم البيانات من طبقة المستودع:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

عملية جمع المعلومات تحفّز المنتج على تحديث آخر الأخبار وينتج عن طلب الشبكة على فاصل ثابت. نظرًا لأن العمود يبقى المنتج نشطًا دائمًا في حلقة while(true)، ويمثّل البث من البيانات سيتم إغلاقها عند محو ViewModel تم إلغاء viewModelScope.

يمكن أن يتوقف جمع البيانات للأسباب التالية:

  • يتم إلغاء الكوروتين الذي يجمع، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
  • ينتهي المنتج من إطلاق العناصر. في هذه الحالة، فإن تدفق البيانات مغلقة ويستأنف الكوروتين الذي يُسمى collect التنفيذ.

التدفقات باردة وكسولة ما لم يتم تحديدها مع وسيط آخر والمشغلات. يعني هذا أنه يتم تنفيذ رمز المنتج في كل مرة المشغل الطرفي على التدفق. في المثال السابق، فإن وجود عدة جامعات للتدفقات يؤدي إلى قيام مصدر البيانات بجلب آخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين في التدفق عندما يجمع عدة مستهلكين في الوقت نفسه، shareIn.

رصد الاستثناءات غير المتوقعة

يمكن أن تأتي عملية تنفيذ المنتج من مكتبة تابعة لجهة خارجية. وهذا يعني أنه قد ينتج عنه استثناءات غير متوقعة. للتعامل مع هذه الأمور باستثناءات، يمكنك استخدام catch الوسيط المتوسط.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

في المثال السابق، عند حدوث استثناء، لا يتم تعديل السمة collect لم يتم استدعاء lambda، نظرًا لعدم استلام عنصر جديد.

يمكن لـ "catch" أيضًا إضافة emit عنصر إلى المسار. مثال على المستودع يمكن أن emit القيم المخزنة مؤقتًا بدلاً من ذلك:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

في هذا المثال، عند حدوث استثناء، تكون دالة lambda collect هي لأنّه تم إطلاق عنصر جديد إلى البث بسبب .

التنفيذ في سياق CoroutineContext مختلف

بشكل تلقائي، ينفذ منتج أداة إنشاء flow في CoroutineContext من الكوروتين الذي يجمع منه، وكذلك التي ذكرناها سابقًا، لا يمكنها emit من القيم CoroutineContext وقد يكون هذا السلوك غير مرغوب فيه في بعض الحالات. فعلى سبيل المثال، في الأمثلة المستخدمة خلال هذا الموضوع، يمثل المستودع طبقة معينة من المفترض ألا تقوم بإجراء عمليات على Dispatchers.Main والتي ويستخدمه viewModelScope.

لتغيير CoroutineContext للتدفق، استخدم عامل التشغيل الوسيط flowOn يغيِّر flowOn CoroutineContext في التدفّق الرئيسي، ما يعني الشركة المصنّعة وأي شركات تشغيل وسيطة تم تطبيقها قبل (أو أعلى) flowOn تدفق التدفق (عوامل التشغيل الوسيطة بعد flowOn مع المستهلك) لا تتأثر ويتم تنفيذه في تم استخدام CoroutineContext على collect من المسار. إذا كانت هناك عدة عوامل تشغيل flowOn، يؤدي كل عامل منها إلى تغيير الجزء الرئيسي من الصفحة موقعك الجغرافي الحالي.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

ومن خلال هذا الرمز، يستخدم العاملان onEach وmap العاملَين defaultDispatcher بينما يتم تنفيذ عامل التشغيل catch والمستهلك على تم استخدام "Dispatchers.Main" من قِبل "viewModelScope".

يجب استخدام مرسِل لأنّ طبقة مصدر البيانات تقوم بعمل الإدخال والإخراج. بحيث يتوافق مع عمليات وحدات الإدخال والإخراج:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

مسارات في مكتبات Jetpack

تم دمج التدفق في العديد من مكتبات Jetpack، وهو شائع بين مكتبات الجهات الخارجية على Android يُعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة ومصادر لا حصر لها من البيانات.

يمكنك استخدام Flow with Room ليتم إشعارك بالتغييرات التي تحدث في قاعدة البيانات. عند استخدام كائنات الوصول إلى البيانات (DAO)، عرض النوع Flow للحصول على تحديثات مباشرة.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

كلما حدث تغيير في جدول Example، يتم إصدار قائمة جديدة. بالعناصر الجديدة في قاعدة البيانات.

تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات

callbackFlow أداة إنشاء التدفق تتيح لك تحويل واجهات برمجة التطبيقات القائمة على معاودة الاتصال إلى مسارات. على سبيل المثال، يوفّر Firebase Firestore تستخدم واجهات برمجة تطبيقات Android عمليات معاودة الاتصال.

لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك يمكنك استخدام التعليمة البرمجية التالية:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

على عكس أداة إنشاء flow، callbackFlow وتسمح بانبعاثات القيم من CoroutineContext مختلفة مع send أو خارج الكوروتين مع trySend الأخرى.

داخليًا، يستخدم callbackFlow channel وهو من الناحية النظرية يشبه إلى حد كبير قائمة انتظار. يتم إعداد القناة باستخدام السعة، وهو الحد الأقصى لعدد العناصر التي يمكن تخزينها مؤقتًا. القناة التي تم إنشاؤها في callbackFlow تستخدم إعدادًا تلقائيًا سعة 64 عنصرًا. عندما تحاول إضافة عنصر جديد إلى قناة، تعلّق send المنتج حتى تتوفر مساحة لإجراء العنصر، بينما لا تضيف trySend العنصر إلى القناة وتعرض false فورًا.

يضيف trySend العنصر المحدد إلى القناة على الفور، فقط إذا كان ذلك لا ينتهك قيود السعة، ثم يعرض نتيجة ناجحة.

موارد التدفق الإضافية