Kotlin ทำงานอย่างลื่นไหลใน Android

ในโครูทีน flow คือชนิดที่ปล่อยค่าได้หลายค่า ตามลำดับ ซึ่งตรงข้ามกับระงับฟังก์ชันที่ส่งกลับเฉพาะ ค่าเดี่ยว เช่น คุณสามารถใช้ขั้นตอนแสดงการถ่ายทอดสด จากฐานข้อมูล

โฟลว์สร้างขึ้นบนโครูทีนและสามารถระบุค่าได้หลายค่า โฟลว์คือแนวคิดคือสตรีมข้อมูลที่สามารถคํานวณได้ แบบไม่พร้อมกัน ค่าที่ปล่อยออกมาต้องเป็นประเภทเดียวกัน สำหรับ ตัวอย่างเช่น Flow<Int> คือโฟลว์ที่แสดงค่าจำนวนเต็ม

โฟลว์คล้ายกับ Iterator อย่างมากซึ่งสร้างลำดับ แต่ใช้ฟังก์ชันการระงับเพื่อสร้างและใช้ค่า แบบไม่พร้อมกัน ซึ่งหมายความว่าขั้นตอนสามารถทำให้ เพื่อสร้างค่าถัดไปโดยไม่บล็อก ชุดข้อความ

สตรีมข้อมูลมีอยู่ 3 เอนทิตี ได้แก่

  • ผู้ผลิตจะสร้างข้อมูลที่เพิ่มลงในสตรีม ขอขอบคุณ โครูทีน โฟลว์ยังสามารถให้ข้อมูลแบบไม่พร้อมกัน
  • (ไม่บังคับ) ตัวกลางสามารถแก้ไขแต่ละค่าที่ส่งลงในค่า หรือตัวสตรีมเอง
  • ผู้บริโภคจะใช้ค่าจากสตรีม

เอนทิตีที่เกี่ยวข้องในสตรีมข้อมูล ผู้บริโภค, ไม่บังคับ
              ตัวกลาง และผู้ผลิต
รูปที่ 1 เอนทิตีที่เกี่ยวข้องในสตรีมข้อมูล: ผู้บริโภค ตัวกลางที่ไม่บังคับ และผู้ผลิต

ใน Android ที่เก็บคือ ซึ่งโดยทั่วไปแล้วเป็นผู้ผลิตข้อมูล UI ที่มีอินเทอร์เฟซผู้ใช้ (UI) เป็นผู้บริโภค ซึ่งจะแสดงข้อมูลดังกล่าวในท้ายที่สุด แต่บางครั้งเลเยอร์ UI ก็เป็นการสร้าง เหตุการณ์อินพุตของผู้ใช้และเลเยอร์อื่นๆ ของลำดับชั้นจะมีผล เลเยอร์ใน ระหว่างผู้ผลิตและผู้บริโภคมักจะทำหน้าที่เป็นตัวกลางที่แก้ไข สตรีมข้อมูลเพื่อปรับให้เข้ากับข้อกำหนดของเลเยอร์ต่อไปนี้

การสร้างขั้นตอน

หากต้องการสร้างขั้นตอน ให้ใช้ เครื่องมือสร้างขั้นตอน API ฟังก์ชันเครื่องมือสร้าง flow จะสร้างขั้นตอนใหม่ที่คุณดำเนินการด้วยตนเองได้ แสดงค่าใหม่ลงในสตรีมข้อมูลโดยใช้ emit

ในตัวอย่างต่อไปนี้ แหล่งข้อมูลจะดึงข้อมูลข่าวล่าสุด โดยอัตโนมัติตามช่วงเวลาคงที่ เนื่องจากฟังก์ชันการระงับไม่สามารถ แสดงผลค่าติดกันหลายค่า แหล่งข้อมูลจะสร้างและแสดงผล วิธีการทำตามข้อกำหนดนี้ ในกรณีนี้ แหล่งข้อมูลจะทำหน้าที่ ในฐานะโปรดิวเซอร์

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

