scala · advanced

Concurrent Programming with Atomic Refs

Use atomic references and concurrent data structures for thread-safe state management.

scala
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors}
import scala.jdk.CollectionConverters.*

/** A counter backed by a single `AtomicLong`, starting at zero.
  *
  * Every mutating operation returns the value the counter holds *after*
  * the update has been applied.
  */
class AtomicCounter:
  private val value = new AtomicLong(0L)

  /** Adds one and returns the updated total. */
  def increment(): Long = value.addAndGet(1L)

  /** Subtracts one and returns the updated total. */
  def decrement(): Long = value.addAndGet(-1L)

  /** Returns the current total without modifying it. */
  def get: Long = value.get()

  /** Adds `delta` (which may be negative) and returns the updated total. */
  def addAndGet(delta: Long): Long = value.addAndGet(delta)

// Lock-free stack
/** A stack whose contents live in an immutable `List` held by an
  * `AtomicReference`; the head of the list is the top of the stack.
  *
  * Updates are applied with `compareAndSet` and retried whenever another
  * writer replaced the list in between the read and the swap.
  *
  * @tparam A element type
  */
class ConcurrentStack[A]:
  private val top = new AtomicReference[List[A]](Nil)

  /** Pushes `item` onto the top of the stack, retrying until the swap succeeds. */
  def push(item: A): Unit =
    @scala.annotation.tailrec
    def attempt(): Unit =
      val snapshot = top.get()
      // A failed CAS means the list changed under us: re-read and try again.
      if !top.compareAndSet(snapshot, item :: snapshot) then attempt()
    attempt()

  /** Removes and returns the top element, or `None` when the stack is empty. */
  def pop(): Option[A] =
    @scala.annotation.tailrec
    def attempt(): Option[A] =
      top.get() match
        case Nil => None
        case snapshot @ (head :: rest) =>
          if top.compareAndSet(snapshot, rest) then Some(head)
          else attempt()
    attempt()

  /** Returns the current contents, top of the stack first. */
  def toList: List[A] = top.get()

// Thread-safe cache
/** A key/value cache that delegates all storage to a `ConcurrentHashMap`.
  *
  * @tparam K key type
  * @tparam V value type
  */
class Cache[K, V]:
  private val backing = new ConcurrentHashMap[K, V]()

  /** Looks up `key`; a missing entry (a `null` from the map) becomes `None`. */
  def get(key: K): Option[V] =
    val hit = backing.get(key)
    Option(hit)

  /** Stores `value` under `key`, replacing any existing entry. */
  def put(key: K, value: V): Unit =
    backing.put(key, value)
    ()

  /** Returns the value stored under `key`, evaluating `compute` to fill the
    * entry only when the map has no value for that key.
    *
    * Delegates to `ConcurrentHashMap.computeIfAbsent`.
    */
  def getOrCompute(key: K)(compute: => V): V =
    backing.computeIfAbsent(key, (_: K) => compute)

  /** Number of entries currently in the map. */
  def size: Int = backing.size()

  /** Copies the current entries into an immutable Scala `Map`. */
  def entries: Map[K, V] = Map.from(backing.asScala)

/** Demo entry point: exercises [[AtomicCounter]], [[ConcurrentStack]] and
  * [[Cache]] from a four-thread pool and prints the results.
  *
  * The pool is shut down in a `finally` block so that its worker threads
  * cannot keep the JVM alive if the main thread fails part-way through
  * (for example when `await()` is interrupted). Each task likewise counts
  * its latch down in a `finally`, so a failing task cannot leave the main
  * thread blocked in `await()` forever.
  */
@main def run(): Unit =
  val counter = AtomicCounter()
  val latch = CountDownLatch(10)
  val pool = Executors.newFixedThreadPool(4)

  try
    // Concurrent increments: 10 tasks, 1000 increments each
    for _ <- 1 to 10 do
      pool.submit(new Runnable:
        def run(): Unit =
          try
            for _ <- 1 to 1000 do counter.increment()
          finally
            latch.countDown()
      )

    latch.await()
    println(s"Counter: ${counter.get}")  // Always 10000

    // Concurrent stack: 4 tasks, each pushing its own range of 25 distinct values
    val stack = ConcurrentStack[Int]()
    val latch2 = CountDownLatch(4)

    for i <- 0 until 4 do
      pool.submit(new Runnable:
        def run(): Unit =
          try
            for j <- 1 to 25 do stack.push(i * 25 + j)
          finally
            latch2.countDown()
      )

    latch2.await()
    println(s"Stack size: ${stack.toList.size}")  // 100

    // Cache: the second call finds the entry, so its block is never evaluated
    val cache = Cache[String, Int]()
    val computed = cache.getOrCompute("fib-10") {
      println("Computing...")
      55
    }
    val cached = cache.getOrCompute("fib-10") {
      println("Won't run")
      0
    }
    println(s"Computed: $computed, Cached: $cached")
  finally
    // Executors' default thread factory creates non-daemon threads, so the
    // pool must be shut down on every exit path for the JVM to terminate.
    pool.shutdown()

Use Cases

  • Lock-free data structures
  • Thread-safe counters and caches
  • Concurrent state management

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.