Android'de Kotlin akışı

Eş yordamlarda akış, birden fazla değer yayımlayabilen bir türdür askıya alma işlevlerine kıyasla sıralı olarak ekleyebilirsiniz. Örneğin, canlı yayınları almak için otomatik olarak eklenir.

Akışlar eş yordamların üzerine kuruludur ve birden fazla değer sağlayabilir. Akış, kavramsal olarak hesaplanabilen bir veri akışıdır eşzamansız olarak ayarlayabilirsiniz. Yayınlanan değerler aynı türde olmalıdır. Örneğin, Örneğin, Flow<Int> tam sayı değerleri yayan bir akıştır.

Akış, bir dizi üreten Iterator ile çok benzerdir. değerleri üretmek ve kullanmak için askıya alma işlevlerini kullanır eşzamansız olarak ayarlayabilirsiniz. Bu da örneğin, akışın güvenli bir şekilde ağ isteği. ileti dizisi.

Veri akışlarına dahil olan üç varlık vardır:

  • Üretici, akışa eklenen veriler oluşturur. Teşekkürler eş yordamlar, akışlar verileri eşzamansız olarak da üretebilir.
  • (İsteğe bağlı) Aracılar, akış veya akışın kendisi olabilir.
  • Tüketiciler, akıştaki değerleri tüketir.

Veri akışlarına dahil olan tüzel kişiler; tüketici, isteğe bağlı
              emin olmak için
Şekil 1. Veri akışlarında yer alan varlıklar: tüketici, isteğe bağlı aracılar ve üreticilerle ilişkilidir.

Android'de depo, genellikle tüketici olarak kullanıcı arayüzüne (UI) sahip bir kullanıcı arayüzü verileri üreticisi verileri gösteren bir satırdır. Bazı durumlarda ise kullanıcı arayüzü katmanı Kullanıcı giriş etkinlikleri ve hiyerarşinin diğer katmanları bunları tüketir. Katman: genellikle, üretici ve tüketici arasındaki yerini alan aracılar, ayarlamak için veri akışı kullanır.

Akış oluşturma

Akış oluşturmak için akış oluşturucu API'ler. flow oluşturucu işlevi, manuel olarak yapabileceğiniz yeni bir akış oluşturur. emit işlevini kullanın.

Aşağıdaki örnekte, bir veri kaynağı en son haberleri getirir ve belirli bir arada otomatik olarak yapılır. Askıya alma işlevi birden çok ardışık değer döndürdüğünde, veri kaynağı aynı değeri bir akış şeması oluşturabilirsiniz. Bu durumda veri kaynağı, sizin görevinizdir.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow oluşturucu, bir eş yordam içinde yürütüldü. Böylelikle aynı eşzamansız API'lerden yayınlanır ancak bazı kısıtlamalar geçerlidir:

  • Akışlar sıralıdır. Yapımcı bir eş yordamdayken bir askıya alma işlevi söz konusuysa üretici, askıya alma işlevi belirtir. Bu örnekte yapımcı, fetchLatestNews tarihine kadar askıya alınır. ağ isteği tamamlanır. Sonuç, ancak sonrasında akışa iletilir.
  • flow oluşturucuyla, yapımcı emit farklı CoroutineContext. Bu nedenle, emit öğesini farklı bir Yeni eş yordamlar oluşturarak veya withContext kullanarak CoroutineContext gibi diyelim. Ayrıca, Google Etiket Yöneticisi ve callbackFlow durumlarda işe yarıyor.

Akışı değiştirme

Aracılar, akışı değiştirmek için ara operatörler kullanabilir. tükettiğini düşünelim. Bu operatörler, veya uygulanmadığı bir işlem zinciri oluşturun, bu şekilde devam eder. Daha fazla bilgi: ve ara operatörlerle Akış referans belgeleri.

Aşağıdaki örnekte, depo katmanı ara operatörü kullanır map View üzerinde görüntülenecek verileri dönüştürmek için:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Ara operatörler birbiri ardına uygulanarak bir zincir oluşturabilir bir öğe, akışı sağlar. Akışa yalnızca ara operatör uygulamanın veri toplama işlemini başlatmaz.

Akıştan veri toplama

Akışı, dinlemeye başlamak üzere tetiklemek için bir terminal operatörü kullanın değerler. Akıştaki tüm değerleri yayınlandıkça almak için collect. Terminal operatörleri hakkında daha fazla bilgiyi resmi akış dokümanlarını takip edin.

collect bir askıya alma işlevi olduğundan, eş yordam. Bir lambda parametresi olarak dahil edin. Bu bir askıya alma fonksiyonu olduğundan, collect çağrıları, akış kapatılana kadar askıya alınabilir.

Önceki örnekten devam edersek, Google Etiket Yöneticisi'nin depo katmanındaki verileri kullanan bir ViewModel:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Akışın toplanması, en son haberleri yenileyen yapımcıyı tetikler. ve ağ isteğinin sonucunu sabit bir aralıkta yayar. yapımcı, while(true) döngüsüyle her zaman etkin ViewModel temizlendiğinde ve veri türü temizlendiğinde kapatılacaktır. viewModelScope iptal edildi.

