Kotlin · Advanced

Coroutine Mutex and Shared Mutable State

Safely manage shared mutable state across coroutines: Mutex, atomic operations, and single-thread confinement (the idea behind actor-based state).

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

/**
 * Demonstrates one unsafe and several safe ways to update shared mutable
 * state from many coroutines.
 *
 * Each counter section launches 100 coroutines that perform 1000 increments
 * each, waits for all of them, and prints the final value:
 *  - an unsynchronized `var` (lost updates are likely),
 *  - a `var` guarded by a [Mutex],
 *  - an [AtomicInteger],
 *  - a `var` that is only touched from a dedicated single-thread dispatcher.
 *
 * The final section wraps a mutable map behind a [Mutex] and fills it from
 * 100 concurrent coroutines.
 */
fun main() = runBlocking {
    // Problem: shared mutable state without synchronization.
    // `unsafeCounter++` is a read-modify-write, so increments issued from
    // coroutines running in parallel on Dispatchers.Default can be lost.
    println("--- Unsafe counter ---")
    var unsafeCounter = 0
    val jobs1 = (1..100).map {
        launch(Dispatchers.Default) {
            repeat(1000) { unsafeCounter++ }
        }
    }
    jobs1.joinAll()
    println("Unsafe: $unsafeCounter (expected 100000)") // likely wrong

    // Solution 1: Mutex
    // withLock suspends (rather than blocks) until the lock is free and
    // always releases it, so every increment runs exclusively.
    println("\n--- Mutex counter ---")
    var mutexCounter = 0
    val mutex = Mutex()
    val jobs2 = (1..100).map {
        launch(Dispatchers.Default) {
            repeat(1000) {
                mutex.withLock { mutexCounter++ }
            }
        }
    }
    jobs2.joinAll()
    println("Mutex: $mutexCounter") // exactly 100000

    // Solution 2: AtomicInteger
    // incrementAndGet() is a single atomic operation; no lock is required.
    println("\n--- Atomic counter ---")
    val atomicCounter = AtomicInteger(0)
    val jobs3 = (1..100).map {
        launch(Dispatchers.Default) {
            repeat(1000) { atomicCounter.incrementAndGet() }
        }
    }
    jobs3.joinAll()
    println("Atomic: ${atomicCounter.get()}") // exactly 100000

    // Solution 3: Confinement to single thread
    // Every increment is dispatched to the same dedicated thread, so the
    // counter is never modified concurrently.
    println("\n--- Single thread ---")
    var confinedCounter = 0
    // `use` closes the dispatcher (releasing its thread) even when a join
    // fails or this coroutine is cancelled, instead of relying on a trailing
    // close() call that would be skipped on an exception.
    newSingleThreadContext("counter").use { singleThread ->
        val jobs4 = (1..100).map {
            launch {
                repeat(1000) {
                    withContext(singleThread) { confinedCounter++ }
                }
            }
        }
        jobs4.joinAll()
        println("Confined: $confinedCounter") // exactly 100000
    }

    // Mutex-protected data structure
    println("\n--- Thread-safe map ---")

    /**
     * A minimal coroutine-safe map: every operation runs while holding a
     * private [Mutex], so callers never observe the backing map mid-update.
     */
    class SafeMap<K, V> {
        private val map = mutableMapOf<K, V>()
        private val mutex = Mutex()

        /** Associates [value] with [key], replacing any previous value. */
        suspend fun put(key: K, value: V) = mutex.withLock { map[key] = value }

        /** Returns the value stored for [key], or `null` if there is none. */
        suspend fun get(key: K): V? = mutex.withLock { map[key] }

        /**
         * Returns the value for [key], first storing the result of
         * [defaultValue] if the key is absent. [defaultValue] is invoked
         * while the lock is held, so it should be cheap.
         */
        suspend fun getOrPut(key: K, defaultValue: () -> V): V = mutex.withLock {
            map.getOrPut(key, defaultValue)
        }

        /** Returns the current number of entries. */
        suspend fun size(): Int = mutex.withLock { map.size }

        /** Returns a read-only copy of the entries taken under the lock. */
        suspend fun snapshot(): Map<K, V> = mutex.withLock { map.toMap() }
    }

    val safeMap = SafeMap<String, Int>()
    val mapJobs = (1..100).map { i ->
        launch(Dispatchers.Default) {
            safeMap.put("key-$i", i)
        }
    }
    mapJobs.joinAll()
    println("Map size: ${safeMap.size()}") // 100
}

Use Cases

  • Thread-safe counters and accumulators
  • Synchronized access to shared resources
  • Concurrent data structure management

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.