flows channels 傻傻分不清

點擊上方藍字關注我,知識會給你力量

這個系列我做了協(xié)程和Flow開發(fā)者的一系列文章的翻譯,旨在了解當前協(xié)程、Flow、LiveData這樣設計的原因,從設計者的角度,發(fā)現他們的問題,以及如何解決這些問題,pls enjoy it。
很久以前,coroutines被引入到Kotlin,它們是輕量級的。我們可以啟動大量的coroutine,我們需要一種方法在這些coroutine之間進行通信,而不會遇到可怕的 "可變共享狀態(tài) "問題。
因此,Channel被添加為一個協(xié)程間的通信原語。Channel是很好的。Channel支持在不同內核之間進行一對一、一對多、多對一和多對多的通信,并且每個發(fā)送到Channel的值都會被接收一次。

你不能使用Channel來分發(fā)事件或狀態(tài)更新,以允許多個訂閱者獨立地接收并對其作出反應。
因此,BroadcastChannel接口被引入,它的實現是帶Buffer的ConflatedBroadcastChannel。它們在一段時間內為我們提供了很好的服務,但是它們被證明是一個設計的死胡同。現在,從kotlinx-coroutines 1.4版本開始,我們引入了一個更好的解決方案--shared flows。請繼續(xù)閱讀完整的故事。
Flows are simple
在庫的早期版本中,我們只有Channel,我們試圖將異步序列的各種轉換實現為函數,將一個Channel作為參數,返回另一個Channel作為結果。這意味著,例如,一個過濾運算符將在它自己的coroutine中運行。

這樣一個操作符的性能遠遠不夠好,尤其是與寫一個if語句相比。事后看來,這并不奇怪,因為Channel是一個同步原語。任何Channel,即使是為單個生產者和單個消費者優(yōu)化的實現,都必須支持并發(fā)的通信程序,它們之間的數據傳輸需要同步,這在現代多核系統(tǒng)中是很昂貴的。當你開始在異步數據流的基礎上構建你的應用架構時,自然會出現對轉換的需求,而Channel成本也開始累積。
Kotlin Flow的簡單設計允許有效地實現轉換操作。在基本的情況下,值的發(fā)射、轉換和收集都在同一個循環(huán)程序中進行,不需要任何同步。

只有當需要在不同的程序中發(fā)射和收集數值時,才會引入流的同步性。
https://elizarov.medium.com/kotlin-flows-and-coroutines-256260fb3bdb
Flows are cold
然而,流量通常是冷的--由flow { ... }構建器函數創(chuàng)建的Flow
val coldFlow = flow {
while (isActive) {
emit(nextEvent)
}
}
流程本身沒有任何形式的計算作為支撐,在開始收集之前,它本身沒有任何狀態(tài)。每個收集器的coroutine都會執(zhí)行它自己的發(fā)射代碼的實例。關于 "cold flow,hot channel "的故事描述了Kotlin flow背后的原因,并展示了它們比Channel更適合的使用情況--返回按需計算的異步值流。

但你如何處理像用戶行為、外部設備事件、狀態(tài)更新等事情?它們的運行是獨立于是否有任何代碼對它們感興趣的。它們應該支持應用程序內部的多個觀察者。這些是所謂的事件的熱源。
Shared flows
這就是shared flow的概念的來源。一個shared flow的存在,不管它是否被收集。shared flow的收集者被稱為訂閱者。一個shared flow的所有訂閱者都會收到相同的數值序列。它有效地像一個 "廣播頻道 "一樣工作,沒有大部分的頻道開銷。它使廣播頻道的概念變得過時。

本質上,shared flow是一個輕量級的廣播事件總線,你可以在你的應用架構中創(chuàng)建和使用。
class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
它有可調整的參數,如為新的訂閱者保留和重放的舊事件的數量,以及為快速發(fā)射器和慢速訂閱者提供緩沖的extraBufferCapacity。
一個shared flow的所有訂閱者都在自己的上下文中異步地收集事件。發(fā)射器并不等待,直到訂閱者完成對事件的處理。然而,當shared flow的緩沖區(qū)滿了,發(fā)射器會暫停,直到緩沖區(qū)有空間。在緩沖區(qū)溢出時,發(fā)射器的這種暫停提供了背壓,在收集器無法跟上時減緩發(fā)射。通過BufferOverlow參數支持處理緩沖區(qū)溢出的其他策略。
State flows
處理緩沖區(qū)溢出的一個流行方法是放棄最舊的事件,只保留最近的、最新的事件。特別是,它是在一個應用程序中對狀態(tài)變量進行建模的一個好方法。它是如此廣泛的使用情況,以至于它有自己專門的StateFlow類型,作為ConflatedBroadcastChannel的替代,后者也已經過時了。
class StateModel {
private val _state = MutableStateFlow(initial)
val state = _state.asStateFlow() // read-only public view
fun update(newValue: Value) {
_state.value = newValue // NOT suspending
}
}
可以把val x: StateFlow
有了狀態(tài)流,復雜Channel和簡單流之間的性能差異變得非常明顯。狀態(tài)流的實現具有無分配的更新,而混雜的廣播Channel則不是這樣的。
A use-case for channels
隨著不同類型的shared flow量取代了不同類型的廣播頻道,流行的問題是普通的、常規(guī)的頻道會發(fā)生什么?由于許多原因,它們將繼續(xù)存在。其中一個原因是,Channel是用于實現許多復雜流量操作的低級基元。
但是,Channel也有其應用案例。Channel被用來處理那些必須被精確處理一次的事件*(詳見下面的附注)。這種情況發(fā)生在有一種事件類型的設計中,這種事件通常有一個訂閱者,但間歇性地(在啟動或某種重新配置期間)根本沒有訂閱者,而且有一個要求,即所有發(fā)布的事件必須保留,直到有訂閱者出現。
class SingleShotEventBus {
private val _events = Channel<Event>()
val events = _events.receiveAsFlow() // expose as flow
suspend fun postEvent(event: Event) {
_events.send(event) // suspends on buffer overflow
}
}
在第一個例子中,BroadcastEventBus是用SharedFlow編寫的,而SingleShotEventBus是用Channel編寫的,它們都把事件公開為Flow
在shared flow中,事件被廣播給未知數量(零或更多)的訂閱者。在沒有訂閱者的情況下,任何發(fā)布的事件都會被立即放棄。這是一種設計模式,用于必須立即處理或根本不處理的事件。
在Channel中,每個事件被傳遞給一個訂閱者。試圖在沒有訂閱者的情況下發(fā)布事件,一旦Channel緩沖區(qū)變滿就會暫停,等待訂閱者出現。發(fā)布的事件不會被丟棄。
請注意,有Channel的SingleShotEventBus實現只在沒有取消的情況下對每個發(fā)布的事件精確地處理一次。當流的訂閱者被取消時,事件可能無法被傳遞。詳情請參見Channel中未交付的元素的文檔。
Bottom line
了解兩者的區(qū)別,適當地使用shared flows和Channel。它們都很有用,被設計成可以很好地一起工作。然而,廣播Channel是過去的過時的人工制品,它們將在未來被廢棄和刪除。
原文鏈接:https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
向大家推薦下我的網站 https://xuyisheng.top/ 點擊原文一鍵直達
專注 Android-Kotlin-Flutter 歡迎大家訪問
往期推薦
更文不易,點個“三連”支持一下??
