Eş yordamlarda, yalnızca tek bir değer döndüren askıya alma işlevlerinin aksine, akış, sırayla birden fazla değer iletebilen bir türdür. Örneğin, bir veritabanından canlı güncellemeler almak için bir akış kullanabilirsiniz.
Akışlar eş yordamlar temel alınarak oluşturulur ve birden fazla değer sağlayabilir.
Akış, kavram olarak eşzamansız olarak hesaplanabilen bir veri akışıdır. Yayınlanan değerler aynı türde olmalıdır. Örneğin Flow<Int>
, tam sayı değerleri yayan bir akıştır.
Akış, değer dizisi oluşturan bir Iterator
öğesine çok benzer ancak değerleri eşzamansız olarak üretmek ve tüketmek için askıya alma işlevlerini kullanır. Bu, örneğin, akışın ana iş parçacığını engellemeden sonraki değeri üretmek için güvenli bir şekilde ağ isteğinde bulunabileceği anlamına gelir.
Veri akışlarına dahil olan üç varlık vardır:
- Üretici, akışa eklenen veriler üretir. İlişkiler sayesinde akışlar eşzamansız olarak veri de üretebilir.
- (İsteğe bağlı) Aracılar, akışa veya akışın kendisine yayınlanan her değeri değiştirebilir.
- Tüketici, akıştaki değerleri tüketir.
Android'de depo, genellikle verileri görüntüleyen tüketici olarak kullanıcı arayüzüne (UI) sahip olan kullanıcı arayüzü verilerinin üreticisidir. Bazı durumlarda ise kullanıcı arayüzü katmanı, kullanıcı girişi etkinliklerinin üreticisidir ve hiyerarşinin diğer katmanları bunları tüketir. Üretici ile tüketici arasındaki katmanlar genellikle veri akışını aşağıdaki katmanın gereksinimlerine göre düzenlemek için değiştiren aracılar olarak görev yapar.
Akış oluşturma
Akışlar oluşturmak için akış oluşturucu API'lerini kullanın. flow
oluşturucu işlevi, emit
işlevini kullanarak veri akışına manuel olarak yeni değerler yayınlayabileceğiniz yeni bir akış oluşturur.
Aşağıdaki örnekte bir veri kaynağı en son haberleri sabit aralıklarla otomatik olarak getirmektedir. Askıya alma işlevi ardışık birden çok değer döndüremediğinden, veri kaynağı bu gereksinimi karşılamak için bir akış oluşturur ve döndürür. Bu durumda veri kaynağı, üretici rolünü üstlenir.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
oluşturucu bir eş yordam içinde yürütülür. Bu nedenle, aynı eşzamansız API'lerden yararlanır ancak bazı kısıtlamalar geçerlidir:
- Akışlar sıralıdır. Üretici bir eş yordamda olduğundan, askıya alma işlevi çağrılırken askıya alma işlevi döndürülene kadar üretici askıya alır. Bu örnekte, üretici
fetchLatestNews
ağ isteği tamamlanana kadar askıya alır. Ancak bu şekilde sonuç akışa gönderilir. flow
oluşturucu ile üretici, farklı birCoroutineContext
öğesindenemit
değer alamaz. Bu nedenle, yeni eş yordamlar oluşturarak veyawithContext
kod blokları kullanarakemit
öğesini farklı birCoroutineContext
içinde çağırmayın. Bu durumlarda,callbackFlow
gibi diğer akış oluşturucuları kullanabilirsiniz.
Akışta değişiklik yapma
Aracılar, değerleri kullanmadan veri akışını değiştirmek için ara operatörleri kullanabilir. Bu operatörler, bir veri akışına uygulandığında değerler gelecekte tüketilene kadar çalıştırılmayan bir işlem zinciri oluşturan işlevlerdir. Akış referans belgelerinde ara operatörler hakkında daha fazla bilgi edinebilirsiniz.
Aşağıdaki örnekte depo katmanı, View
üzerinde görüntülenecek verileri dönüştürmek için ara operatörü map
kullanmaktadır:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Ara operatörler art arda uygulanarak akışa bir öğe yayınlandığında geç yürütülen bir işlem zinciri oluşturabilir. Akışa yalnızca ara operatör uygulanmasının akış toplamayı başlatmadığını unutmayın.
Akıştan veri toplama
Değerleri dinlemeye başlamak üzere akışı tetiklemek için bir terminal operatörü kullanın. Akıştaki tüm değerleri yayınlandıkça almak için collect
işlevini kullanın.
Terminal operatörleri hakkında daha fazla bilgiyi resmi akış belgelerinde bulabilirsiniz.
collect
bir askıya alma işlevi olduğundan, bir eş yordam içinde yürütülmelidir. Her yeni değerde çağrılan bir lambdayı parametre olarak alır. Bu bir askıya alma işlevi olduğundan collect
çağıran eş yordam, akış kapatılana kadar askıya alabilir.
Önceki örnekten devam edersek, depo katmanındaki verileri tüketen bir ViewModel
basit uygulaması şu şekildedir:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Akışın toplanması, en son haberleri yenileyen ve ağ isteğinin sonucunu sabit aralıklarla yayınlayan yapımcıyı tetikler. Üretici, while(true)
döngüsüyle her zaman etkin kaldığı için ViewModel temizlendiğinde ve viewModelScope
iptal edildiğinde veri akışı kapatılır.
Akış toplama aşağıdaki nedenlerle durdurulabilir:
- Toplanan eş yordam, önceki örnekte gösterildiği gibi iptal edilir. Bu işlem, temel yapımcıyı da durdurur.
- Üretici ürün yayınlamayı bitirir. Bu durumda veri akışı kapatılır ve
collect
adlı eş yordam yürütmeye devam eder.
Akışlar, diğer ara operatörlerle belirtilmedikçe soğuk ve geçen niteliktedir. Bu, akışta bir terminal operatörü her çağrıldığında üretici kodunun yürütüleceği anlamına gelir. Önceki örnekte, birden fazla akış toplayıcınızın olması, veri kaynağının en son haberleri farklı sabit aralıklarla birden çok kez getirmesine neden olur. Birden fazla tüketici aynı anda veri topladığında bir akışı optimize etmek ve paylaşmak için shareIn
operatörünü kullanın.
Beklenmedik istisnaları yakalama
Yapımcının uygulaması üçüncü taraf bir kitaplıktan gelebilir.
Bu da beklenmedik istisnalar atabileceği anlamına gelir. Bu istisnaları yönetmek için catch
ara operatörünü kullanın.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Önceki örnekte, bir istisna oluştuğunda yeni bir öğe alınmadığı için collect
lambda çağrılmaz.
Ayrıca catch
, akışa emit
öğe de ekleyebilir. Örnek depo katmanı, bunun yerine önbelleğe alınan değerleri emit
sağlayabilir:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Bu örnekte, bir istisna oluştuğunda istisna nedeniyle akışa yeni bir öğe dağıtıldığı için collect
lambda çağrılır.
Farklı bir CoroutineContext'te yürütme
Varsayılan olarak flow
oluşturucunun üreticisi, kendisinden toplanan eşin CoroutineContext
içinde yürütülür ve daha önce de belirtildiği gibi, farklı bir CoroutineContext
öğesinden değerler emit
alamaz. Bazı durumlarda bu istenmeyen bir davranış olabilir.
Örneğin, bu konu boyunca kullanılan örneklerde depo katmanı, viewModelScope
tarafından kullanılan Dispatchers.Main
üzerinde işlem yapmamalıdır.
Bir akışın CoroutineContext
değerini değiştirmek için flowOn
ara operatörünü kullanın.
flowOn
, yukarı akış akışının CoroutineContext
değerini değiştirir. Bu değer, flowOn
öncesinde (veya sonrasında) uygulanan üretici ve ara operatörlerin olduğu anlamına gelir. Aşağı akış (tüketiciyle birlikte flowOn
'dan sonra ara operatörler) etkilenmez ve akıştan collect
için kullanılan CoroutineContext
üzerinde yürütülür. Birden fazla flowOn
operatörü varsa her biri yukarı akışı geçerli konumundan değiştirir.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Bu kodla onEach
ve map
operatörleri defaultDispatcher
kullanırken catch
operatörü ve tüketici, viewModelScope
tarafından kullanılan Dispatchers.Main
üzerinde yürütülür.
Veri kaynağı katmanı G/Ç işlemleri yaparken, G/Ç işlemleri için optimize edilmiş bir görev dağıtıcı kullanmanız gerekir:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack kitaplıklarındaki akışlar
Flow, birçok Jetpack kitaplığına entegredir ve Android üçüncü taraf kitaplıkları arasında popülerdir. Akış, canlı veri güncellemeleri ve sonsuz veri akışı için idealdir.
Bir veritabanındaki değişikliklerle ilgili bildirim almak için Oda ile Akış'ı kullanabilirsiniz. Veri erişim nesnelerini (DAO) kullanırken canlı güncellemeleri almak için bir Flow
türü döndürün.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Example
tablosunda her değişiklik olduğunda, veritabanındaki yeni öğelerle yeni bir liste yayınlanır.
Geri çağırmaya dayalı API'leri akışlara dönüştürün
callbackFlow
, geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur.
Örneğin, Firebase Firestore Android API'leri geri çağırma kullanır.
Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için aşağıdaki kodu kullanabilirsiniz:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
oluşturucunun aksine callbackFlow
, değerlerin send
işlevine sahip farklı bir CoroutineContext
'den veya trySend
işlevine sahip bir eş değer dışında dağıtılmasına izin verir.
callbackFlow
şirket içinde, kavramsal olarak engelleme sırasına çok benzeyen bir kanal kullanır.
Bir kanal, arabelleğe alınabilecek maksimum öğe sayısı olan kapasite ile yapılandırılır. callbackFlow
ürününde oluşturulan kanalın varsayılan kapasitesi 64 öğedir. Tam kanala yeni bir öğe eklemeye çalıştığınızda send
, yeni öğe için alan açılıncaya kadar yapımcıyı askıya alır. Öte yandan, offer
öğeyi kanala eklemez ve hemen false
değerini döndürür.
Ek akış kaynakları
- Android'de Kotlin akışlarını test etme
StateFlow
veSharedFlow
- Kotlin eş yordamları ve akışı için ek kaynaklar