เรียกใช้เครื่องมือสร้าง flow ภายในโครูทีน ดังนั้นจึงมีประโยชน์ จาก API แบบอะซิงโครนัสเดียวกัน แต่มีข้อจำกัดบางประการดังนี้

  • ขั้นตอนจะเป็นตามลำดับ ในฐานะผู้ผลิตอยู่ในโครูทีน เมื่อเรียก ฟังก์ชันระงับ ผู้ผลิตจะระงับจนกว่าฟังก์ชันระงับ ที่เกินออกมา ในตัวอย่างนี้ ผู้ผลิตจะระงับจนกว่าจะถึง fetchLatestNews คำขอเครือข่ายเสร็จสมบูรณ์ หลังจากนั้น ผลลัพธ์ที่ได้ก็ส่งมายังสตรีม
  • เมื่อใช้เครื่องมือสร้าง flow ผู้ผลิตจะemitค่าจาก CoroutineContext ที่แตกต่างกัน ดังนั้น อย่าเรียก emit ใน CoroutineContext โดยการสร้างโครูทีนใหม่หรือใช้ withContext บล็อกโค้ด คุณสามารถใช้เครื่องมือสร้างขั้นตอนอื่นๆ เช่น callbackFlow ในกรณีเหล่านี้

การแก้ไขสตรีม

ตัวกลางสามารถใช้โอเปอเรเตอร์ระดับกลางเพื่อแก้ไขสตรีมของ ข้อมูลได้โดยไม่ต้องใช้ค่า โอเปอเรเตอร์เหล่านี้เป็นฟังก์ชันที่ ใช้กับสตรีมข้อมูล ให้ตั้งค่าห่วงโซ่การดำเนินการที่ไม่ใช่ ดำเนินการจนกว่าจะมีการใช้ค่านั้นในอนาคต ดูข้อมูลเพิ่มเติมเกี่ยวกับ โอเปอเรเตอร์ระดับกลางในส่วน เอกสารอ้างอิงโฟลว์

ในตัวอย่างด้านล่าง เลเยอร์ที่เก็บใช้โอเปอเรเตอร์ตัวกลาง map เพื่อแปลงข้อมูลที่จะแสดงใน View ดังนี้

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

สามารถใช้ตัวดำเนินการขั้นกลางต่อแบบตัวอื่นๆ เพื่อสร้างเป็นห่วงโซ่ ของการดำเนินการที่จะดำเนินการแบบ Lazy Loading เมื่อมีการส่งออกรายการไปยัง โปรดทราบว่าการใช้โอเปอเรเตอร์ระดับกลางกับสตรีมจะส่งผล ไม่เริ่มการรวบรวมโฟลว์

การรวบรวมจากขั้นตอน

ใช้โอเปอเรเตอร์เทอร์มินัลเพื่อทริกเกอร์กระบวนการเพื่อเริ่มการฟัง หากต้องการดูค่าทั้งหมดในสตรีมเมื่อมีการปล่อยออกมา ให้ใช้ collect คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับโอเปอเรเตอร์เทอร์มินัลใน เอกสารเกี่ยวกับกระบวนการอย่างเป็นทางการ

เนื่องจาก collect เป็นฟังก์ชันระงับ จึงจำเป็นต้องดำเนินการภายใน โครูทีน จะใช้ lambda เป็นพารามิเตอร์ที่จะเรียกใช้ ค่าใหม่ทุกๆ ค่า และเนื่องจากเป็นฟังก์ชันระงับ โครูทีนที่ การโทร collect อาจระงับจนกว่าจะมีการปิดโฟลว์

ต่อจากตัวอย่างก่อนหน้านี้ นี่คือการใช้งาน ViewModel โดยใช้ข้อมูลจากเลเยอร์ที่เก็บ:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

