kotlin · advanced
Flow Advanced Operators
Advanced flow operations: debounce, flatMapMerge, conflate, buffer, retry, and custom operators.
kotlin
Press ⌘/Ctrl + Shift + C to copy
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/**
 * Runs a series of small Flow operator demos and prints each result to stdout.
 *
 * Sections, in order: buffer, conflate, debounce, flatMapConcat / flatMapMerge,
 * retry with backoff, scan, transform, zip, and a hand-written `chunked` operator.
 */
fun main() = runBlocking {
    // Buffer — producer and collector run concurrently
    println("--- Buffer ---")
    val unbuffered = measureTimeMillis {
        flow {
            repeat(3) { emit(it); delay(100) }
        }.collect {
            delay(200) // slow consumer
            println("Collected $it")
        }
    }
    println("Without buffer: ${unbuffered}ms")

    // Same producer and consumer as above, with buffer() inserted between them.
    val buffered = measureTimeMillis {
        flow {
            repeat(3) { emit(it); delay(100) }
        }.buffer().collect {
            delay(200)
            println("Buffered $it")
        }
    }
    println("With buffer: ${buffered}ms")

    // Conflate — drop intermediate values
    println("\n--- Conflate ---")
    flow {
        repeat(10) {
            emit(it)
            delay(50)
        }
    }.conflate().collect {
        println("Conflated: $it")
        delay(200) // collector is slower than the 50 ms emission interval
    }

    // Debounce — wait for pause
    println("\n--- Debounce ---")
    flow {
        emit("a"); delay(100)
        emit("ab"); delay(100)
        emit("abc"); delay(300) // settled
        emit("x"); delay(300) // settled
    }.debounce(200).collect { println("Debounced: $it") }

    // FlatMapConcat vs FlatMapMerge
    println("\n--- FlatMap ---")

    // Simulated request: emits a start marker, waits 100 ms, then emits an end marker.
    fun requestFlow(id: Int) = flow {
        emit("$id: Start")
        delay(100)
        emit("$id: End")
    }

    // Sequential
    (1..3).asFlow()
        .flatMapConcat { requestFlow(it) }
        .collect { println("Concat: $it") }

    // Concurrent
    (1..3).asFlow()
        .flatMapMerge(concurrency = 3) { requestFlow(it) }
        .collect { println("Merge: $it") }

    // Retry with exponential backoff
    println("\n--- Retry ---")
    val baseBackoffMs = 100L
    var attempt = 0 // counts how many times the flow body has started
    flow {
        attempt++
        println("Attempt $attempt")
        if (attempt < 3) throw RuntimeException("Failed")
        emit("Success on attempt $attempt")
    }.retry(3) { cause ->
        println("Retrying due to: ${cause.message}")
        // Double the wait after every failed attempt: 100 ms, 200 ms, 400 ms, ...
        delay(baseBackoffMs shl (attempt - 1))
        true
    }.catch { emit("Final fallback") }
        .collect { println(it) }

    // Scan — running accumulation
    println("\n--- Scan ---")
    (1..5).asFlow()
        .scan(0) { acc, value -> acc + value }
        .collect { println("Running sum: $it") }

    // Transform (general-purpose)
    println("\n--- Transform ---")
    (1..5).asFlow()
        .transform { value ->
            emit("Processing $value...")
            delay(50)
            emit("Result: ${value * value}")
        }
        .collect { println(it) }

    // Zip — combine two flows element-wise
    println("\n--- Zip ---")
    val names = flowOf("Alice", "Bob", "Charlie")
    val ages = flowOf(30, 25, 35)
    names.zip(ages) { name, age -> "$name ($age)" }
        .collect { println(it) }

    // Custom operator
    println("\n--- Custom chunked ---")

    /**
     * Groups upstream values into lists of [size] elements; the last list may be
     * shorter when the upstream ends mid-chunk.
     *
     * @throws IllegalArgumentException if [size] is not positive.
     */
    fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
        // A non-positive size could never satisfy the "chunk is full" check below.
        require(size > 0) { "size must be positive, was $size" }
        return flow {
            val chunk = mutableListOf<T>()
            collect { value ->
                chunk.add(value)
                if (chunk.size == size) {
                    emit(chunk.toList()) // emit a copy, since the buffer is reused
                    chunk.clear()
                }
            }
            // Flush whatever is left once the upstream completes.
            if (chunk.isNotEmpty()) emit(chunk.toList())
        }
    }

    (1..7).asFlow()
        .chunked(3)
        .collect { println("Chunk: $it") }
}
/**
 * Runs [block] once and returns how long it took, in whole milliseconds.
 *
 * Elapsed time is read from [System.nanoTime], which is intended for measuring
 * intervals, rather than from the wall clock.
 *
 * @param block the code to time.
 * @return elapsed time in milliseconds, truncated toward zero.
 */
private inline fun measureTimeMillis(block: () -> Unit): Long {
    val startNanos = System.nanoTime()
    block()
    return (System.nanoTime() - startNanos) / 1_000_000
}

// Use Cases
- Search input debouncing
- Concurrent API request batching
- Backpressure handling in data pipelines
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
kotlin · intermediate
Flow — Reactive Streams with Coroutines
Build reactive pipelines with Kotlin Flow: emit, collect, transform, combine, and error handling.
Best for: Streaming data processing pipelines
#kotlin #flow
kotlin · advanced
Kotlin Flow Operators and Transforms
Master Flow operators: map, filter, combine, zip, debounce, retry, buffer, and custom operators.
Best for: Reactive data streams with transformation
#kotlin #flow
kotlin · advanced
StateFlow and SharedFlow — Hot Streams
Manage state with hot flows: StateFlow for reactive state, SharedFlow for events, and MutableStateFlow.
Best for: Reactive UI state management
#kotlin #stateflow
kotlin · intermediate
Coroutines — launch, async, and Structured Concurrency
Write concurrent code with Kotlin coroutines: launch, async/await, structured concurrency, and dispatchers.
Best for: Parallel API calls in backend services
#kotlin #coroutines