Akış toplama aşağıdaki nedenlerle durdurulabilir:

  • Bir önceki örnekte gösterildiği gibi, toplanan eş yordam iptal edildi. Bu işlem, ana yapımcıyı da durdurur.
  • Üretici, öğeleri yayınlamayı bitirir. Bu durumda veri akışı kapatılır ve collect adlı eş yordam yürütme işlemini devam ettirir.

Başka bir ara öğeyle belirtilmediği sürece akışlar soğuk ve geçtir işleci. Bu, üretici kodunun, çalıştırılan her bir terminal operatörü akışta çağrılır. Önceki örnekte, birden fazla akış toplayıcıya sahip olmak, veri kaynağının en son haberleri farklı sabit aralıklarla birden çok kez yayınlayabilirsiniz. Optimizasyon ve Aynı anda birden fazla tüketici veri topladığında bir akış paylaşır, shareIn operatörü.

Beklenmeyen istisnaları yakalama

Üretici uygulaması, üçüncü taraf kitaplığından gelebilir. Bu, beklenmedik istisnalar oluşturabileceği anlamına gelir. Bunları ele almak için: istisnalar varsa catch ara operatör.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Yukarıdaki örnekte bir istisna oluştuğunda collect Yeni bir öğe alınmadığından lambda çağrılmaz.

catch, akışa emit öğe de ekleyebilir. Örnek depo katmanı, bunun yerine önbelleğe alınan değerleri emit:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Bu örnekte, bir istisna oluştuğunda collect lambda kabul edersiniz.

Farklı bir CoroutineContext'te yürütme

Varsayılan olarak, flow oluşturucunun üreticisi Kendisinden toplanan eş yordamın CoroutineContext ve daha önce belirtildiği gibi, farklı bir değerden emit değerleri alamaz. CoroutineContext. Bazı durumlarda bu istenmeyen bir davranış olabilir. Örneğin, bu konu genelinde kullanılan örneklerde depo katmanının Dispatchers.Main üzerinde şu işlemleri gerçekleştirmemesi gerekir: viewModelScope tarafından kullanılıyor.

Bir akışın CoroutineContext değerini değiştirmek için ara operatörü kullanın flowOn flowOn, yukarı akış akışının CoroutineContext değerini değiştirir. Bu, yapımcı ve önce (veya üstü) uygulanan ara operatörler flowOn Aşağı akış akışı (flowOn tarihinden sonra ara operatörler) tüketiciyle birlikte) bu durumdan etkilenmez ve müşteriyle CoroutineContext akıştan collect olarak kullanıldı. Varsa birden fazla flowOn operatörü, her biri yukarı akış işlemini geçerli konum.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Bu kodla, onEach ve map operatörleri defaultDispatcher, catch operatörü ve tüketici ise Dispatchers.Main viewModelScope tarafından kullanılıyor.

Veri kaynağı katmanı G/Ç çalışması yaptığı için bir görev dağıtıcısı optimize edilmiştir:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack kitaplıklarındaki akışlar

Akış, birçok Jetpack kitaplığına entegredir ve Android üçüncü taraf kitaplıkları. Akış, canlı veri güncellemeleri için idealdir ve sonsuz veri akışı vardır.

Tekliflerinizi otomatikleştirmek ve optimize etmek için Odalı Akış bir veritabanındaki değişikliklerle ilgili bildirim almak için kullanılır. Bunu kullanırken veri erişimi nesneleri (DAO) canlı güncellemeleri almak için bir Flow türü döndürün.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example tablosunda her değişiklik olduğunda yeni bir liste yayınlanır yeni öğelerle değiştirin.

Geri çağırmaya dayalı API'leri akışlara dönüştürme

callbackFlow geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur. Örneğin, Firebase Firestore Android API'leri geri çağırma kullanır.

Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için şu kodu kullanabilirsiniz:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow oluşturucunun aksine, callbackFlow bu değerlerin farklı bir CoroutineContext öğesinden dağıtılmasına izin verir. send işlevi kullanılarak veya trySend işlevini kullanın.

callbackFlow dahili olarak channel, Bu, kavramsal olarak engellemeye çok benzer. sıralama'yı tıklayın. Bir kanal, izin verilen maksimum öğe sayısı olan bir kapasite ile yapılandırılır. veri havuzundan çıkarabilirsiniz. callbackFlow adlı kanalda oluşturulan kanalın bir varsayılan değeri var 64 öğelik kapasite. Tam bir öğeye yeni bir öğe eklemeye çalıştığınızda, kanalından, send yeni video için yer açılıncaya kadar yapımcıyı askıya alır öğesi içerir; trySend ise öğeyi kanala eklemez ve false hemen.

trySend, belirtilen öğeyi kanala hemen ekler. kapasite kısıtlamalarını ihlal etmiyorsa ve daha sonra yardımcı olur.

Ek akış kaynakları