kotlin · advanced

Flow Advanced Operators

Advanced flow operations: buffer, conflate, debounce, flatMapConcat/flatMapMerge, retry with backoff, scan, transform, zip, and a custom chunked operator.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
 * Walks through a series of Flow operators, printing the values each one produces.
 *
 * Sections, in order: buffer, conflate, debounce, flatMapConcat vs flatMapMerge,
 * retry with exponential backoff, scan, transform, zip, and a locally defined
 * `chunked` operator. Every section is independent of the others.
 *
 * The explicit `Unit` return type keeps this a valid program entry point even if
 * the last expression inside `runBlocking` is later changed to something non-Unit.
 */
fun main(): Unit = runBlocking {
    // Buffer — producer and collector run concurrently
    println("--- Buffer ---")
    val unbuffered = measureTimeMillis {
        flow {
            repeat(3) { emit(it); delay(100) }
        }.collect {
            delay(200) // slow consumer
            println("Collected $it")
        }
    }
    println("Without buffer: ${unbuffered}ms")

    // Same producer and consumer as above; the only difference is the buffer() call.
    val buffered = measureTimeMillis {
        flow {
            repeat(3) { emit(it); delay(100) }
        }.buffer().collect {
            delay(200) // slow consumer
            println("Buffered $it")
        }
    }
    println("With buffer: ${buffered}ms")

    // Conflate — drop intermediate values
    // The producer emits every 50 ms while the collector takes 200 ms per value.
    println("\n--- Conflate ---")
    flow {
        repeat(10) {
            emit(it)
            delay(50)
        }
    }.conflate().collect {
        println("Conflated: $it")
        delay(200)
    }

    // Debounce — wait for pause
    // With a 200 ms window, only values followed by a longer pause are marked "settled".
    println("\n--- Debounce ---")
    flow {
        emit("a"); delay(100)
        emit("ab"); delay(100)
        emit("abc"); delay(300) // settled
        emit("x"); delay(300)   // settled
    }.debounce(200).collect { println("Debounced: $it") }

    // FlatMapConcat vs FlatMapMerge
    println("\n--- FlatMap ---")

    // Simulated request: emits a start marker, waits 100 ms, then emits an end marker.
    fun requestFlow(id: Int) = flow {
        emit("$id: Start")
        delay(100)
        emit("$id: End")
    }

    // Sequential
    (1..3).asFlow()
        .flatMapConcat { requestFlow(it) }
        .collect { println("Concat: $it") }

    // Concurrent
    (1..3).asFlow()
        .flatMapMerge(concurrency = 3) { requestFlow(it) }
        .collect { println("Merge: $it") }

    // Retry with exponential backoff
    println("\n--- Retry ---")
    // Counts executions of the flow body; it is incremented before the failure check,
    // so it is always >= 1 by the time the retry predicate reads it.
    var attempt = 0
    flow {
        attempt++
        println("Attempt $attempt")
        if (attempt < 3) throw RuntimeException("Failed")
        emit("Success on attempt $attempt")
    }.retry(3) { cause ->
        println("Retrying due to: ${cause.message}")
        // Exponential backoff: 100 ms after the 1st failure, 200 ms after the 2nd,
        // 400 ms after the 3rd — the wait doubles with each failed attempt.
        delay(100L shl (attempt - 1))
        true
    }.catch { emit("Final fallback") }
        .collect { println(it) }

    // Scan — running accumulation
    println("\n--- Scan ---")
    (1..5).asFlow()
        .scan(0) { acc, value -> acc + value }
        .collect { println("Running sum: $it") }

    // Transform (general-purpose)
    // Each upstream value produces two downstream emissions here.
    println("\n--- Transform ---")
    (1..5).asFlow()
        .transform { value ->
            emit("Processing $value...")
            delay(50)
            emit("Result: ${value * value}")
        }
        .collect { println(it) }

    // Zip — combine two flows element-wise
    println("\n--- Zip ---")
    val names = flowOf("Alice", "Bob", "Charlie")
    val ages = flowOf(30, 25, 35)
    names.zip(ages) { name, age -> "$name ($age)" }
        .collect { println(it) }

    // Custom operator
    println("\n--- Custom chunked ---")

    // Groups upstream values into lists of `size` elements; a final, shorter list
    // is emitted if the upstream completes with leftover values.
    // Throws IllegalArgumentException when `size` is not positive: with size <= 0
    // the "chunk is full" check below could never be true, so the whole flow would
    // be silently buffered and emitted as one list.
    fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
        require(size > 0) { "size must be positive, but was $size" }
        return flow {
            val chunk = mutableListOf<T>()
            collect { value ->
                chunk.add(value)
                if (chunk.size == size) {
                    // Emit a copy, because the working list is cleared and reused.
                    emit(chunk.toList())
                    chunk.clear()
                }
            }
            if (chunk.isNotEmpty()) emit(chunk.toList())
        }
    }

    (1..7).asFlow()
        .chunked(3)
        .collect { println("Chunk: $it") }
}

/**
 * Runs [block] once and returns how long it took, in whole milliseconds.
 *
 * Elapsed time is taken from [System.nanoTime], a monotonic source, so the result
 * is never negative and is not skewed if the wall clock is adjusted while [block]
 * is running.
 *
 * The function is `inline`, so [block] is inlined at the call site; this is what
 * allows callers inside a coroutine to invoke suspending functions within it.
 *
 * @param block the code to time
 * @return elapsed time in milliseconds, truncated toward zero
 */
private inline fun measureTimeMillis(block: () -> Unit): Long {
    val startNanos = System.nanoTime()
    block()
    return (System.nanoTime() - startNanos) / 1_000_000
}

Use Cases

  • Search input debouncing
  • Concurrent API request batching
  • Backpressure handling in data pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.