kotlinadvanced

StateFlow and SharedFlow — Hot Streams

Manage state with hot flows: StateFlow for reactive state, SharedFlow for events, and MutableStateFlow.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// StateFlow — holds current value, emits to new collectors
class CounterViewModel {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    private val _loading = MutableStateFlow(false)
    val loading: StateFlow<Boolean> = _loading.asStateFlow()

    fun increment() { _count.value++ }
    fun decrement() { _count.value-- }
    fun reset() { _count.value = 0 }

    suspend fun loadData() {
        _loading.value = true
        delay(1000) // simulate
        _count.value = 42
        _loading.value = false
    }
}

// SharedFlow — event bus (no initial value, replay configurable)
class EventBus {
    private val _events = MutableSharedFlow<Event>(
        replay = 0,          // don't replay to new collectors
        extraBufferCapacity = 64
    )
    val events: SharedFlow<Event> = _events.asSharedFlow()

    suspend fun emit(event: Event) = _events.emit(event)
    fun tryEmit(event: Event) = _events.tryEmit(event)
}

sealed class Event {
    data class UserLoggedIn(val userId: String) : Event()
    data class ItemAdded(val itemId: String) : Event()
    data class Error(val message: String) : Event()
    data object AppStarted : Event()
}

// Combining multiple StateFlows
data class UiState(
    val count: Int,
    val loading: Boolean,
    val message: String
)

fun main() = runBlocking {
    // StateFlow
    println("--- StateFlow ---")
    val viewModel = CounterViewModel()

    // Collector 1
    val collector1 = launch {
        viewModel.count.collect { println("Collector 1: count = $it") }
    }

    delay(100)
    viewModel.increment()
    viewModel.increment()
    viewModel.increment()
    delay(100)

    // Collector 2 gets CURRENT value immediately
    val collector2 = launch {
        viewModel.count.take(1).collect { println("Collector 2 (late): count = $it") }
    }
    delay(100)

    collector1.cancel()
    collector2.cancel()

    // SharedFlow events
    println("\n--- SharedFlow ---")
    val bus = EventBus()

    val subscriber = launch {
        bus.events.collect { event ->
            when (event) {
                is Event.UserLoggedIn -> println("User ${event.userId} logged in")
                is Event.ItemAdded -> println("Item ${event.itemId} added")
                is Event.Error -> println("Error: ${event.message}")
                Event.AppStarted -> println("App started")
            }
        }
    }

    delay(50)
    bus.emit(Event.AppStarted)
    bus.emit(Event.UserLoggedIn("alice"))
    bus.emit(Event.ItemAdded("item-1"))
    delay(100)
    subscriber.cancel()

    // Combining flows
    println("\n--- Combine ---")
    val nameFlow = MutableStateFlow("Alice")
    val ageFlow = MutableStateFlow(30)

    val combined = launch {
        combine(nameFlow, ageFlow) { name, age ->
            "$name (age $age)"
        }.take(3).collect { println("Combined: $it") }
    }

    delay(100)
    nameFlow.value = "Bob"
    delay(100)
    ageFlow.value = 25
    delay(100)
    combined.cancel()

    // stateIn — convert cold flow to StateFlow
    println("\n--- stateIn ---")
    val coldFlow = flow {
        var n = 0
        while (true) {
            emit(n++)
            delay(200)
        }
    }
    val hotState = coldFlow.stateIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = -1
    )
    println("Current: ${hotState.value}")
    delay(500)
    println("After 500ms: ${hotState.value}")

    coroutineContext.cancelChildren()
}

Use Cases

  • Reactive UI state management
  • Application-wide event bus
  • Combining multiple data sources

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.