Structured Concurrency Patterns
Master coroutine structured concurrency: coroutineScope, async/await, fan-out/fan-in, and parallel map.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.measureTimeMillis
// Parallel map
suspend fun <T, R> List<T>.parallelMap(
concurrency: Int = this.size,
transform: suspend (T) -> R
): List<R> = coroutineScope {
val semaphore = kotlinx.coroutines.sync.Semaphore(concurrency)
map { item ->
async {
semaphore.acquire()
try {
transform(item)
} finally {
semaphore.release()
}
}
}.awaitAll()
}
// Parallel filter
suspend fun <T> List<T>.parallelFilter(
concurrency: Int = this.size,
predicate: suspend (T) -> Boolean
): List<T> = coroutineScope {
val semaphore = kotlinx.coroutines.sync.Semaphore(concurrency)
map { item ->
async {
semaphore.acquire()
try {
item to predicate(item)
} finally {
semaphore.release()
}
}
}.awaitAll()
.filter { it.second }
.map { it.first }
}
// Fan-out / Fan-in pattern
suspend fun fanOutFanIn() = coroutineScope {
val inputChannel = Channel<Int>(Channel.BUFFERED)
val outputChannel = Channel<String>(Channel.BUFFERED)
// Producer
launch {
repeat(20) {
inputChannel.send(it)
}
inputChannel.close()
}
// Fan-out: multiple workers
val workers = (1..3).map { workerId ->
launch {
for (item in inputChannel) {
delay(50) // simulate work
outputChannel.send("Worker-$workerId processed $item")
}
}
}
// Close output when all workers done
launch {
workers.forEach { it.join() }
outputChannel.close()
}
// Fan-in: collect results
val results = mutableListOf<String>()
for (result in outputChannel) {
results.add(result)
}
results
}
// Racing — first successful wins
suspend fun <T> raceSuccess(vararg tasks: suspend () -> T): T = coroutineScope {
val deferred = tasks.map { task ->
async {
task()
}
}
select {
deferred.forEach { d ->
d.onAwait { it }
}
}.also {
deferred.forEach { it.cancel() } // cancel losers
}
}
// Retry with backoff
suspend fun <T> retryWithBackoff(
maxRetries: Int = 3,
initialDelay: Long = 100,
maxDelay: Long = 5000,
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelay
repeat(maxRetries - 1) { attempt ->
try {
return block()
} catch (e: Exception) {
println("Attempt ${attempt + 1} failed: ${e.message}")
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // last attempt, let exception propagate
}
fun main() = runBlocking {
// Parallel map
println("--- Parallel Map ---")
val time1 = measureTimeMillis {
val results = (1..10).toList().parallelMap(concurrency = 5) { n ->
delay(100) // simulate I/O
n * n
}
println("Results: $results")
}
println("Time: ${time1}ms (sequential would be ~1000ms)")
// Parallel filter
println("\n--- Parallel Filter ---")
val filtered = (1..20).toList().parallelFilter(concurrency = 5) { n ->
delay(50)
n % 3 == 0
}
println("Divisible by 3: $filtered")
// Fan-out/Fan-in
println("\n--- Fan-Out/Fan-In ---")
val fanResults = fanOutFanIn()
println("Processed ${fanResults.size} items")
fanResults.take(5).forEach { println(" $it") }
// Race
println("\n--- Race ---")
val winner = raceSuccess(
{ delay(300); "Slow" },
{ delay(100); "Fast" },
{ delay(200); "Medium" }
)
println("Winner: $winner")
// Retry
println("\n--- Retry ---")
var attempts = 0
val result = retryWithBackoff(maxRetries = 3) {
attempts++
if (attempts < 3) throw RuntimeException("Not ready")
"Success on attempt $attempts"
}
println(result)
// Structured scope — all children must complete
println("\n--- Structured ---")
val time2 = measureTimeMillis {
coroutineScope {
launch { delay(100); println(" Task A done") }
launch { delay(200); println(" Task B done") }
launch { delay(150); println(" Task C done") }
}
println(" All tasks completed")
}
println("Total time: ${time2}ms")
}Use Cases
- Parallel I/O operations with concurrency limits
- Worker pool patterns
- Resilient service calls with retry
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Coroutines — launch, async, and Structured Concurrency
Write concurrent code with Kotlin coroutines: launch, async/await, structured concurrency, and dispatchers.
Best for: Parallel API calls in backend services
Channels — Producer-Consumer with Coroutines
Communicate between coroutines with Channels: produce, actor pattern, fan-out/fan-in, and select.
Best for: Producer-consumer patterns in coroutines
Coroutine Channels Producer Consumer
Use Kotlin channels for coroutine communication: buffered, conflated, fan-out, and fan-in patterns.
Best for: Coroutine-based producer-consumer patterns
Coroutine Mutex and Synchronization
Synchronize shared mutable state in coroutines with Mutex, atomic operations, and thread confinement.
Best for: Thread-safe shared state in coroutines