การรวบรวมความต่อเนื่องจะทริกเกอร์ผู้ผลิตที่รีเฟรชข่าวสารล่าสุด และแสดงผลลัพธ์ของคำขอเครือข่ายในช่วงเวลาที่คงที่ ตามที่ ผู้ผลิตจะยังคงมีการใช้งานอยู่เสมอเมื่อมีการวนซ้ำ while(true) ซึ่งเป็นสตรีม ของ ข้อมูลจะปิดเมื่อล้าง ViewModel และ viewModelScope ถูกยกเลิก

การรวบรวมโฟลว์อาจหยุดด้วยเหตุผลต่อไปนี้

  • โครูทีนที่รวบรวมมาถูกยกเลิกตามที่แสดงในตัวอย่างก่อนหน้า ซึ่งจะหยุดผู้ผลิตที่เกี่ยวข้องด้วย
  • ผู้ผลิตปล่อยสัญญาณรายการเสร็จแล้ว ในกรณีนี้ สตรีมข้อมูล ปิดและโครูทีนที่ชื่อ collect กลับมาดำเนินการอีกครั้งแล้ว

โฟลว์เป็นแบบเย็นและแบบ Lazy Loading เว้นแต่จะระบุด้วยตัวกลางอื่นๆ โอเปอเรเตอร์ ซึ่งหมายความว่าโค้ดผู้ผลิตจะถูกเรียกใช้ทุกครั้ง ระบบจะเรียกใช้โอเปอเรเตอร์เทอร์มินัลในโฟลว์ ในตัวอย่างก่อนหน้านี้ การมี Flow Collector หลายรายการจะทำให้แหล่งข้อมูลดึงข้อมูล ข่าวล่าสุดหลายครั้งในช่วงเวลาต่างๆ คงที่ เพื่อเพิ่มประสิทธิภาพและ แชร์ขั้นตอนเมื่อผู้บริโภคหลายรายเก็บข้อมูลในเวลาเดียวกัน ให้ใช้ shareIn

การตรวจจับข้อยกเว้นที่ไม่คาดคิด

การใช้งานของผู้ผลิตอาจมาจากไลบรารีของบุคคลที่สาม ซึ่งหมายความว่าอาจมีข้อยกเว้นที่ไม่คาดคิด เพื่อจัดการกับเรื่องเหล่านี้ ให้ใช้เมธอด catch ตัวกลาง

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

ในตัวอย่างก่อนหน้านี้ เมื่อมีข้อยกเว้น collect ยังไม่มีการเรียกใช้ lambda เพราะยังไม่ได้รับรายการใหม่

catch ยังemitรายการไปยังโฟลว์ได้ ที่เก็บตัวอย่าง เลเยอร์สามารถemitค่าที่แคชไว้แทน

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

ในตัวอย่างนี้ เมื่อข้อยกเว้นเกิดขึ้น collect lambda จะ เนื่องจากระบบได้ปล่อยรายการใหม่ไปยังสตรีมเนื่องจาก ข้อยกเว้น

การดำเนินการใน CoroutineContext อื่น

โดยค่าเริ่มต้น ผู้สร้างเครื่องมือสร้าง flow จะดำเนินการใน CoroutineContext ของโครูทีนที่รวบรวมจากมัน และ ที่กล่าวถึงก่อนหน้านี้ ไม่สามารถemitค่าจาก CoroutineContext การทำงานลักษณะนี้อาจไม่เป็นที่ต้องการในบางกรณี ตัวอย่างเช่น ในตัวอย่างที่ใช้ตลอดทั้งหัวข้อนี้ ที่เก็บ เลเยอร์ไม่ควรจะดำเนินการกับ Dispatchers.Main ที่ viewModelScope ใช้

