kotlin · advanced

Structured Concurrency Patterns

Apply structured concurrency: parallel decomposition, scoped tasks, cancellation, and resource cleanup.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Parallel decomposition
/**
 * Builds a profile map for [userId] by starting three lookups concurrently.
 *
 * [fetchUser], [fetchPosts] and [fetchFollowerCount] are each started with `async`
 * inside a single `coroutineScope`, and the map is assembled once all three results
 * have been awaited.
 *
 * @param userId identifier handed to each of the three lookups.
 * @return a map with the keys `"user"`, `"posts"` and `"followers"`.
 */
suspend fun fetchUserProfile(userId: String): Map<String, Any> = coroutineScope {
    // Start every lookup before awaiting any of them so they can overlap.
    val userDeferred = async { fetchUser(userId) }
    val postsDeferred = async { fetchPosts(userId) }
    val followersDeferred = async { fetchFollowerCount(userId) }

    val userData = userDeferred.await()
    val postList = postsDeferred.await()
    val followerTotal = followersDeferred.await()

    mapOf(
        "user" to userData,
        "posts" to postList,
        "followers" to followerTotal
    )
}

// Cancellation-aware work
/**
 * Runs [processItem] on every element of [items], using one `async` child per
 * element, and collects the results.
 *
 * @param items the values to process.
 * @return the processed values, in the same order as [items].
 */
suspend fun processLargeDataset(items: List<String>): List<String> = coroutineScope {
    val pending = items.map { element ->
        async {
            // Bail out before doing any work if this coroutine is no longer active.
            ensureActive()
            processItem(element)
        }
    }
    pending.awaitAll()
}

// Scoped resource management
/**
 * Minimal stand-in for a database connection, used to demonstrate scoped cleanup.
 *
 * @property name label included in the message printed by [close].
 */
class DatabaseConnection(val name: String) {
    /** Returns a canned result string that echoes [sql]; no database is contacted. */
    fun query(sql: String): String {
        return "Result for: $sql"
    }

    /** Prints a line announcing that this connection is being closed. */
    fun close() {
        println("Closing $name")
    }
}

/**
 * Opens a [DatabaseConnection] named `"MainDB"`, runs [block] with it, and closes the
 * connection afterwards.
 *
 * The connection is closed in a `finally` block, so it is released whether [block]
 * returns normally, throws, or is cancelled.
 *
 * @param T the type of value produced by [block]; `Unit` for blocks that only perform
 *   side effects, which keeps existing call sites working unchanged.
 * @param block the work to perform with the open connection.
 * @return whatever [block] returns.
 */
suspend fun <T> withDatabase(block: suspend (DatabaseConnection) -> T): T {
    val db = DatabaseConnection("MainDB")
    return try {
        block(db)
    } finally {
        db.close()
    }
}

// Timeout pattern
/**
 * Simulates a network fetch that is given one second to finish and falls back to a
 * cached value when it does not.
 *
 * The simulated call takes two seconds, so the timeout elapses first and the fallback
 * value is returned.
 *
 * `withTimeoutOrNull` is used instead of catching `TimeoutCancellationException`:
 * it turns only this call's own timeout into `null`. A timeout raised by an enclosing
 * `withTimeout` in the caller still propagates instead of being mistaken for a slow
 * fetch and swallowed.
 *
 * @return `"Network result"` if the block finishes in time, otherwise
 *   `"Cached fallback result"`.
 */
suspend fun fetchWithFallback(): String {
    val networkResult = withTimeoutOrNull(1000) {
        delay(2000) // simulate slow network
        "Network result"
    }
    return networkResult ?: "Cached fallback result"
}

// Rate limiting with semaphore
/**
 * Simulates fetching every URL in [urls] concurrently while capping how many fetches
 * are in flight at once.
 *
 * @param urls the addresses to fetch.
 * @param maxConcurrent the largest number of fetches allowed to run at the same time;
 *   defaults to 3.
 * @return one response string per URL, in the same order as [urls].
 * @throws IllegalArgumentException if [maxConcurrent] is not positive.
 */
suspend fun rateLimitedFetch(urls: List<String>, maxConcurrent: Int = 3): List<String> = coroutineScope {
    require(maxConcurrent > 0) { "maxConcurrent must be positive, was $maxConcurrent" }

    val semaphore = Semaphore(maxConcurrent)
    urls.map { url ->
        async {
            // withPermit acquires a permit before the block and releases it afterwards,
            // including when the block throws or is cancelled.
            semaphore.withPermit {
                delay(100) // simulate fetch
                "Response from $url"
            }
        }
    }.awaitAll()
}

// Simulated functions
/** Simulated user lookup: suspends for 100 ms, then returns a fixed record carrying [id]. */
suspend fun fetchUser(id: String): Map<String, String> {
    delay(100)
    val record = mapOf("id" to id, "name" to "Alice")
    return record
}
/** Simulated post lookup: suspends for 150 ms, then returns two fixed titles; [userId] is not used. */
suspend fun fetchPosts(userId: String): List<String> {
    delay(150)
    val titles = listOf("Post 1", "Post 2")
    return titles
}
/** Simulated follower count: suspends for 80 ms, then returns a fixed number; [userId] is not used. */
suspend fun fetchFollowerCount(userId: String): Int {
    delay(80)
    val followerTotal = 1234
    return followerTotal
}
/** Simulated unit of work: suspends for 50 ms, then returns [item] in upper case. */
suspend fun processItem(item: String): String {
    delay(50)
    val upperCased = item.uppercase()
    return upperCased
}

/**
 * Demo entry point: runs each pattern in turn inside `runBlocking` and prints its result.
 */
fun main(): Unit = runBlocking {
    // Parallel decomposition
    val userProfile = fetchUserProfile("user-1")
    println("Profile: $userProfile")

    // Cancellation: run the batch in a child coroutine and wait for it to finish
    launch {
        val processed = processLargeDataset(listOf("a", "b", "c", "d", "e"))
        println("Processed: $processed")
    }.join()

    // Scoped resource
    withDatabase { connection ->
        println(connection.query("SELECT * FROM users"))
    }

    // Timeout with fallback
    val fetched = fetchWithFallback()
    println("Fetch: $fetched")

    // Rate limited
    val endpoints = List(10) { index -> "https://api.example.com/${index + 1}" }
    val replies = rateLimitedFetch(endpoints)
    println("Fetched ${replies.size} responses")
}

Use Cases

  • Parallel API calls with structured lifecycle
  • Timeout and fallback patterns
  • Rate-limited concurrent operations

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.