KotlinのSharedFlowとStateFlowについて【shareIn/stateIn】
Kotlin Coroutines Flow
Kotlinには便利なFlow
というインターフェースがあります。
これがとても便利なのですが、いろいろな種類のクラスと拡張関数が用意されていて使いこなすのは一苦労です。
この記事では、私がよく使うFlow
のテクニックとその解説を紹介します。
Flowとは何か
Flow
はコールドストリームな非同期処理インターフェースです。
コールドストリームとは、購読者が現れて初めて動き出すストリームで、対となるのはホットストリームなChannel
です。
それぞれよく使われる用途があるのですが、Flow
の方が圧倒的に使う場面は多いと思います。
よく使うFlowの例
大きく分けて、
StateFlow
/MutableStateFlow
SharedFlow
/MutableSharedFlow
callbackflow { }
に分かれます。
StateFlow
とSharedFlow
は Flow
というインターフェースを継承しており、MutableとImmutableが存在します。
callbackFlow
も返すのは Flow
型です。
ちなみにAndoirdのRoomライブラリで設計したDAOでも Flow
を返すことができます。
ただのFlow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/** 0~9までカウントアップするFlow */
fun countUpFlow() = flow {
repeat(10) {
delay(1000L)
emit(it)
}
}
fun main() {
runBlocking {
countUpFlow().collect {
println(it)
}
}
}
ただ1秒ごとにカウントアップするFlow
です。
collect
するもの (サブスクライバー)が現れてから動き始めます。
ただし、flow
のスコープ外で値を新たにemit
することはできません。
こういったFlow
の使い方はあまり実践では使うことはないでしょう。
StateFlow
StateFlow
は最も使い勝手が良いFlow
で、Androidで提供されるLiveData
とよく似ています。
通常のFlow
は値を取る first()
などのメソッドはsuspendですが、StateFlow
は value
に最新の値が格納され、非同期スコープ外であっても取得が可能です。
LiveData
との大きな違いは、「同じ値をset (emit) したときにオブザーバ (サブスクライバー) に通知されるか否か」です。
LiveData
は通知されますが、StateFlow
はされません。
また、StateFlow
はKotlinのライブラリであってAndoirdのライブラリではないので、ライフサイクルが考慮されているLiveData
と違って、StateFlow
は通常利用では考慮されていません。
具体的には、アプリがSTOPPED状態になればLiveData
は収集も止めますが、StateFlow
は収集が止まりません。
これは repeatOnLifecycle
という関数を利用すれば解決できるので、AndoirdでStateFlow
を利用する場合はこちらも併用しましょう。
シンプルな例は以下の通りです。
import kotlinx.coroutines.flow.*
/** Int型のStateFlow。StateFlowは初期値が必要です。 */
val intStateFlow = MutableStateFlow(0)
fun main() {
// 本当はGlobalScopeの利用は非推奨です。
intStateFlow.onEach {
println("onEach: $it")
}.launchIn(GlobalScope)
runBlocking {
intStateFlow.emit(1)
delay(10)
intStateFlow.emit(1) // 同じ値は通知されない
delay(10)
intStateFlow.emit(2)
delay(10)
}
// value は非同期スコープ外でも取得できる。
println("value=${intStateFlow.value}")
}
出力:
onEach: 1
onEach: 2
value=2
onEach
が使い勝手が良く、私はよく利用しますが下記のような書き方と同義です。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/** Int型のStateFlow。StateFlowは初期値が必要です。 */
val intStateFlow = MutableStateFlow(0)
fun main() {
// 本当はGlobalScopeの利用は非推奨です。
GlobalScope.launch {
intStateFlow.collect {
println("collect: $it")
}
}
runBlocking {
intStateFlow.emit(1)
delay(10)
intStateFlow.emit(1) // 同じ値は通知されない
delay(10)
intStateFlow.emit(2)
delay(10)
}
// value は非同期スコープ外でも取得できる。
println("value=${intStateFlow.value}")
}
stateInでFlow → StateFlowに変換する
ときどき、Flow
で返されたものをStateFlow
に変換したいときがあります。
そんなときはstateIn
です。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val intStateFlow = intFlow().stateIn(
scope = GlobalScope, // どのCoroutineScopeで管理するか
started = SharingStarted.WhileSubscribed(), // 購読を開始するタイミング
initialValue = 0, // 初期値
)
}
ちょっとややこしいのがstarted
という引数で、選択肢としては下記の通りです。
種類 | 意味 |
WhileSubscribed() |
誰かが購読を開始したら収集を始める。誰も購読しなくなったら収集をやめる。通常のコールドFlow と同じ。 |
Eagerly |
購読者がいなくても、即座に収集を開始する。ホットなFlow にできる。 |
Lazily |
購読者が1つでも現れたら収集開始。購読者がいなくなっても収集はやめない。上記2つの間を取ったような形。 |
また、map
と併用するとStateFlow
を自分好みに変換ができます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val stringStateFlow = intFlow()
.map { "StateFlow($it)" } // int -> String
.stateIn(
scope = GlobalScope,
started = SharingStarted.Eagerly,
initialValue = 0,
)
runBlocking {
delay(1000)
}
println(stringStateFlow.value) // => StateFlow(9)
}
上記はstarted = Eagerly
にしたので、購読者がいなくともStateFlow
が値を収集しています。
SharedFlow
SharedFlow
はStateFlow
と似ていますが、用途はちょっと異なります。
実は、StateFlow
はSharedFlow
でもあります。
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
決定的に違う点は、SharedFlow
はreplayCache
が任意の大きさで決定できます。
replayCache
とは「古い値を保持するためのキャッシュ」です。
したがって、SharedFlow
は過去の値を参照することができ、どの程度過去の値を記憶しておくかも任意に決定できます。
この点はChannel
とよく似ていますね。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.BufferOverflow
fun main() {
val intSharedFlow = MutableSharedFlow<Int>(
replay = 3, // 最新の3つ値を保持する
onBufferOverflow = BufferOverflow.DROP_OLDEST, // 溢れたら古い値を落とす
)
intSharedFlow.onEach {
println("onEach: $it (replay=${intSharedFlow.replayCache})")
}.launchIn(GlobalScope)
runBlocking {
intSharedFlow.emit(1)
delay(10)
intSharedFlow.emit(2)
delay(10)
intSharedFlow.emit(3)
delay(10)
intSharedFlow.emit(4)
delay(10)
intSharedFlow.emit(5)
delay(10)
}
}
出力:
onEach: 1 (replay=[1])
onEach: 2 (replay=[1, 2])
onEach: 3 (replay=[1, 2, 3])
onEach: 4 (replay=[2, 3, 4])
onEach: 5 (replay=[3, 4, 5])
ちなみにStateFlow
のreplayCache
は下記のような実装なので、最新の値1つしか取れません。
override val replayCache: List<T>
get() = listOf(value)
この辺りはもっと融通が効くようになってほしいと思うのですが、Kotlin 1.5の段階ではStateFlow
のreplayCache
をいじることはできなさそうです。
shareInでFlow → SharedFlowに変換する
このあたりはstateIn
と大体同じです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val stringStateFlow = intFlow()
.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(),
replay = 3,
)
stringStateFlow.onEach {
println("onEach: $it (replay=${stringStateFlow.replayCache})")
}.launchIn(GlobalScope)
runBlocking {
delay(1000)
}
}
出力:
onEach: 0 (replay=[7, 8, 9])
onEach: 1 (replay=[7, 8, 9])
onEach: 2 (replay=[7, 8, 9])
onEach: 3 (replay=[7, 8, 9])
onEach: 4 (replay=[7, 8, 9])
onEach: 5 (replay=[7, 8, 9])
onEach: 6 (replay=[7, 8, 9])
onEach: 7 (replay=[7, 8, 9])
onEach: 8 (replay=[7, 8, 9])
onEach: 9 (replay=[7, 8, 9])
上記出力結果を見ると、replayCache
にはすでに内部で収集した値が入っていますね。
callbackFlow
callbackFlow
はとあるコールバックを行うインターフェースを購読する際に、効果を発揮します。
今までそのインターフェースを各オブジェクト内で、もしくはそのオブジェクト自身が実装して、コールバックを受け取る形が普通でしたが、callbackFlow
を使うとFlow
でそのインターフェースを購読できます。
例を書いてみると下記のようなイメージです。
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
class DataManager {
private var value: Int = 0
private val listeners = mutableSetOf<Listener>()
fun countUp() {
value++
// ...
notifyChanged()
}
fun addListener(l: Listener) {
println("addListener")
listeners.add(l)
}
fun removeListener(l: Listener) {
println("removeListener")
listeners.remove(l)
}
private fun notifyChanged() {
listeners.forEach { it.onChange(value) }
}
/** とあるコールバックインターフェース */
fun interface Listener {
fun onChange(newValue: Int)
}
}
class Sample {
private val manager = DataManager()
fun update() = manager.countUp()
/** ListenerをFlowとして購読する。 */
fun observeDataManagerChange() = callbackFlow {
val listener = DataManager.Listener {
// do something
println("callbackFlow: $it")
trySend(it)
}
manager.addListener(listener)
awaitClose {
manager.removeListener(listener) // Listenerの解除は勝手にやってくれる
}
}
}
fun main() {
val sample = Sample()
val observeJob = sample.observeDataManagerChange().onEach {
println("onEach: $it")
}.launchIn(GlobalScope)
runBlocking {
delay(100)
sample.update()
sample.update()
sample.update()
delay(100)
}
observeJob.cancel()
}
出力:
addListener
callbackFlow: 1
callbackFlow: 2
callbackFlow: 3
onEach: 1
onEach: 2
onEach: 3
removeListener
こうすることで、コールバックインターフェース (上記でいうListener
) の登録と解除を全てcallbackFlow
内で記述することができ、一つのFlow
として扱うことが可能です。
もちろん、これをStateFlow
などに変換すれば、より柔軟な使い方ができます。