Kotlin · Intermediate

Flow — Reactive Streams with Coroutines

Build reactive pipelines with Kotlin Flow: emitting, collecting, transforming, and combining values, and handling errors.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
 * Simple finite flow that emits the integers 1 through 5 in order.
 *
 * Every value is preceded by a 100 ms suspension that stands in for asynchronous work.
 */
fun numberFlow(): Flow<Int> = (1..5).asFlow().onEach { delay(100) }

/**
 * Infinite flow that emits an increasing counter starting at 0.
 *
 * The flow never completes on its own; it ends only when the collector stops
 * (for example via `take`) or the collecting coroutine is cancelled.
 *
 * @param intervalMs suspension, in milliseconds, applied after each emission.
 */
fun tickerFlow(intervalMs: Long): Flow<Long> = flow {
    // The seed is emitted first; the generator never returns null, so the sequence is endless.
    generateSequence(0L) { previous -> previous + 1 }.forEach { tick ->
        emit(tick)
        delay(intervalMs)
    }
}

/** Sample record used by the flow examples: an id, a display name and a numeric score. */
data class User(
    val id: Int,
    val name: String,
    val score: Int,
)

/**
 * Database-style flow: emits a fixed, in-memory set of five sample users in declaration order.
 */
fun userFlow(): Flow<User> = listOf(
    User(id = 1, name = "Alice", score = 95),
    User(id = 2, name = "Bob", score = 72),
    User(id = 3, name = "Charlie", score = 88),
    User(id = 4, name = "Diana", score = 91),
    User(id = 5, name = "Eve", score = 65),
).asFlow()

/**
 * Runs each Flow demonstration in order and prints the results to standard output.
 *
 * The return type is declared as `Unit` explicitly: with an expression body the type would
 * otherwise be inferred from the last expression inside `runBlocking`, and a non-`Unit`
 * result would stop `main` from being a valid program entry point.
 */
fun main(): Unit = runBlocking {
    // Basic collect
    numberFlow().collect { println("Received: $it") }

    // Operators: keep users scoring at least 80, then format each one as text
    println("\n--- Operators ---")
    userFlow()
        .filter { it.score >= 80 }
        .map { "${it.name}: ${it.score}" }
        .collect { println(it) }

    // Transform: unlike map, transform may emit several values per input element
    println("\n--- Transform ---")
    userFlow()
        .transform { user ->
            emit("Processing ${user.name}...")
            if (user.score >= 80) emit("✓ ${user.name} passed")
            else emit("✗ ${user.name} failed")
        }
        .collect { println(it) }

    // Reduce / fold
    val total = userFlow().map { it.score }.reduce { a, b -> a + b }
    println("\nTotal score: $total")

    // fold carries a (sum, count) pair so the average needs only one pass over the flow
    val avg = userFlow()
        .map { it.score }
        .fold(Pair(0, 0)) { (sum, count), score -> Pair(sum + score, count + 1) }
        .let { (sum, count) -> sum.toDouble() / count }
    println("Average: $avg")

    // take / drop
    println("\n--- Take/Drop ---")
    numberFlow().take(3).collect { print("$it ") } // 1 2 3
    println()
    numberFlow().drop(3).collect { print("$it ") } // 4 5
    println()

    // Combine flows: zip pairs elements by position
    println("\n--- Combine ---")
    val names = flowOf("Alice", "Bob", "Charlie")
    val ages = flowOf(30, 25, 35)
    names.zip(ages) { name, age -> "$name ($age)" }
        .collect { println(it) }

    // Error handling
    println("\n--- Error Handling ---")
    flow {
        emit(1)
        emit(2)
        throw RuntimeException("Oops!")
    }
        .catch { e ->
            // Report the upstream failure instead of discarding it, then recover with a fallback value.
            println("Recovered from: ${e.message}")
            emit(-1)
        }
        .onCompletion { cause ->
            // catch sits upstream and has already handled the exception, so cause is null here.
            println(if (cause == null) "Done" else "Failed: $cause")
        }
        .collect { println("Value: $it") }

    // flowOn — change the dispatcher of the upstream flow only
    println("\n--- Dispatchers ---")
    flow {
        println("Emitting on: ${Thread.currentThread().name}")
        emit(42)
    }
        .flowOn(Dispatchers.IO)
        .collect {
            println("Collecting on: ${Thread.currentThread().name}")
        }

    // Buffer (ticker example): no debounce operator is involved here
    println("\n--- Buffered ---")
    tickerFlow(100)
        .take(5)
        .buffer()  // don't slow down emitter
        .collect {
            println("Tick: $it")
            delay(200) // slow consumer
        }
}

Use Cases

  • Streaming data processing pipelines
  • Real-time event handling
  • Reactive UI state management

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.