Structured Concurrency Patterns
Apply structured concurrency: parallel decomposition, scoped tasks, cancellation, and resource cleanup.
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
// Parallel decomposition
/**
 * Assembles a profile for [userId] by fetching the user record, posts and
 * follower count concurrently.
 *
 * All three requests are started as children of one [coroutineScope], so the
 * function returns only after every child has finished, and a failure in any
 * one of them cancels the others.
 *
 * @param userId identifier passed to each of the three fetch functions.
 * @return a map with the keys `"user"`, `"posts"` and `"followers"`.
 */
suspend fun fetchUserProfile(userId: String): Map<String, Any> {
    return coroutineScope {
        // Start every request before awaiting any of them.
        val userDeferred = async { fetchUser(userId) }
        val postsDeferred = async { fetchPosts(userId) }
        val followersDeferred = async { fetchFollowerCount(userId) }

        val userData = userDeferred.await()
        val postList = postsDeferred.await()
        val followerCount = followersDeferred.await()

        mapOf(
            "user" to userData,
            "posts" to postList,
            "followers" to followerCount,
        )
    }
}
// Cancellation-aware work
/**
 * Processes every entry of [items] concurrently and returns the results in
 * the same order as the input.
 *
 * One child coroutine is started per item inside a [coroutineScope]. Each
 * child calls `ensureActive()` before doing any work, so a child that starts
 * after the scope has been cancelled stops immediately instead of processing
 * its item.
 *
 * @param items values to hand to `processItem`.
 * @return the processed values, one per input item.
 */
suspend fun processLargeDataset(items: List<String>): List<String> = coroutineScope {
    val pending: List<Deferred<String>> = items.map { element ->
        async {
            // Bail out early if the scope was cancelled before this child ran.
            ensureActive()
            processItem(element)
        }
    }
    pending.awaitAll()
}
// Scoped resource management
/**
 * Stand-in for a database connection used to demonstrate scoped cleanup.
 *
 * @property name label printed when the connection is closed.
 */
class DatabaseConnection(val name: String) {
    /** Returns a canned result string that echoes [sql]; no real query is run. */
    fun query(sql: String): String {
        return "Result for: $sql"
    }

    /** Prints a closing message containing [name] to standard output. */
    fun close() {
        println("Closing $name")
    }
}
/**
 * Runs [block] with a freshly created [DatabaseConnection] named `"MainDB"`
 * and closes the connection afterwards.
 *
 * The connection is closed in a `finally` clause, so it is released whether
 * [block] returns normally, throws, or is cancelled.
 *
 * The function is generic in the block's result so callers can return a value
 * computed from the connection; blocks that return `Unit` work unchanged.
 *
 * @param block suspending work to perform with the open connection.
 * @return whatever [block] returns.
 */
suspend fun <T> withDatabase(block: suspend (DatabaseConnection) -> T): T {
    val db = DatabaseConnection("MainDB")
    return try {
        block(db)
    } finally {
        db.close()
    }
}
// Timeout pattern
/**
 * Attempts a simulated network fetch with a 1000 ms time limit and falls back
 * to a cached value when the limit is exceeded.
 *
 * The simulated fetch takes 2000 ms, so in this demo the fallback is always
 * the value returned.
 *
 * `withTimeoutOrNull` is used instead of catching `TimeoutCancellationException`
 * around `withTimeout`: a catch clause would also intercept a timeout raised
 * by an enclosing `withTimeout` in the caller and wrongly turn that outer
 * cancellation into a fallback result. `withTimeoutOrNull` only maps its own
 * timeout to `null`.
 *
 * @return `"Network result"` if the fetch finishes in time, otherwise
 *   `"Cached fallback result"`.
 */
suspend fun fetchWithFallback(): String =
    withTimeoutOrNull(1000) {
        delay(2000) // simulate slow network
        "Network result"
    } ?: "Cached fallback result"
// Rate limiting with semaphore
/**
 * Fetches every URL in [urls] concurrently while allowing at most
 * [maxConcurrency] fetches to be in flight at the same time.
 *
 * One child coroutine is started per URL; each must obtain a semaphore permit
 * before it runs its (simulated, 100 ms) fetch. `withPermit` releases the
 * permit even if the fetch throws or is cancelled.
 *
 * @param urls addresses to fetch; an empty list yields an empty result.
 * @param maxConcurrency maximum number of simultaneous fetches; defaults to 3.
 * @return one response string per URL, in the same order as [urls].
 * @throws IllegalArgumentException if [maxConcurrency] is not positive.
 */
