kotlin · advanced

Kotlin Flow Operators and Transforms

Master the core Flow operators: filter, map, take, transform, combine, zip, flatMapConcat, catch, onCompletion, retry, distinctUntilChanged, and scan.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
 * Walks through a series of independent Flow operator demonstrations and
 * prints the values each pipeline emits.
 *
 * The return type is spelled out as `Unit` so that `main` remains a valid
 * program entry point even if the last expression inside `runBlocking` is
 * later changed to something that does not evaluate to `Unit`.
 */
fun main(): Unit = runBlocking {
    // Basic chain: keep the even numbers, square them, stop after the first three.
    val numbers = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    numbers
        .filter { it % 2 == 0 }
        .map { it * it }
        .take(3)
        .collect { println("Square: $it") } // 4, 16, 36

    // Flow produced by the `flow { }` builder. fetchPages() itself is a regular
    // (non-suspend) function; the suspending delay/emit calls live inside the builder.
    fun fetchPages(): Flow<String> = flow {
        for (page in 1..5) {
            delay(100)
            emit("Page $page data")
        }
    }

    // transform: emit multiple values per upstream element.
    // Two lines are emitted per page, so take(6) stops after the third page.
    fetchPages()
        .transform {
            emit("Loading: $it")
            emit("Loaded: $it")
        }
        .take(6)
        .collect { println(it) }

    // combine: pairs the latest value of each flow whenever either one emits.
    // The sources tick at different rates (100 ms vs 150 ms), so values repeat.
    val names = flowOf("Alice", "Bob", "Carol").onEach { delay(100) }
    val scores = flowOf(95, 87, 92).onEach { delay(150) }
    names.combine(scores) { name, score -> "$name: $score" }
        .collect { println("Combined: $it") }

    // zip: pair elements 1:1 (first with first, second with second, ...).
    names.zip(scores) { name, score -> "$name scored $score" }
        .collect { println("Zipped: $it") }

    // flatMapConcat: inner flows are collected one after another, in order.
    // NOTE(review): flatMapConcat carries an opt-in annotation in kotlinx.coroutines;
    // confirm whether this build needs an @OptIn to compile without warnings.
    flowOf(1, 2, 3)
        .flatMapConcat { id ->
            flow {
                emit("$id-a")
                delay(50)
                emit("$id-b")
            }
        }
        .collect { println("FlatMap: $it") }

    // Error handling: catch replaces the upstream failure with a fallback value
    // (the exception itself is not inspected here). Because catch handles the
    // error, onCompletion downstream of it receives a null cause.
    flow {
        emit(1)
        emit(2)
        throw RuntimeException("Oops")
    }
        .catch { emit(-1) }
        .onCompletion { if (it == null) println("Completed normally") }
        .collect { println("Value: $it") }

    // retry: the flow body is re-run on failure. `attempt` lives outside the
    // builder so the count survives across re-collections; the first two
    // attempts throw and the third one emits.
    var attempt = 0
    flow {
        attempt++
        if (attempt < 3) throw RuntimeException("Attempt $attempt")
        emit("Success on attempt $attempt")
    }
        .retry(3) {
            println("Retrying: ${it.message}")
            delay(100)
            true // always retry, up to the limit of 3
        }
        .collect { println(it) }

    // distinctUntilChanged: drops consecutive duplicates only; a value may
    // reappear later once a different value has been seen in between.
    flowOf(1, 1, 2, 2, 3, 1, 1)
        .distinctUntilChanged()
        .collect { print("$it ") } // 1 2 3 1
    println()

    // scan (running accumulation): emits the initial value first, then every
    // intermediate sum.
    flowOf(1, 2, 3, 4, 5)
        .scan(0) { acc, value -> acc + value }
        .collect { print("$it ") } // 0 1 3 6 10 15
    println()
}

Use Cases

  • Reactive data streams with transformation
  • Combining multiple async data sources
  • Error recovery and retry in data pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.