في الكوروتينات، نوع التدفق هو نوع يمكنه إصدار قيم متعددة بالتتابع، على عكس دوال التعليق التي تعرض فقط قيمة واحدة. على سبيل المثال، يمكنك استخدام مسار لتلقّي محتوى تحديثات من قاعدة بيانات.
تعتمد التدفقات على الكوروتينات ويمكن أن توفر قيمًا متعددة.
التدفق من الناحية النظرية هو مصدر بيانات يمكن احتسابه.
بشكل غير متزامن. يجب أن تكون القيم المنبعثة من النوع نفسه. بالنسبة
Flow<Int>
هو تدفق يُصدر عددًا صحيحًا.
يتشابه التدفق إلى حد كبير مع Iterator
الذي يُنتج سلسلة من
ولكنها تستخدم دوال التعليق لإنتاج القيم واستهلاكها
بشكل غير متزامن. وهذا يعني، على سبيل المثال، أن التدفق يمكن أن يجعل
لإنتاج القيمة التالية دون حظر الصفحة الرئيسية
.
هناك ثلاثة كيانات متضمنة في تدفقات البيانات:
- ينتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن أن تؤدي التدفقات أيضًا إلى إنتاج البيانات بشكل غير متزامن.
- (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في البث أو البث نفسه
- يستهلك المستهلك القيم من مصدر البيانات.
في نظام Android، يتم إنشاء المستودع منتج عادةً لبيانات واجهة المستخدم التي تحتوي على واجهة المستخدم (UI) تعرض البيانات في النهاية. في أحيان أخرى، تكون طبقة واجهة المستخدم هي منتج وتستهلكها الأحداث التي يُدخلها المستخدم والطبقات الأخرى في التسلسل الهرمي. طبقات في بين المنتج والمستهلك عادةً كوسطاء يقومون بتعديل تدفق البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.
إنشاء تدفق
لإنشاء تدفقات، استخدم
أداة إنشاء التدفق
واجهات برمجة التطبيقات. تنشئ دالة إنشاء flow
مسارًا جديدًا يمكنك فيه يدويًا
تنبعث قيمًا جديدة في تدفق البيانات باستخدام
emit
الأخرى.
في المثال التالي، يسترجع مصدر البيانات آخر الأخبار تلقائيًا على فترات زمنية ثابتة. كدالة تعليق، لا يمكن بإرجاع عدة قيم متتالية، فإن مصدر البيانات ينشئ ويرجع تدفقًا لتحقيق هذا المطلب. في هذه الحالة، يعمل مصدر البيانات كمنتج.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
يتم تنفيذ أداة إنشاء flow
داخل الكوروتين. وبالتالي، فإنه يستفيد
من واجهات برمجة التطبيقات غير المتزامنة نفسها، ولكن تنطبق بعض القيود:
- تكون التدفقات متسلسلة. بما أن المنتِج موجود في الكوروتين، فعند استدعاء
دالة التعليق، يتم تعليق المنتج حتى دالة التعليق
وإرجاعه. في المثال، يعلّق المنتج المنتج حتى
fetchLatestNews
يكتمل طلب الشبكة. وحينئذٍ فقط يتم إصدار النتيجة إلى ساحة المشاركات. - باستخدام أداة إنشاء
flow
، لا يمكن للناتجemit
قيمة من مختلفةCoroutineContext
. لذلك، لا تستدعيemit
بعدCoroutineContext
عن طريق إنشاء كوروتين جديد أو باستخدامwithContext
مجموعات الرموز البرمجية. يمكنك استخدام منصات إنشاء التدفق الأخرى مثلcallbackFlow
في هذه الحالات.
تعديل ساحة المشاركات
يمكن للوسطاء استخدام عوامل تشغيل متوسطة لتعديل تدفق البيانات دون استهلاك القيم. هذه العوامل هي دوال، عندما تطبيقها على تدفق من البيانات، وإعداد سلسلة من العمليات غير حتى يتم استهلاك القيم في المستقبل. مزيد من المعلومات حول المشغلات الوسيطة في المستندات المرجعية للتدفق
في المثال أدناه، تستخدم طبقة المستودع المشغل المتوسط
map
لتحويل البيانات التي سيتم عرضها على View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
يمكن تطبيق العوامل المتوسطة واحدًا تلو الآخر، لتكوين سلسلة العمليات التي يتم تنفيذها ببطء عند إطلاق عنصر إلى التدفق. تجدر الإشارة إلى أنّ تطبيق عامل تشغيل متوسط على البث المباشر عدم بدء جمع التدفق.
الجمع من التدفق
استخدِم عامل تشغيل طرفي لتشغيل المسار لبدء الاستماع
القيم. للحصول على جميع القيم في ساحة المشاركات بالشكل المنبعث، استخدم
collect
يمكنك التعرف على مزيد من المعلومات حول مشغلي الوحدات الطرفية في
المستندات الرسمية حول التدفق
بما أنّ collect
هي دالة تعليق، يجب تنفيذها ضمن
كوروتين. تستخدم lambda كمعلمة يتم استدعاؤها على
كل قيمة جديدة. ونظرًا لأنها دالة تعليق، فإن الكوروتين
قد يتم تعليق المكالمات collect
حتى يتم إغلاق التدفق.
استكمالاً للمثال السابق، إليك تنفيذًا بسيطًا
ViewModel
يستخدم البيانات من طبقة المستودع:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
عملية جمع المعلومات تحفّز المنتج على تحديث آخر الأخبار
وينتج عن طلب الشبكة على فاصل ثابت. نظرًا لأن العمود
يبقى المنتج نشطًا دائمًا في حلقة while(true)
، ويمثّل البث
من البيانات سيتم إغلاقها عند محو ViewModel
تم إلغاء viewModelScope
.
يمكن أن يتوقف جمع البيانات للأسباب التالية:
- يتم إلغاء الكوروتين الذي يجمع، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
- ينتهي المنتج من إطلاق العناصر. في هذه الحالة، فإن تدفق البيانات
مغلقة ويستأنف الكوروتين الذي يُسمى
collect
التنفيذ.
التدفقات باردة وكسولة ما لم يتم تحديدها مع وسيط آخر
والمشغلات. يعني هذا أنه يتم تنفيذ رمز المنتج في كل مرة
المشغل الطرفي على التدفق. في المثال السابق،
فإن وجود عدة جامعات للتدفقات يؤدي إلى قيام مصدر البيانات بجلب
آخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين
في التدفق عندما يجمع عدة مستهلكين في الوقت نفسه،
shareIn
.
رصد الاستثناءات غير المتوقعة
يمكن أن تأتي عملية تنفيذ المنتج من مكتبة تابعة لجهة خارجية.
وهذا يعني أنه قد ينتج عنه استثناءات غير متوقعة. للتعامل مع هذه الأمور
باستثناءات، يمكنك استخدام
catch
الوسيط المتوسط.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
في المثال السابق، عند حدوث استثناء، لا يتم تعديل السمة collect
لم يتم استدعاء lambda، نظرًا لعدم استلام عنصر جديد.
يمكن لـ "catch
" أيضًا إضافة emit
عنصر إلى المسار. مثال على المستودع
يمكن أن emit
القيم المخزنة مؤقتًا بدلاً من ذلك:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
في هذا المثال، عند حدوث استثناء، تكون دالة lambda collect
هي
لأنّه تم إطلاق عنصر جديد إلى البث بسبب
.
التنفيذ في سياق CoroutineContext مختلف
بشكل تلقائي، ينفذ منتج أداة إنشاء flow
في
CoroutineContext
من الكوروتين الذي يجمع منه، وكذلك
التي ذكرناها سابقًا، لا يمكنها emit
من القيم
CoroutineContext
وقد يكون هذا السلوك غير مرغوب فيه في بعض الحالات.
فعلى سبيل المثال، في الأمثلة المستخدمة خلال هذا الموضوع، يمثل المستودع
طبقة معينة من المفترض ألا تقوم بإجراء عمليات على Dispatchers.Main
والتي
ويستخدمه viewModelScope
.
لتغيير CoroutineContext
للتدفق، استخدم عامل التشغيل الوسيط
flowOn
يغيِّر flowOn
CoroutineContext
في التدفّق الرئيسي، ما يعني
الشركة المصنّعة وأي شركات تشغيل وسيطة تم تطبيقها قبل (أو أعلى)
flowOn
تدفق التدفق (عوامل التشغيل الوسيطة بعد flowOn
مع المستهلك) لا تتأثر ويتم تنفيذه في
تم استخدام CoroutineContext
على collect
من المسار. إذا كانت هناك
عدة عوامل تشغيل flowOn
، يؤدي كل عامل منها إلى تغيير الجزء الرئيسي من الصفحة
موقعك الجغرافي الحالي.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
ومن خلال هذا الرمز، يستخدم العاملان onEach
وmap
العاملَين defaultDispatcher
بينما يتم تنفيذ عامل التشغيل catch
والمستهلك على
تم استخدام "Dispatchers.Main
" من قِبل "viewModelScope
".
يجب استخدام مرسِل لأنّ طبقة مصدر البيانات تقوم بعمل الإدخال والإخراج. بحيث يتوافق مع عمليات وحدات الإدخال والإخراج:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
مسارات في مكتبات Jetpack
تم دمج التدفق في العديد من مكتبات Jetpack، وهو شائع بين مكتبات الجهات الخارجية على Android يُعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة ومصادر لا حصر لها من البيانات.
يمكنك استخدام
Flow with Room
ليتم إشعارك بالتغييرات التي تحدث في قاعدة البيانات. عند استخدام
كائنات الوصول إلى البيانات (DAO)،
عرض النوع Flow
للحصول على تحديثات مباشرة.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
كلما حدث تغيير في جدول Example
، يتم إصدار قائمة جديدة.
بالعناصر الجديدة في قاعدة البيانات.
تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات
callbackFlow
أداة إنشاء التدفق تتيح لك تحويل واجهات برمجة التطبيقات القائمة على معاودة الاتصال إلى مسارات.
على سبيل المثال، يوفّر Firebase Firestore
تستخدم واجهات برمجة تطبيقات Android عمليات معاودة الاتصال.
لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك يمكنك استخدام التعليمة البرمجية التالية:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
على عكس أداة إنشاء flow
، callbackFlow
وتسمح بانبعاثات القيم من CoroutineContext
مختلفة مع
send
أو خارج الكوروتين مع
trySend
الأخرى.
داخليًا، يستخدم callbackFlow
channel
وهو من الناحية النظرية يشبه إلى حد كبير
قائمة انتظار.
يتم إعداد القناة باستخدام السعة، وهو الحد الأقصى لعدد العناصر
التي يمكن تخزينها مؤقتًا. القناة التي تم إنشاؤها في callbackFlow
تستخدم إعدادًا تلقائيًا
سعة 64 عنصرًا. عندما تحاول إضافة عنصر جديد إلى
قناة، تعلّق send
المنتج حتى تتوفر مساحة لإجراء
العنصر، بينما لا تضيف trySend
العنصر إلى القناة وتعرض
false
فورًا.
يضيف trySend
العنصر المحدد إلى القناة على الفور،
فقط إذا كان ذلك لا ينتهك قيود السعة، ثم يعرض
نتيجة ناجحة.
موارد التدفق الإضافية
- اختبار مسارات Kotlin على Android
StateFlow
وSharedFlow
- مراجع إضافية حول الكوروتينات في لغة Kotlin وتدفق البيانات