KotlinのSharedFlowとStateFlowについて【shareIn/stateIn】
Kotlin Coroutines Flow
Kotlinには便利なFlowというインターフェースがあります。
これがとても便利なのですが、いろいろな種類のクラスと拡張関数が用意されていて使いこなすのは一苦労です。
この記事では、私がよく使うFlowのテクニックとその解説を紹介します。
Flowとは何か
Flowはコールドストリームな非同期処理インターフェースです。
コールドストリームとは、購読者が現れて初めて動き出すストリームで、対となるのはホットストリームなChannelです。
それぞれよく使われる用途があるのですが、Flowの方が圧倒的に使う場面は多いと思います。
よく使うFlowの例
大きく分けて、
StateFlow/MutableStateFlowSharedFlow/MutableSharedFlowcallbackflow { }
に分かれます。
StateFlowとSharedFlowは Flow というインターフェースを継承しており、MutableとImmutableが存在します。
callbackFlowも返すのは Flow 型です。
ちなみにAndoirdのRoomライブラリで設計したDAOでも Flow を返すことができます。
ただのFlow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/** 0~9までカウントアップするFlow */
fun countUpFlow() = flow {
repeat(10) {
delay(1000L)
emit(it)
}
}
fun main() {
runBlocking {
countUpFlow().collect {
println(it)
}
}
}
ただ1秒ごとにカウントアップするFlowです。
collectするもの (サブスクライバー)が現れてから動き始めます。
ただし、flowのスコープ外で値を新たにemitすることはできません。
こういったFlowの使い方はあまり実践では使うことはないでしょう。
StateFlow
StateFlowは最も使い勝手が良いFlowで、Androidで提供されるLiveDataとよく似ています。
通常のFlowは値を取る first() などのメソッドはsuspendですが、StateFlow は value に最新の値が格納され、非同期スコープ外であっても取得が可能です。
LiveDataとの大きな違いは、「同じ値をset (emit) したときにオブザーバ (サブスクライバー) に通知されるか否か」です。
LiveDataは通知されますが、StateFlowはされません。
また、StateFlowはKotlinのライブラリであってAndoirdのライブラリではないので、ライフサイクルが考慮されているLiveDataと違って、StateFlowは通常利用では考慮されていません。
具体的には、アプリがSTOPPED状態になればLiveDataは収集も止めますが、StateFlowは収集が止まりません。
これは repeatOnLifecycle という関数を利用すれば解決できるので、AndoirdでStateFlow を利用する場合はこちらも併用しましょう。
シンプルな例は以下の通りです。
import kotlinx.coroutines.flow.*
/** Int型のStateFlow。StateFlowは初期値が必要です。 */
val intStateFlow = MutableStateFlow(0)
fun main() {
// 本当はGlobalScopeの利用は非推奨です。
intStateFlow.onEach {
println("onEach: $it")
}.launchIn(GlobalScope)
runBlocking {
intStateFlow.emit(1)
delay(10)
intStateFlow.emit(1) // 同じ値は通知されない
delay(10)
intStateFlow.emit(2)
delay(10)
}
// value は非同期スコープ外でも取得できる。
println("value=${intStateFlow.value}")
}
出力:
onEach: 1
onEach: 2
value=2
onEachが使い勝手が良く、私はよく利用しますが下記のような書き方と同義です。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/** Int型のStateFlow。StateFlowは初期値が必要です。 */
val intStateFlow = MutableStateFlow(0)
fun main() {
// 本当はGlobalScopeの利用は非推奨です。
GlobalScope.launch {
intStateFlow.collect {
println("collect: $it")
}
}
runBlocking {
intStateFlow.emit(1)
delay(10)
intStateFlow.emit(1) // 同じ値は通知されない
delay(10)
intStateFlow.emit(2)
delay(10)
}
// value は非同期スコープ外でも取得できる。
println("value=${intStateFlow.value}")
}
stateInでFlow → StateFlowに変換する
ときどき、Flowで返されたものをStateFlowに変換したいときがあります。
そんなときはstateInです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val intStateFlow = intFlow().stateIn(
scope = GlobalScope, // どのCoroutineScopeで管理するか
started = SharingStarted.WhileSubscribed(), // 購読を開始するタイミング
initialValue = 0, // 初期値
)
}
ちょっとややこしいのがstartedという引数で、選択肢としては下記の通りです。
| 種類 | 意味 |
WhileSubscribed() |
誰かが購読を開始したら収集を始める。誰も購読しなくなったら収集をやめる。通常のコールドFlowと同じ。 |
Eagerly |
購読者がいなくても、即座に収集を開始する。ホットなFlowにできる。 |
Lazily |
購読者が1つでも現れたら収集開始。購読者がいなくなっても収集はやめない。上記2つの間を取ったような形。 |
また、mapと併用するとStateFlowを自分好みに変換ができます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val stringStateFlow = intFlow()
.map { "StateFlow($it)" } // int -> String
.stateIn(
scope = GlobalScope,
started = SharingStarted.Eagerly,
initialValue = 0,
)
runBlocking {
delay(1000)
}
println(stringStateFlow.value) // => StateFlow(9)
}
上記はstarted = Eagerlyにしたので、購読者がいなくともStateFlowが値を収集しています。
SharedFlow
SharedFlowはStateFlowと似ていますが、用途はちょっと異なります。
実は、StateFlowはSharedFlowでもあります。
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
決定的に違う点は、SharedFlowはreplayCacheが任意の大きさで決定できます。
replayCacheとは「古い値を保持するためのキャッシュ」です。
したがって、SharedFlowは過去の値を参照することができ、どの程度過去の値を記憶しておくかも任意に決定できます。
この点はChannelとよく似ていますね。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.BufferOverflow
fun main() {
val intSharedFlow = MutableSharedFlow<Int>(
replay = 3, // 最新の3つ値を保持する
onBufferOverflow = BufferOverflow.DROP_OLDEST, // 溢れたら古い値を落とす
)
intSharedFlow.onEach {
println("onEach: $it (replay=${intSharedFlow.replayCache})")
}.launchIn(GlobalScope)
runBlocking {
intSharedFlow.emit(1)
delay(10)
intSharedFlow.emit(2)
delay(10)
intSharedFlow.emit(3)
delay(10)
intSharedFlow.emit(4)
delay(10)
intSharedFlow.emit(5)
delay(10)
}
}
出力:
onEach: 1 (replay=[1])
onEach: 2 (replay=[1, 2])
onEach: 3 (replay=[1, 2, 3])
onEach: 4 (replay=[2, 3, 4])
onEach: 5 (replay=[3, 4, 5])
ちなみにStateFlowのreplayCacheは下記のような実装なので、最新の値1つしか取れません。
override val replayCache: List<T>
get() = listOf(value)
この辺りはもっと融通が効くようになってほしいと思うのですが、Kotlin 1.5の段階ではStateFlowのreplayCacheをいじることはできなさそうです。
shareInでFlow → SharedFlowに変換する
このあたりはstateInと大体同じです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun intFlow() = flow {
repeat(10) {
emit(it)
}
}
fun main() {
val stringStateFlow = intFlow()
.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(),
replay = 3,
)
stringStateFlow.onEach {
println("onEach: $it (replay=${stringStateFlow.replayCache})")
}.launchIn(GlobalScope)
runBlocking {
delay(1000)
}
}
出力:
onEach: 0 (replay=[7, 8, 9])
onEach: 1 (replay=[7, 8, 9])
onEach: 2 (replay=[7, 8, 9])
onEach: 3 (replay=[7, 8, 9])
onEach: 4 (replay=[7, 8, 9])
onEach: 5 (replay=[7, 8, 9])
onEach: 6 (replay=[7, 8, 9])
onEach: 7 (replay=[7, 8, 9])
onEach: 8 (replay=[7, 8, 9])
onEach: 9 (replay=[7, 8, 9])
上記出力結果を見ると、replayCacheにはすでに内部で収集した値が入っていますね。
callbackFlow
callbackFlowはとあるコールバックを行うインターフェースを購読する際に、効果を発揮します。
今までそのインターフェースを各オブジェクト内で、もしくはそのオブジェクト自身が実装して、コールバックを受け取る形が普通でしたが、callbackFlowを使うとFlowでそのインターフェースを購読できます。
例を書いてみると下記のようなイメージです。
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
class DataManager {
private var value: Int = 0
private val listeners = mutableSetOf<Listener>()
fun countUp() {
value++
// ...
notifyChanged()
}
fun addListener(l: Listener) {
println("addListener")
listeners.add(l)
}
fun removeListener(l: Listener) {
println("removeListener")
listeners.remove(l)
}
private fun notifyChanged() {
listeners.forEach { it.onChange(value) }
}
/** とあるコールバックインターフェース */
fun interface Listener {
fun onChange(newValue: Int)
}
}
class Sample {
private val manager = DataManager()
fun update() = manager.countUp()
/** ListenerをFlowとして購読する。 */
fun observeDataManagerChange() = callbackFlow {
val listener = DataManager.Listener {
// do something
println("callbackFlow: $it")
trySend(it)
}
manager.addListener(listener)
awaitClose {
manager.removeListener(listener) // Listenerの解除は勝手にやってくれる
}
}
}
fun main() {
val sample = Sample()
val observeJob = sample.observeDataManagerChange().onEach {
println("onEach: $it")
}.launchIn(GlobalScope)
runBlocking {
delay(100)
sample.update()
sample.update()
sample.update()
delay(100)
}
observeJob.cancel()
}
出力:
addListener
callbackFlow: 1
callbackFlow: 2
callbackFlow: 3
onEach: 1
onEach: 2
onEach: 3
removeListener
こうすることで、コールバックインターフェース (上記でいうListener) の登録と解除を全てcallbackFlow内で記述することができ、一つのFlowとして扱うことが可能です。
もちろん、これをStateFlowなどに変換すれば、より柔軟な使い方ができます。


