Scala · Intermediate

Thread-Safe Concurrent Collections

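Use concurrent collections for thread-safe access from multiple threads: TrieMap for a lock-free map, ConcurrentLinkedQueue for a non-blocking queue, and LinkedBlockingQueue for bounded producer-consumer hand-off.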

scala
import scala.collection.concurrent.TrieMap
import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentLinkedDeque,
  LinkedBlockingQueue, ArrayBlockingQueue, Executors, CountDownLatch}
import scala.jdk.CollectionConverters.*

// TrieMap: lock-free concurrent hash-trie map
/** String-keyed registry backed by a concurrent `TrieMap`.
  *
  * @tparam V type of the registered values
  */
class Registry[V]:
  private val entries: TrieMap[String, V] = TrieMap.empty

  /** Stores `value` under `key`, replacing any value already registered there. */
  def register(key: String, value: V): Unit =
    entries.update(key, value)

  /** Returns the value registered under `key`, if any. */
  def lookup(key: String): Option[V] =
    entries.get(key)

  /** Unregisters `key` and returns the value it held, if any. */
  def remove(key: String): Option[V] =
    entries.remove(key)

  /** Returns the value under `key`, registering `default` first when the key is absent.
    *
    * `default` is by-name, so it is only evaluated when the key is missing.
    */
  def getOrRegister(key: String, default: => V): V =
    entries.getOrElseUpdate(key, default)

  /** Immutable copy of the current contents. */
  def all: Map[String, V] =
    Map.from(entries)

  /** Number of registered keys. */
  def size: Int =
    entries.size

// Rate limiter using concurrent queue
/** Sliding-window rate limiter: admits at most `maxRequests` calls within the
  * trailing `windowMs` milliseconds.
  *
  * @param maxRequests maximum number of admitted calls per window
  * @param windowMs    window length in milliseconds
  */
class RateLimiter(maxRequests: Int, windowMs: Long):
  private val timestamps = ConcurrentLinkedQueue[Long]()

  /** Tries to admit one request at the current wall-clock time.
    *
    * Evicting expired timestamps, checking the count and recording the new
    * timestamp run under a single lock. The queue alone only makes each
    * individual call thread-safe; without the lock, concurrent callers can all
    * observe `size < maxRequests` and exceed the limit, or one caller can poll
    * an unexpired entry after another caller removed the head it had peeked at.
    *
    * @return `true` if the request was admitted and recorded, `false` if the
    *         window already holds `maxRequests` entries
    */
  def tryAcquire(): Boolean =
    timestamps.synchronized {
      val now = System.currentTimeMillis()
      val cutoff = now - windowMs
      // Timestamps are appended in call order, so the oldest sit at the head.
      while !timestamps.isEmpty && timestamps.peek() < cutoff do
        timestamps.poll()
      if timestamps.size() < maxRequests then
        timestamps.offer(now)
        true
      else false
    }

// Producer-Consumer with blocking queue
/** Bounded, blocking message channel backed by a `LinkedBlockingQueue`.
  *
  * @param capacity maximum number of messages buffered at once
  * @tparam T message type
  */
class MessageBus[T](capacity: Int):
  private val buffer = new LinkedBlockingQueue[T](capacity)

  /** Enqueues `msg`, waiting for free space while the buffer is full. */
  def publish(msg: T): Unit =
    buffer.put(msg)

  /** Dequeues the oldest message, waiting while the buffer is empty. */
  def consume(): T =
    buffer.take()

  /** Non-blocking dequeue: `None` when nothing is buffered right now. */
  def tryConsume(): Option[T] =
    val head = buffer.poll() // null when the queue is empty
    Option(head)

  /** Number of messages currently buffered. */
  def pending: Int =
    buffer.size()

/** Demo entry point: exercises the registry, the rate limiter and the message
  * bus on a shared four-thread pool and prints the results.
  */
@main def run(): Unit =
  val pool = Executors.newFixedThreadPool(4)
  // Shut the pool down even if a step below throws; its worker threads would
  // otherwise keep the JVM alive.
  try
    // TrieMap demo: four workers register 25 distinct keys each.
    val registry = Registry[String]()
    val latch = CountDownLatch(4)

    for i <- 0 until 4 do
      // Typed as Runnable so `submit` resolves to the Runnable overload.
      val worker: Runnable = () =>
        // Count down in `finally` so a failing worker cannot leave `await` hanging.
        try
          for j <- 1 to 25 do
            registry.register(s"service-${i * 25 + j}", s"http://host-${i}:${8000 + j}")
        finally latch.countDown()
      pool.submit(worker)

    latch.await()
    println(s"Registry size: ${registry.size}")  // 100
    println(s"Lookup: ${registry.lookup("service-1")}")

    // GetOrRegister: "service-1" is already present, so the default block is not evaluated.
    val cached = registry.getOrRegister("service-1", {
      println("Won't compute"); "fallback"
    })
    println(s"Cached: $cached")

    // Rate limiter
    val limiter = RateLimiter(5, 1000)  // 5 requests per second
    val allowed = (1 to 10).count(_ => limiter.tryAcquire())
    println(s"Allowed: $allowed / 10 requests")

    // Message bus
    val bus = MessageBus[String](10)
    val busLatch = CountDownLatch(2)  // one count each for the producer and the consumer

    val producer: Runnable = () =>
      try
        for i <- 1 to 5 do bus.publish(s"msg-$i")
      finally busLatch.countDown()

    val consumer: Runnable = () =>
      try
        for _ <- 1 to 5 do
          val msg = bus.consume()
          println(s"  Consumed: $msg")
      finally busLatch.countDown()

    pool.submit(producer)
    pool.submit(consumer)

    busLatch.await()
    println(s"Pending: ${bus.pending}")
  finally pool.shutdown()

Use Cases

  • Thread-safe service registries
  • Rate limiting with concurrent queues
  • Producer-consumer patterns

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.