kotlin · intermediate
Flow — Reactive Streams with Coroutines
Build reactive pipelines with Kotlin Flow: emit, collect, transform, combine, and error handling.
kotlin
Press ⌘/Ctrl + Shift + C to copy
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Simple flow
/**
 * Builds a cold flow that emits the integers 1 through 5 in order.
 *
 * Each value is preceded by a 100 ms suspension that stands in for asynchronous work.
 *
 * @return a flow of the five integers 1..5
 */
fun numberFlow(): Flow<Int> = flow {
    (1..5).forEach { number ->
        delay(100) // simulate async work before each emission
        emit(number)
    }
}
// Infinite flow with cancellation support
/**
 * Builds a flow that emits an ever-increasing counter starting at 0.
 *
 * The first value is emitted immediately; the flow then suspends for [intervalMs]
 * after every emission. It never completes on its own, so the collector has to
 * bound it (for example with `take`) or cancel the collecting coroutine.
 *
 * @param intervalMs pause between consecutive emissions, in milliseconds
 * @return an unbounded flow of 0, 1, 2, ...
 */
fun tickerFlow(intervalMs: Long): Flow<Long> = flow {
    // Endless lazy counter: 0, 1, 2, ...
    generateSequence(0L) { previous -> previous + 1 }.forEach { tick ->
        emit(tick)
        delay(intervalMs)
    }
}
// Database-style flow
/**
 * Immutable sample record used by the flow demos.
 *
 * @property id numeric identifier of the user
 * @property name display name
 * @property score score value; the demos compare it against 80
 */
data class User(
    val id: Int,
    val name: String,
    val score: Int,
)
/**
 * Builds a cold flow over a fixed set of five sample [User] records.
 *
 * The records are emitted in declaration order (ids 1 through 5).
 *
 * @return a flow of the five sample users
 */
fun userFlow(): Flow<User> = listOf(
    User(id = 1, name = "Alice", score = 95),
    User(id = 2, name = "Bob", score = 72),
    User(id = 3, name = "Charlie", score = 88),
    User(id = 4, name = "Diana", score = 91),
    User(id = 5, name = "Eve", score = 65),
).asFlow()
/**
 * Runs every Flow demo in sequence inside one [runBlocking] scope and prints the results.
 *
 * Sections, in order: basic collection, filter/map, transform, reduce/fold,
 * take/drop, zip, error recovery with catch, dispatcher switching with flowOn,
 * and buffering against a slow collector.
 */
fun main() = runBlocking {
    // Basic collect: print each number as it arrives.
    numberFlow().collect { value -> println("Received: $value") }

    // Operators: keep users scoring at least 80, then format them for display.
    println("\n--- Operators ---")
    val highScorers = userFlow().filter { user -> user.score >= 80 }
    highScorers
        .map { user -> "${user.name}: ${user.score}" }
        .collect { line -> println(line) }

    // Transform: a single upstream user yields two downstream strings.
    println("\n--- Transform ---")
    userFlow()
        .transform { user ->
            emit("Processing ${user.name}...")
            val verdict = if (user.score >= 80) "✓ ${user.name} passed" else "✗ ${user.name} failed"
            emit(verdict)
        }
        .collect { line -> println(line) }

    // Reduce / fold: terminal operators that collapse the flow into one value.
    val scores = userFlow().map { user -> user.score }
    val total = scores.reduce { running, score -> running + score }
    println("\nTotal score: $total")
    // Track (sum, count) together so the average needs only one pass over the flow.
    val (scoreSum, scoreCount) = scores.fold(0 to 0) { (sum, count), score ->
        (sum + score) to (count + 1)
    }
    val avg = scoreSum.toDouble() / scoreCount
    println("Average: $avg")

    // take / drop
    println("\n--- Take/Drop ---")
    numberFlow().take(3).collect { n -> print("$n ") } // 1 2 3
    println()
    numberFlow().drop(3).collect { n -> print("$n ") } // 4 5
    println()

    // Combine flows: zip pairs the elements of both flows position by position.
    println("\n--- Combine ---")
    val names = flowOf("Alice", "Bob", "Charlie")
    val ages = flowOf(30, 25, 35)
    val labelled = names.zip(ages) { person, years -> "$person ($years)" }
    labelled.collect { line -> println(line) }

    // Error handling: the upstream throws after two values; catch emits -1 in its place.
    println("\n--- Error Handling ---")
    val failing = flow<Int> {
        emit(1)
        emit(2)
        throw RuntimeException("Oops!")
    }
    failing
        .catch { emit(-1) } // recover
        .onCompletion { cause ->
            val status = cause?.let { "Failed: $it" } ?: "Done"
            println(status)
        }
        .collect { value -> println("Value: $value") }

    // flowOn — change the dispatcher the upstream block runs on.
    println("\n--- Dispatchers ---")
    val answer = flow {
        println("Emitting on: ${Thread.currentThread().name}")
        emit(42)
    }.flowOn(Dispatchers.IO)
    answer.collect { _ ->
        println("Collecting on: ${Thread.currentThread().name}")
    }

    // Buffer (ticker example): the collector is slower than the emitter.
    println("\n--- Buffered ---")
    tickerFlow(100)
        .take(5)
        .buffer() // don't slow down emitter
        .collect { tick ->
            println("Tick: $tick")
            delay(200) // slow consumer
        }
}
Use Cases
- Streaming data processing pipelines
- Real-time event handling
- Reactive UI state management
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
kotlin · advanced
Flow Advanced Operators
Advanced flow operations: debounce, flatMapMerge, conflate, buffer, retry, and custom operators.
Best for: Search input debouncing
#kotlin #flow
kotlin · advanced
Kotlin Flow Operators and Transforms
Master Flow operators: map, filter, combine, zip, debounce, retry, buffer, and custom operators.
Best for: Reactive data streams with transformation
#kotlin #flow
kotlin · advanced
StateFlow and SharedFlow — Hot Streams
Manage state with hot flows: StateFlow for reactive state, SharedFlow for events, and MutableStateFlow.
Best for: Reactive UI state management
#kotlin #stateflow
kotlin · intermediate
Coroutines — launch, async, and Structured Concurrency
Write concurrent code with Kotlin coroutines: launch, async/await, structured concurrency, and dispatchers.
Best for: Parallel API calls in backend services
#kotlin #coroutines