ในโครูทีน flow คือชนิดที่ปล่อยค่าได้หลายค่า ตามลำดับ ซึ่งตรงข้ามกับระงับฟังก์ชันที่ส่งกลับเฉพาะ ค่าเดี่ยว เช่น คุณสามารถใช้ขั้นตอนแสดงการถ่ายทอดสด จากฐานข้อมูล
โฟลว์สร้างขึ้นบนโครูทีนและสามารถระบุค่าได้หลายค่า
โฟลว์คือแนวคิดคือสตรีมข้อมูลที่สามารถคํานวณได้
แบบไม่พร้อมกัน ค่าที่ปล่อยออกมาต้องเป็นประเภทเดียวกัน สำหรับ
ตัวอย่างเช่น Flow<Int>
คือโฟลว์ที่แสดงค่าจำนวนเต็ม
โฟลว์คล้ายกับ Iterator
อย่างมากซึ่งสร้างลำดับ
แต่ใช้ฟังก์ชันการระงับเพื่อสร้างและใช้ค่า
แบบไม่พร้อมกัน ซึ่งหมายความว่าขั้นตอนสามารถทำให้
เพื่อสร้างค่าถัดไปโดยไม่บล็อก
ชุดข้อความ
สตรีมข้อมูลมีอยู่ 3 เอนทิตี ได้แก่
- ผู้ผลิตจะสร้างข้อมูลที่เพิ่มลงในสตรีม ขอขอบคุณ โครูทีน โฟลว์ยังสามารถให้ข้อมูลแบบไม่พร้อมกัน
- (ไม่บังคับ) ตัวกลางสามารถแก้ไขแต่ละค่าที่ส่งลงในค่า หรือตัวสตรีมเอง
- ผู้บริโภคจะใช้ค่าจากสตรีม
ใน Android ที่เก็บคือ ซึ่งโดยทั่วไปแล้วเป็นผู้ผลิตข้อมูล UI ที่มีอินเทอร์เฟซผู้ใช้ (UI) เป็นผู้บริโภค ซึ่งจะแสดงข้อมูลดังกล่าวในท้ายที่สุด แต่บางครั้งเลเยอร์ UI ก็เป็นการสร้าง เหตุการณ์อินพุตของผู้ใช้และเลเยอร์อื่นๆ ของลำดับชั้นจะมีผล เลเยอร์ใน ระหว่างผู้ผลิตและผู้บริโภคมักจะทำหน้าที่เป็นตัวกลางที่แก้ไข สตรีมข้อมูลเพื่อปรับให้เข้ากับข้อกำหนดของเลเยอร์ต่อไปนี้
การสร้างขั้นตอน
หากต้องการสร้างขั้นตอน ให้ใช้
เครื่องมือสร้างขั้นตอน
API ฟังก์ชันเครื่องมือสร้าง flow
จะสร้างขั้นตอนใหม่ที่คุณดำเนินการด้วยตนเองได้
แสดงค่าใหม่ลงในสตรีมข้อมูลโดยใช้
emit
ในตัวอย่างต่อไปนี้ แหล่งข้อมูลจะดึงข้อมูลข่าวล่าสุด โดยอัตโนมัติตามช่วงเวลาคงที่ เนื่องจากฟังก์ชันการระงับไม่สามารถ แสดงผลค่าติดกันหลายค่า แหล่งข้อมูลจะสร้างและแสดงผล วิธีการทำตามข้อกำหนดนี้ ในกรณีนี้ แหล่งข้อมูลจะทำหน้าที่ ในฐานะโปรดิวเซอร์
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
เรียกใช้เครื่องมือสร้าง flow
ภายในโครูทีน ดังนั้นจึงมีประโยชน์
จาก API แบบอะซิงโครนัสเดียวกัน แต่มีข้อจำกัดบางประการดังนี้
- ขั้นตอนจะเป็นตามลำดับ ในฐานะผู้ผลิตอยู่ในโครูทีน เมื่อเรียก
ฟังก์ชันระงับ ผู้ผลิตจะระงับจนกว่าฟังก์ชันระงับ
ที่เกินออกมา ในตัวอย่างนี้ ผู้ผลิตจะระงับจนกว่าจะถึง
fetchLatestNews
คำขอเครือข่ายเสร็จสมบูรณ์ หลังจากนั้น ผลลัพธ์ที่ได้ก็ส่งมายังสตรีม - เมื่อใช้เครื่องมือสร้าง
flow
ผู้ผลิตจะemit
ค่าจากCoroutineContext
ที่แตกต่างกัน ดังนั้น อย่าเรียกemit
ในCoroutineContext
โดยการสร้างโครูทีนใหม่หรือใช้withContext
บล็อกโค้ด คุณสามารถใช้เครื่องมือสร้างขั้นตอนอื่นๆ เช่นcallbackFlow
ในกรณีเหล่านี้
การแก้ไขสตรีม
ตัวกลางสามารถใช้โอเปอเรเตอร์ระดับกลางเพื่อแก้ไขสตรีมของ ข้อมูลได้โดยไม่ต้องใช้ค่า โอเปอเรเตอร์เหล่านี้เป็นฟังก์ชันที่ ใช้กับสตรีมข้อมูล ให้ตั้งค่าห่วงโซ่การดำเนินการที่ไม่ใช่ ดำเนินการจนกว่าจะมีการใช้ค่านั้นในอนาคต ดูข้อมูลเพิ่มเติมเกี่ยวกับ โอเปอเรเตอร์ระดับกลางในส่วน เอกสารอ้างอิงโฟลว์
ในตัวอย่างด้านล่าง เลเยอร์ที่เก็บใช้โอเปอเรเตอร์ตัวกลาง
map
เพื่อแปลงข้อมูลที่จะแสดงใน View
ดังนี้
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
สามารถใช้ตัวดำเนินการขั้นกลางต่อแบบตัวอื่นๆ เพื่อสร้างเป็นห่วงโซ่ ของการดำเนินการที่จะดำเนินการแบบ Lazy Loading เมื่อมีการส่งออกรายการไปยัง โปรดทราบว่าการใช้โอเปอเรเตอร์ระดับกลางกับสตรีมจะส่งผล ไม่เริ่มการรวบรวมโฟลว์
การรวบรวมจากขั้นตอน
ใช้โอเปอเรเตอร์เทอร์มินัลเพื่อทริกเกอร์กระบวนการเพื่อเริ่มการฟัง
หากต้องการดูค่าทั้งหมดในสตรีมเมื่อมีการปล่อยออกมา ให้ใช้
collect
คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับโอเปอเรเตอร์เทอร์มินัลใน
เอกสารเกี่ยวกับกระบวนการอย่างเป็นทางการ
เนื่องจาก collect
เป็นฟังก์ชันระงับ จึงจำเป็นต้องดำเนินการภายใน
โครูทีน จะใช้ lambda เป็นพารามิเตอร์ที่จะเรียกใช้
ค่าใหม่ทุกๆ ค่า และเนื่องจากเป็นฟังก์ชันระงับ โครูทีนที่
การโทร collect
อาจระงับจนกว่าจะมีการปิดโฟลว์
ต่อจากตัวอย่างก่อนหน้านี้ นี่คือการใช้งาน
ViewModel
โดยใช้ข้อมูลจากเลเยอร์ที่เก็บ:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
การรวบรวมความต่อเนื่องจะทริกเกอร์ผู้ผลิตที่รีเฟรชข่าวสารล่าสุด
และแสดงผลลัพธ์ของคำขอเครือข่ายในช่วงเวลาที่คงที่ ตามที่
ผู้ผลิตจะยังคงมีการใช้งานอยู่เสมอเมื่อมีการวนซ้ำ while(true)
ซึ่งเป็นสตรีม
ของ ข้อมูลจะปิดเมื่อล้าง ViewModel และ
viewModelScope
ถูกยกเลิก
การรวบรวมโฟลว์อาจหยุดด้วยเหตุผลต่อไปนี้
- โครูทีนที่รวบรวมมาถูกยกเลิกตามที่แสดงในตัวอย่างก่อนหน้า ซึ่งจะหยุดผู้ผลิตที่เกี่ยวข้องด้วย
- ผู้ผลิตปล่อยสัญญาณรายการเสร็จแล้ว ในกรณีนี้ สตรีมข้อมูล
ปิดและโครูทีนที่ชื่อ
collect
กลับมาดำเนินการอีกครั้งแล้ว
โฟลว์เป็นแบบเย็นและแบบ Lazy Loading เว้นแต่จะระบุด้วยตัวกลางอื่นๆ
โอเปอเรเตอร์ ซึ่งหมายความว่าโค้ดผู้ผลิตจะถูกเรียกใช้ทุกครั้ง
ระบบจะเรียกใช้โอเปอเรเตอร์เทอร์มินัลในโฟลว์ ในตัวอย่างก่อนหน้านี้
การมี Flow Collector หลายรายการจะทำให้แหล่งข้อมูลดึงข้อมูล
ข่าวล่าสุดหลายครั้งในช่วงเวลาต่างๆ คงที่ เพื่อเพิ่มประสิทธิภาพและ
แชร์ขั้นตอนเมื่อผู้บริโภคหลายรายเก็บข้อมูลในเวลาเดียวกัน ให้ใช้
shareIn
การตรวจจับข้อยกเว้นที่ไม่คาดคิด
การใช้งานของผู้ผลิตอาจมาจากไลบรารีของบุคคลที่สาม
ซึ่งหมายความว่าอาจมีข้อยกเว้นที่ไม่คาดคิด เพื่อจัดการกับเรื่องเหล่านี้
ให้ใช้เมธอด
catch
ตัวกลาง
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
ในตัวอย่างก่อนหน้านี้ เมื่อมีข้อยกเว้น collect
ยังไม่มีการเรียกใช้ lambda เพราะยังไม่ได้รับรายการใหม่
catch
ยังemit
รายการไปยังโฟลว์ได้ ที่เก็บตัวอย่าง
เลเยอร์สามารถemit
ค่าที่แคชไว้แทน
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
ในตัวอย่างนี้ เมื่อข้อยกเว้นเกิดขึ้น collect
lambda จะ
เนื่องจากระบบได้ปล่อยรายการใหม่ไปยังสตรีมเนื่องจาก
ข้อยกเว้น
การดำเนินการใน CoroutineContext อื่น
โดยค่าเริ่มต้น ผู้สร้างเครื่องมือสร้าง flow
จะดำเนินการใน
CoroutineContext
ของโครูทีนที่รวบรวมจากมัน และ
ที่กล่าวถึงก่อนหน้านี้ ไม่สามารถemit
ค่าจาก
CoroutineContext
การทำงานลักษณะนี้อาจไม่เป็นที่ต้องการในบางกรณี
ตัวอย่างเช่น ในตัวอย่างที่ใช้ตลอดทั้งหัวข้อนี้ ที่เก็บ
เลเยอร์ไม่ควรจะดำเนินการกับ Dispatchers.Main
ที่
viewModelScope
ใช้
หากต้องการเปลี่ยน CoroutineContext
ของโฟลว์ ให้ใช้โอเปอเรเตอร์ระดับกลาง
flowOn
flowOn
เปลี่ยน CoroutineContext
ของขั้นตอนอัปสตรีม ซึ่งหมายความว่า
ผู้ผลิตและโอเปอเรเตอร์ระดับกลางที่ใช้ก่อน (หรือสูงกว่า)
flowOn
ขั้นตอนดาวน์สตรีม (โอเปอเรเตอร์ระดับกลางหลัง flowOn
ร่วมกับผู้บริโภค) จะไม่ได้รับผลกระทบและดำเนินการตาม
ใช้ CoroutineContext
เพื่อcollect
จากโฟลว์ หากมี
โอเปอเรเตอร์ flowOn
หลายรายการ แต่ละรายการเปลี่ยนอัปสตรีมจาก
ตำแหน่งปัจจุบัน
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
เมื่อใช้โค้ดนี้ โอเปอเรเตอร์ onEach
และ map
จะใช้ defaultDispatcher
ขณะที่โอเปอเรเตอร์ catch
และผู้บริโภคดำเนินการ
viewModelScope
ใช้ไป Dispatchers.Main
เนื่องจากเลเยอร์แหล่งข้อมูลกำลังทำงาน I/O คุณควรใช้ผู้มอบหมายงาน ที่ได้รับการเพิ่มประสิทธิภาพเพื่อการดำเนินการ I/O
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
โฟลว์ในคลัง Jetpack
Flow ผสานรวมอยู่ในไลบรารี Jetpack จำนวนมาก และเป็นที่นิยมในหมู่ ไลบรารี Android ของบุคคลที่สาม Flow เหมาะสำหรับการอัปเดตข้อมูลแบบสด และสตรีมข้อมูลที่ไม่รู้จบ
คุณสามารถใช้
เข้ากับห้องแชท
ได้รับแจ้งเกี่ยวกับการเปลี่ยนแปลงในฐานข้อมูล เมื่อใช้
ออบเจ็กต์การเข้าถึงข้อมูล (DAO)
แสดงผลประเภท Flow
เพื่อรับการอัปเดตแบบสด
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
ทุกครั้งที่มีการเปลี่ยนแปลงในตาราง Example
ระบบจะนำรายการใหม่ออก
รายการใหม่ในฐานข้อมูล
แปลง API ที่อิงตาม Callback เป็นโฟลว์
callbackFlow
เป็นเครื่องมือสร้างโฟลว์ที่ช่วยให้คุณแปลง API แบบ Callback เป็นโฟลว์
ตัวอย่างเช่น Firebase Firestore
Android API ใช้การติดต่อกลับ
หากต้องการแปลง API เหล่านี้เป็นโฟลว์และรอฟังการอัปเดตฐานข้อมูล Firestore คุณต้อง สามารถใช้โค้ดต่อไปนี้
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
สิ่งที่ต่างจากเครื่องมือสร้าง flow
คือ callbackFlow
อนุญาตให้ปล่อยค่าจาก CoroutineContext
อื่นที่มี
send
หรือภายนอกโครูทีนซึ่งมีค่า
trySend
จากภายใน callbackFlow
จะใช้
ช่อง
ซึ่งในแนวคิดนี้คล้ายกับ
การบล็อก
คิว
แชแนลได้รับการกำหนดค่าด้วยความจุ ซึ่งเป็นจำนวนองค์ประกอบสูงสุด
ที่สามารถบัฟเฟอร์ได้ ช่องที่สร้างใน callbackFlow
มีค่าเริ่มต้น
องค์ประกอบได้ 64 รายการ เมื่อคุณพยายามเพิ่ม
องค์ประกอบใหม่ลงใน
ช่อง send
จะระงับครีเอเตอร์จนกว่าจะมีพื้นที่สำหรับรายการใหม่
องค์ประกอบ ในขณะที่ trySend
จะไม่เพิ่มองค์ประกอบลงในช่องและแสดงผล
false
ทันที
trySend
จะเพิ่มองค์ประกอบที่ระบุลงในช่องทันที
เฉพาะในกรณีที่ข้อความนี้ไม่ได้ละเมิดข้อจำกัดทางความจุ แล้วส่งคืน
ที่ประสบความสำเร็จ
แหล่งข้อมูลขั้นตอนเพิ่มเติม
- การทดสอบการทำงานของ Kotlin ใน Android
StateFlow
และSharedFlow
- แหล่งข้อมูลเพิ่มเติมเกี่ยวกับโครูทีนและโฟลว์ของ Kotlin