kotlinadvanced

Structured Concurrency Patterns

Master coroutine structured concurrency: coroutineScope, async/await, fan-out/fan-in, and parallel map.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.measureTimeMillis

// Parallel map
suspend fun <T, R> List<T>.parallelMap(
    concurrency: Int = this.size,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = kotlinx.coroutines.sync.Semaphore(concurrency)
    map { item ->
        async {
            semaphore.acquire()
            try {
                transform(item)
            } finally {
                semaphore.release()
            }
        }
    }.awaitAll()
}

// Parallel filter
suspend fun <T> List<T>.parallelFilter(
    concurrency: Int = this.size,
    predicate: suspend (T) -> Boolean
): List<T> = coroutineScope {
    val semaphore = kotlinx.coroutines.sync.Semaphore(concurrency)
    map { item ->
        async {
            semaphore.acquire()
            try {
                item to predicate(item)
            } finally {
                semaphore.release()
            }
        }
    }.awaitAll()
        .filter { it.second }
        .map { it.first }
}

// Fan-out / Fan-in pattern
suspend fun fanOutFanIn() = coroutineScope {
    val inputChannel = Channel<Int>(Channel.BUFFERED)
    val outputChannel = Channel<String>(Channel.BUFFERED)

    // Producer
    launch {
        repeat(20) {
            inputChannel.send(it)
        }
        inputChannel.close()
    }

    // Fan-out: multiple workers
    val workers = (1..3).map { workerId ->
        launch {
            for (item in inputChannel) {
                delay(50) // simulate work
                outputChannel.send("Worker-$workerId processed $item")
            }
        }
    }

    // Close output when all workers done
    launch {
        workers.forEach { it.join() }
        outputChannel.close()
    }

    // Fan-in: collect results
    val results = mutableListOf<String>()
    for (result in outputChannel) {
        results.add(result)
    }
    results
}

// Racing — first successful wins
suspend fun <T> raceSuccess(vararg tasks: suspend () -> T): T = coroutineScope {
    val deferred = tasks.map { task ->
        async {
            task()
        }
    }
    select {
        deferred.forEach { d ->
            d.onAwait { it }
        }
    }.also {
        deferred.forEach { it.cancel() } // cancel losers
    }
}

// Retry with backoff
suspend fun <T> retryWithBackoff(
    maxRetries: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 5000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    repeat(maxRetries - 1) { attempt ->
        try {
            return block()
        } catch (e: Exception) {
            println("Attempt ${attempt + 1} failed: ${e.message}")
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
    }
    return block() // last attempt, let exception propagate
}

fun main() = runBlocking {
    // Parallel map
    println("--- Parallel Map ---")
    val time1 = measureTimeMillis {
        val results = (1..10).toList().parallelMap(concurrency = 5) { n ->
            delay(100) // simulate I/O
            n * n
        }
        println("Results: $results")
    }
    println("Time: ${time1}ms (sequential would be ~1000ms)")

    // Parallel filter
    println("\n--- Parallel Filter ---")
    val filtered = (1..20).toList().parallelFilter(concurrency = 5) { n ->
        delay(50)
        n % 3 == 0
    }
    println("Divisible by 3: $filtered")

    // Fan-out/Fan-in
    println("\n--- Fan-Out/Fan-In ---")
    val fanResults = fanOutFanIn()
    println("Processed ${fanResults.size} items")
    fanResults.take(5).forEach { println("  $it") }

    // Race
    println("\n--- Race ---")
    val winner = raceSuccess(
        { delay(300); "Slow" },
        { delay(100); "Fast" },
        { delay(200); "Medium" }
    )
    println("Winner: $winner")

    // Retry
    println("\n--- Retry ---")
    var attempts = 0
    val result = retryWithBackoff(maxRetries = 3) {
        attempts++
        if (attempts < 3) throw RuntimeException("Not ready")
        "Success on attempt $attempts"
    }
    println(result)

    // Structured scope — all children must complete
    println("\n--- Structured ---")
    val time2 = measureTimeMillis {
        coroutineScope {
            launch { delay(100); println("  Task A done") }
            launch { delay(200); println("  Task B done") }
            launch { delay(150); println("  Task C done") }
        }
        println("  All tasks completed")
    }
    println("Total time: ${time2}ms")
}

Use Cases

  • Parallel I/O operations with concurrency limits
  • Worker pool patterns
  • Resilient service calls with retry

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.