suspend fun rateLimitedFetch(urls: List<String>, maxConcurrency: Int = 3): List<String> {
    require(maxConcurrency > 0) { "maxConcurrency must be positive, was $maxConcurrency" }
    return coroutineScope {
        val permits = Semaphore(maxConcurrency)
        urls.map { url ->
            async {
                permits.withPermit {
                    delay(100) // simulate fetch
                    "Response from $url"
                }
            }
        }.awaitAll()
    }
}
// Simulated functions
/**
 * Simulated user lookup: suspends for 100 ms, then returns a fixed record.
 *
 * @param id value stored under the `"id"` key of the result.
 * @return a map with `"id"` set to [id] and `"name"` set to `"Alice"`.
 */
suspend fun fetchUser(id: String): Map<String, String> {
    val simulatedLatencyMs = 100L
    delay(simulatedLatencyMs)
    return mapOf(
        "id" to id,
        "name" to "Alice",
    )
}
/**
 * Simulated post lookup: suspends for 150 ms, then returns two fixed titles.
 *
 * @param userId accepted for API shape only; the simulated result ignores it.
 * @return the list `["Post 1", "Post 2"]`.
 */
suspend fun fetchPosts(userId: String): List<String> {
    val simulatedLatencyMs = 150L
    delay(simulatedLatencyMs)
    return listOf("Post 1", "Post 2")
}
/**
 * Simulated follower-count lookup: suspends for 80 ms, then returns a constant.
 *
 * @param userId accepted for API shape only; the simulated result ignores it.
 * @return always `1234`.
 */
suspend fun fetchFollowerCount(userId: String): Int {
    val simulatedLatencyMs = 80L
    delay(simulatedLatencyMs)
    return 1234
}
/**
 * Simulated per-item work: suspends for 50 ms, then upper-cases the input.
 *
 * @param item text to transform.
 * @return [item] converted to upper case.
 */
suspend fun processItem(item: String): String {
    val simulatedWorkMs = 50L
    delay(simulatedWorkMs)
    return item.uppercase()
}
/**
 * Demo entry point: runs each pattern in turn inside `runBlocking` and prints
 * the outcome of every step.
 */
fun main(): Unit = runBlocking {
    // Parallel decomposition: three fetches combined into one profile map.
    val userProfile = fetchUserProfile("user-1")
    println("Profile: $userProfile")

    // Cancellation-aware processing, run in a child job that is joined
    // (the job is never cancelled here; it simply runs to completion).
    val worker = launch {
        val processed = processLargeDataset(listOf("a", "b", "c", "d", "e"))
        println("Processed: $processed")
    }
    worker.join()

    // Scoped resource: the connection is closed once the block finishes.
    withDatabase { connection ->
        println(connection.query("SELECT * FROM users"))
    }

    // Timeout with fallback.
    val fetchOutcome = fetchWithFallback()
    println("Fetch: $fetchOutcome")

    // Rate-limited fetch of ten URLs numbered 1 through 10.
    val endpoints = List(10) { index -> "https://api.example.com/${index + 1}" }
    val fetched = rateLimitedFetch(endpoints)
    println("Fetched ${fetched.size} responses")
}
Use Cases
- Parallel API calls with structured lifecycle
- Timeout and fallback patterns
- Rate-limited concurrent operations
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Structured Concurrency Patterns
Master coroutine structured concurrency: coroutineScope, async/await, fan-out/fan-in, and parallel map.
Best for: Parallel I/O operations with concurrency limits
Coroutines — launch, async, and Structured Concurrency
Write concurrent code with Kotlin coroutines: launch, async/await, structured concurrency, and dispatchers.
Best for: Parallel API calls in backend services
Flow — Reactive Streams with Coroutines
Build reactive pipelines with Kotlin Flow: emit, collect, transform, combine, and error handling.
Best for: Streaming data processing pipelines
Channels — Producer-Consumer with Coroutines
Communicate between coroutines with Channels: produce, actor pattern, fan-out/fan-in, and select.
Best for: Producer-consumer patterns in coroutines