Eş yordamlarda akış, birden fazla değer yayımlayabilen bir türdür askıya alma işlevlerine kıyasla sıralı olarak ekleyebilirsiniz. Örneğin, canlı yayınları almak için otomatik olarak eklenir.
Akışlar eş yordamların üzerine kuruludur ve birden fazla değer sağlayabilir.
Akış, kavramsal olarak hesaplanabilen bir veri akışıdır
eşzamansız olarak ayarlayabilirsiniz. Yayınlanan değerler aynı türde olmalıdır. Örneğin,
Örneğin, Flow<Int>
tam sayı değerleri yayan bir akıştır.
Akış, bir dizi üreten Iterator
ile çok benzerdir.
değerleri üretmek ve kullanmak için askıya alma işlevlerini kullanır
eşzamansız olarak ayarlayabilirsiniz. Bu da örneğin, akışın güvenli bir şekilde
ağ isteği.
ileti dizisi.
Veri akışlarına dahil olan üç varlık vardır:
- Üretici, akışa eklenen veriler oluşturur. Teşekkürler eş yordamlar, akışlar verileri eşzamansız olarak da üretebilir.
- (İsteğe bağlı) Aracılar, akış veya akışın kendisi olabilir.
- Tüketiciler, akıştaki değerleri tüketir.
Android'de depo, genellikle tüketici olarak kullanıcı arayüzüne (UI) sahip bir kullanıcı arayüzü verileri üreticisi verileri gösteren bir satırdır. Bazı durumlarda ise kullanıcı arayüzü katmanı Kullanıcı giriş etkinlikleri ve hiyerarşinin diğer katmanları bunları tüketir. Katman: genellikle, üretici ve tüketici arasındaki yerini alan aracılar, ayarlamak için veri akışı kullanır.
Akış oluşturma
Akış oluşturmak için
akış oluşturucu
API'ler. flow
oluşturucu işlevi, manuel olarak yapabileceğiniz yeni bir akış oluşturur.
emit
işlevini kullanın.
Aşağıdaki örnekte, bir veri kaynağı en son haberleri getirir ve belirli bir arada otomatik olarak yapılır. Askıya alma işlevi birden çok ardışık değer döndürdüğünde, veri kaynağı aynı değeri bir akış şeması oluşturabilirsiniz. Bu durumda veri kaynağı, sizin görevinizdir.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
oluşturucu, bir eş yordam içinde yürütüldü. Böylelikle
aynı eşzamansız API'lerden yayınlanır ancak bazı kısıtlamalar geçerlidir:
- Akışlar sıralıdır. Yapımcı bir eş yordamdayken
bir askıya alma işlevi söz konusuysa üretici, askıya alma işlevi
belirtir. Bu örnekte yapımcı,
fetchLatestNews
tarihine kadar askıya alınır. ağ isteği tamamlanır. Sonuç, ancak sonrasında akışa iletilir. flow
oluşturucuyla, yapımcıemit
farklıCoroutineContext
. Bu nedenle,emit
öğesini farklı bir Yeni eş yordamlar oluşturarak veyawithContext
kullanarakCoroutineContext
gibi diyelim. Ayrıca, Google Etiket Yöneticisi vecallbackFlow
durumlarda işe yarıyor.
Akışı değiştirme
Aracılar, akışı değiştirmek için ara operatörler kullanabilir. tükettiğini düşünelim. Bu operatörler, veya uygulanmadığı bir işlem zinciri oluşturun, bu şekilde devam eder. Daha fazla bilgi: ve ara operatörlerle Akış referans belgeleri.
Aşağıdaki örnekte, depo katmanı ara operatörü kullanır
map
View
üzerinde görüntülenecek verileri dönüştürmek için:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Ara operatörler birbiri ardına uygulanarak bir zincir oluşturabilir bir öğe, akışı sağlar. Akışa yalnızca ara operatör uygulamanın veri toplama işlemini başlatmaz.
Akıştan veri toplama
Akışı, dinlemeye başlamak üzere tetiklemek için bir terminal operatörü kullanın
değerler. Akıştaki tüm değerleri yayınlandıkça almak için
collect
.
Terminal operatörleri hakkında daha fazla bilgiyi
resmi akış dokümanlarını takip edin.
collect
bir askıya alma işlevi olduğundan,
eş yordam. Bir lambda parametresi olarak
dahil edin. Bu bir askıya alma fonksiyonu olduğundan,
collect
çağrıları, akış kapatılana kadar askıya alınabilir.
Önceki örnekten devam edersek, Google Etiket Yöneticisi'nin
depo katmanındaki verileri kullanan bir ViewModel
:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Akışın toplanması, en son haberleri yenileyen yapımcıyı tetikler.
ve ağ isteğinin sonucunu sabit bir aralıkta yayar.
yapımcı, while(true)
döngüsüyle her zaman etkin
ViewModel temizlendiğinde ve veri türü temizlendiğinde kapatılacaktır.
viewModelScope
iptal edildi.
Akış toplama aşağıdaki nedenlerle durdurulabilir:
- Bir önceki örnekte gösterildiği gibi, toplanan eş yordam iptal edildi. Bu işlem, ana yapımcıyı da durdurur.
- Üretici, öğeleri yayınlamayı bitirir. Bu durumda veri akışı
kapatılır ve
collect
adlı eş yordam yürütme işlemini devam ettirir.
Başka bir ara öğeyle belirtilmediği sürece akışlar soğuk ve geçtir
işleci. Bu, üretici kodunun, çalıştırılan her bir
terminal operatörü akışta çağrılır. Önceki örnekte,
birden fazla akış toplayıcıya sahip olmak, veri kaynağının
en son haberleri farklı sabit aralıklarla birden çok kez yayınlayabilirsiniz. Optimizasyon ve
Aynı anda birden fazla tüketici veri topladığında bir akış paylaşır,
shareIn
operatörü.
Beklenmeyen istisnaları yakalama
Üretici uygulaması, üçüncü taraf kitaplığından gelebilir.
Bu, beklenmedik istisnalar oluşturabileceği anlamına gelir. Bunları ele almak için:
istisnalar varsa
catch
ara operatör.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Yukarıdaki örnekte bir istisna oluştuğunda collect
Yeni bir öğe alınmadığından lambda çağrılmaz.
catch
, akışa emit
öğe de ekleyebilir. Örnek depo
katmanı, bunun yerine önbelleğe alınan değerleri emit
:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Bu örnekte, bir istisna oluştuğunda collect
lambda
kabul edersiniz.
Farklı bir CoroutineContext'te yürütme
Varsayılan olarak, flow
oluşturucunun üreticisi
Kendisinden toplanan eş yordamın CoroutineContext
ve
daha önce belirtildiği gibi, farklı bir değerden emit
değerleri alamaz.
CoroutineContext
. Bazı durumlarda bu istenmeyen bir davranış olabilir.
Örneğin, bu konu genelinde kullanılan örneklerde depo
katmanının Dispatchers.Main
üzerinde şu işlemleri gerçekleştirmemesi gerekir:
viewModelScope
tarafından kullanılıyor.
Bir akışın CoroutineContext
değerini değiştirmek için ara operatörü kullanın
flowOn
flowOn
, yukarı akış akışının CoroutineContext
değerini değiştirir. Bu,
yapımcı ve önce (veya üstü) uygulanan ara operatörler
flowOn
Aşağı akış akışı (flowOn
tarihinden sonra ara operatörler)
tüketiciyle birlikte) bu durumdan etkilenmez ve müşteriyle
CoroutineContext
akıştan collect
olarak kullanıldı. Varsa
birden fazla flowOn
operatörü, her biri yukarı akış işlemini
geçerli konum.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Bu kodla, onEach
ve map
operatörleri defaultDispatcher
,
catch
operatörü ve tüketici ise
Dispatchers.Main
viewModelScope
tarafından kullanılıyor.
Veri kaynağı katmanı G/Ç çalışması yaptığı için bir görev dağıtıcısı optimize edilmiştir:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack kitaplıklarındaki akışlar
Akış, birçok Jetpack kitaplığına entegredir ve Android üçüncü taraf kitaplıkları. Akış, canlı veri güncellemeleri için idealdir ve sonsuz veri akışı vardır.
Tekliflerinizi otomatikleştirmek ve optimize etmek için
Odalı Akış
bir veritabanındaki değişikliklerle ilgili bildirim almak için kullanılır. Bunu kullanırken
veri erişimi nesneleri (DAO)
canlı güncellemeleri almak için bir Flow
türü döndürün.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Example
tablosunda her değişiklik olduğunda yeni bir liste yayınlanır
yeni öğelerle değiştirin.
Geri çağırmaya dayalı API'leri akışlara dönüştürme
callbackFlow
geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur.
Örneğin, Firebase Firestore
Android API'leri geri çağırma kullanır.
Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için şu kodu kullanabilirsiniz:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
oluşturucunun aksine, callbackFlow
bu değerlerin farklı bir CoroutineContext
öğesinden dağıtılmasına izin verir.
send
işlevi kullanılarak veya
trySend
işlevini kullanın.
callbackFlow
dahili olarak
channel,
Bu, kavramsal olarak engellemeye çok benzer.
sıralama'yı tıklayın.
Bir kanal, izin verilen maksimum öğe sayısı olan bir kapasite ile yapılandırılır.
veri havuzundan çıkarabilirsiniz. callbackFlow
adlı kanalda oluşturulan kanalın bir varsayılan değeri var
64 öğelik kapasite. Tam bir öğeye yeni bir öğe eklemeye çalıştığınızda,
kanalından, send
yeni video için yer açılıncaya kadar yapımcıyı askıya alır
öğesi içerir; trySend
ise öğeyi kanala eklemez ve
false
hemen.
trySend
, belirtilen öğeyi kanala hemen ekler.
kapasite kısıtlamalarını ihlal etmiyorsa ve daha sonra
yardımcı olur.
Ek akış kaynakları
- Android'de Kotlin akışlarını test etme
StateFlow
veSharedFlow
- Kotlin eş yordamları ve akışı için ek kaynaklar