Kotlin работает на Android,Kotlin работает на Android

В сопрограммах поток — это тип, который может последовательно выдавать несколько значений, в отличие от функций приостановки , которые возвращают только одно значение. Например, вы можете использовать поток для получения оперативных обновлений из базы данных.

Потоки строятся на основе сопрограмм и могут предоставлять несколько значений. Поток концептуально представляет собой поток данных , который может вычисляться асинхронно. Выдаваемые значения должны быть одного типа. Например, Flow<Int> — это поток, который выдает целочисленные значения.

Поток очень похож на Iterator , который создает последовательность значений, но использует функции приостановки для асинхронного создания и потребления значений. Это означает, например, что поток может безопасно выполнить сетевой запрос для получения следующего значения, не блокируя основной поток.

В потоках данных участвуют три объекта:

  • Производитель создает данные, которые добавляются в поток. Благодаря сопрограммам потоки также могут создавать данные асинхронно.
  • (Необязательно) Посредники могут изменять каждое значение, передаваемое в поток, или сам поток.
  • Потребитель потребляет значения из потока.

объекты, участвующие в потоках данных; потребитель, необязательные посредники и производитель
Рисунок 1. Объекты, участвующие в потоках данных: потребитель, дополнительные посредники и производитель.

В Android репозиторий обычно является производителем данных пользовательского интерфейса, у которого пользовательский интерфейс (UI) является потребителем, который в конечном итоге отображает данные. В других случаях уровень пользовательского интерфейса является источником событий пользовательского ввода, а другие уровни иерархии потребляют их. Уровни между производителем и потребителем обычно действуют как посредники, которые изменяют поток данных, чтобы адаптировать его к требованиям следующего уровня.

Создание потока

Для создания потоков используйте API-интерфейсы построителя потоков . Функция построения flow создает новый поток, в котором вы можете вручную добавлять новые значения в поток данных с помощью функции emit .

В следующем примере источник данных автоматически получает последние новости через фиксированный интервал. Поскольку функция приостановки не может возвращать несколько последовательных значений, источник данных создает и возвращает поток для выполнения этого требования. В этом случае источник данных выступает в роли производителя.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Построитель flow выполняется внутри сопрограммы. Таким образом, он получает преимущества от тех же асинхронных API, но применяются некоторые ограничения:

  • Потоки являются последовательными . Поскольку производитель находится в сопрограмме, при вызове функции приостановки он приостанавливается до тех пор, пока функция приостановки не вернется. В этом примере производитель приостанавливает работу до тех пор, пока не завершится сетевой запрос fetchLatestNews . Только после этого результат передается в поток.
  • С помощью построителя flow производитель не может emit значения из другого CoroutineContext . Поэтому не вызывайте emit в другом CoroutineContext создавая новые сопрограммы или используя блоки кода withContext . В этих случаях вы можете использовать другие построители потоков, такие как callbackFlow .

Изменение потока

Посредники могут использовать промежуточные операторы для изменения потока данных без использования значений. Эти операторы представляют собой функции, которые при применении к потоку данных создают цепочку операций, которые не выполняются до тех пор, пока значения не будут использованы в будущем. Подробнее о промежуточных операторах читайте в справочной документации Flow .

В приведенном ниже примере уровень репозитория использует map промежуточных операторов для преобразования данных, которые будут отображаться в View :

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Промежуточные операторы можно применять один за другим, образуя цепочку операций, которые лениво выполняются при попадании элемента в поток. Обратите внимание, что простое применение промежуточного оператора к потоку не запускает сбор потока.

Сбор из потока

Используйте оператор терминала , чтобы запустить поток и начать прослушивание значений. Чтобы получить все значения в потоке по мере их отправки, используйте collect . Подробнее о терминальных операторах можно узнать в официальной потоковой документации .

Поскольку collect — это функция приостановки, ее необходимо выполнять внутри сопрограммы. Он принимает лямбду в качестве параметра, который вызывается для каждого нового значения. Поскольку это функция приостановки, сопрограмма, вызывающая collect может приостанавливаться до тех пор, пока поток не закроется.

Продолжая предыдущий пример, вот простая реализация ViewModel , использующая данные из уровня репозитория:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Сбор потока запускает производителя, который обновляет последние новости и выдает результат сетевого запроса через фиксированный интервал. Поскольку производитель всегда остается активным в цикле while(true) , поток данных будет закрыт при очистке ViewModel и отмене viewModelScope .