หากต้องการเปลี่ยน CoroutineContext ของโฟลว์ ให้ใช้โอเปอเรเตอร์ระดับกลาง flowOn flowOn เปลี่ยน CoroutineContext ของขั้นตอนอัปสตรีม ซึ่งหมายความว่า ผู้ผลิตและโอเปอเรเตอร์ระดับกลางที่ใช้ก่อน (หรือสูงกว่า) flowOn ขั้นตอนดาวน์สตรีม (โอเปอเรเตอร์ระดับกลางหลัง flowOn ร่วมกับผู้บริโภค) จะไม่ได้รับผลกระทบและดำเนินการตาม ใช้ CoroutineContext เพื่อcollectจากโฟลว์ หากมี โอเปอเรเตอร์ flowOn หลายรายการ แต่ละรายการเปลี่ยนอัปสตรีมจาก ตำแหน่งปัจจุบัน

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

เมื่อใช้โค้ดนี้ โอเปอเรเตอร์ onEach และ map จะใช้ defaultDispatcher ขณะที่โอเปอเรเตอร์ catch และผู้บริโภคดำเนินการ viewModelScope ใช้ไป Dispatchers.Main

เนื่องจากเลเยอร์แหล่งข้อมูลกำลังทำงาน I/O คุณควรใช้ผู้มอบหมายงาน ที่ได้รับการเพิ่มประสิทธิภาพเพื่อการดำเนินการ I/O

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

โฟลว์ในคลัง Jetpack

Flow ผสานรวมอยู่ในไลบรารี Jetpack จำนวนมาก และเป็นที่นิยมในหมู่ ไลบรารี Android ของบุคคลที่สาม Flow เหมาะสำหรับการอัปเดตข้อมูลแบบสด และสตรีมข้อมูลที่ไม่รู้จบ

คุณสามารถใช้ เข้ากับห้องแชท ได้รับแจ้งเกี่ยวกับการเปลี่ยนแปลงในฐานข้อมูล เมื่อใช้ ออบเจ็กต์การเข้าถึงข้อมูล (DAO) แสดงผลประเภท Flow เพื่อรับการอัปเดตแบบสด

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

ทุกครั้งที่มีการเปลี่ยนแปลงในตาราง Example ระบบจะนำรายการใหม่ออก รายการใหม่ในฐานข้อมูล

แปลง API ที่อิงตาม Callback เป็นโฟลว์

callbackFlow เป็นเครื่องมือสร้างโฟลว์ที่ช่วยให้คุณแปลง API แบบ Callback เป็นโฟลว์ ตัวอย่างเช่น Firebase Firestore Android API ใช้การติดต่อกลับ

หากต้องการแปลง API เหล่านี้เป็นโฟลว์และรอฟังการอัปเดตฐานข้อมูล Firestore คุณต้อง สามารถใช้โค้ดต่อไปนี้

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

สิ่งที่ต่างจากเครื่องมือสร้าง flow คือ callbackFlow อนุญาตให้ปล่อยค่าจาก CoroutineContext อื่นที่มี send หรือภายนอกโครูทีนซึ่งมีค่า trySend

จากภายใน callbackFlow จะใช้ ช่อง ซึ่งในแนวคิดนี้คล้ายกับ การบล็อก คิว แชแนลได้รับการกำหนดค่าด้วยความจุ ซึ่งเป็นจำนวนองค์ประกอบสูงสุด ที่สามารถบัฟเฟอร์ได้ ช่องที่สร้างใน callbackFlow มีค่าเริ่มต้น องค์ประกอบได้ 64 รายการ เมื่อคุณพยายามเพิ่ม องค์ประกอบใหม่ลงใน ช่อง send จะระงับครีเอเตอร์จนกว่าจะมีพื้นที่สำหรับรายการใหม่ องค์ประกอบ ในขณะที่ trySend จะไม่เพิ่มองค์ประกอบลงในช่องและแสดงผล false ทันที

trySend จะเพิ่มองค์ประกอบที่ระบุลงในช่องทันที เฉพาะในกรณีที่ข้อความนี้ไม่ได้ละเมิดข้อจำกัดทางความจุ แล้วส่งคืน ที่ประสบความสำเร็จ

แหล่งข้อมูลขั้นตอนเพิ่มเติม