Сбор потока может прекратиться по следующим причинам:

  • Собирающая сопрограмма отменяется, как показано в предыдущем примере. Это также останавливает основного производителя.
  • Производитель завершает выпуск элементов. В этом случае поток данных закрывается и сопрограмма, вызвавшая collect , возобновляет выполнение.

Потоки являются холодными и ленивыми , если не указано иное с другими промежуточными операторами. Это означает, что код производителя выполняется каждый раз, когда в потоке вызывается оператор терминала. В предыдущем примере наличие нескольких сборщиков потоков приводит к тому, что источник данных получает последние новости несколько раз с разными фиксированными интервалами. Чтобы оптимизировать и совместно использовать поток, когда несколько потребителей собирают данные одновременно, используйте оператор shareIn .

Перехват неожиданных исключений

Реализация производителя может быть взята из сторонней библиотеки. Это означает, что он может генерировать неожиданные исключения. Для обработки этих исключений используйте промежуточный оператор catch .

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

В предыдущем примере при возникновении исключения лямбда-выражение collect не вызывается, поскольку новый элемент не был получен.

catch также может emit элементы в поток. Вместо этого уровень репозитория примера может emit кэшированные значения:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

В этом примере при возникновении исключения вызывается лямбда- collect , поскольку из-за исключения в поток был отправлен новый элемент.

Выполнение в другом CoroutineContext

По умолчанию производитель построителя flow выполняется в CoroutineContext сопрограммы, которая собирает данные из него, и, как упоминалось ранее, он не может emit значения из другого CoroutineContext . В некоторых случаях такое поведение может быть нежелательным. Например, в примерах, используемых в этом разделе, уровень репозитория не должен выполнять операции над Dispatchers.Main , который используется viewModelScope .

Чтобы изменить CoroutineContext потока, используйте промежуточный оператор flowOn . flowOn изменяет CoroutineContext восходящего потока , то есть производителя и любые промежуточные операторы, примененные до (или выше) flowOn . Нисходящий поток (промежуточные операторы после flowOn вместе с потребителем) не затрагивается и выполняется в CoroutineContext используемом для collect из потока. Если существует несколько операторов flowOn , каждый из них меняет восходящий поток с его текущего местоположения.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

В этом коде операторы onEach и map используют defaultDispatcher , тогда как оператор catch и потребитель выполняются в Dispatchers.Main , используемом viewModelScope .

Поскольку уровень источника данных выполняет работу ввода-вывода, вам следует использовать диспетчер, оптимизированный для операций ввода-вывода:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Потоки в библиотеках Jetpack

Flow интегрирован во многие библиотеки Jetpack и популярен среди сторонних библиотек Android. Flow отлично подходит для оперативного обновления данных и бесконечных потоков данных.

Вы можете использовать Flow with Room, чтобы получать уведомления об изменениях в базе данных. При использовании объектов доступа к данным (DAO) верните тип Flow , чтобы получать обновления в реальном времени.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Каждый раз, когда в таблице Example происходит изменение, создается новый список с новыми элементами в базе данных.

Преобразование API на основе обратного вызова в потоки

callbackFlow — это построитель потоков, который позволяет конвертировать API-интерфейсы на основе обратного вызова в потоки. Например, API Firebase Firestore Android используют обратные вызовы.

Чтобы преобразовать эти API в потоки и прослушивать обновления базы данных Firestore, вы можете использовать следующий код:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

В отличие от построителя flow , callbackFlow позволяет передавать значения из другого CoroutineContext с помощью функции send или вне сопрограммы с помощью функции trySend .

Внутри callbackFlow используется канал , который концептуально очень похож на блокирующую очередь . Канал настроен с емкостью — максимальным количеством элементов, которые можно буферизовать. Канал, созданный в callbackFlow имеет емкость по умолчанию — 64 элемента. Когда вы пытаетесь добавить новый элемент в полный канал, send приостанавливает работу производителя до тех пор, пока не освободится место для нового элемента, тогда как trySend не добавляет элемент в канал и немедленно возвращает false .

trySend немедленно добавляет указанный элемент в канал, только если это не нарушает его ограничений по пропускной способности, а затем возвращает успешный результат.

Дополнительные ресурсы потока