scala · intermediate
Thread-Safe Concurrent Collections
Use concurrent collections for thread-safe access: TrieMap, concurrent queues, and synchronized wrappers.
scala — Press ⌘/Ctrl + Shift + C to copy
import scala.collection.concurrent.TrieMap
import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentLinkedDeque,
LinkedBlockingQueue, ArrayBlockingQueue, Executors, CountDownLatch}
import scala.jdk.CollectionConverters.*
/** String-keyed registry backed by a concurrent `TrieMap`.
  *
  * Every operation delegates directly to the underlying map, so the registry
  * can be shared between threads without additional locking.
  *
  * @tparam V type of the registered values
  */
class Registry[V]:
  private val entries: TrieMap[String, V] = TrieMap.empty

  /** Associates `value` with `key`, replacing any previous value. */
  def register(key: String, value: V): Unit =
    entries.update(key, value)

  /** Returns the value registered under `key`, if there is one. */
  def lookup(key: String): Option[V] =
    entries.get(key)

  /** Unregisters `key` and returns the value that was stored under it, if any. */
  def remove(key: String): Option[V] =
    entries.remove(key)

  /** Returns the value stored under `key`, registering `default` first when the
    * key is absent. `default` is by-name and is only evaluated when needed.
    */
  def getOrRegister(key: String, default: => V): V =
    entries.getOrElseUpdate(key, default)

  /** Immutable copy of the current contents. */
  def all: Map[String, V] =
    Map.from(entries)

  /** Number of entries currently registered. */
  def size: Int =
    entries.size
end Registry
/** Sliding-window rate limiter built on a queue of acquisition timestamps.
  *
  * At most `maxRequests` calls to [[tryAcquire]] succeed within any window of
  * `windowMs` milliseconds, measured with `System.currentTimeMillis()`.
  *
  * @param maxRequests maximum number of successful acquisitions per window
  * @param windowMs    length of the sliding window in milliseconds
  */
class RateLimiter(maxRequests: Int, windowMs: Long):
  private val timestamps = ConcurrentLinkedQueue[Long]()

  /** Attempts to record one request.
    *
    * Evicting expired timestamps, checking the count and recording the new
    * timestamp form a check-then-act sequence. The queue being concurrent does
    * not make that sequence atomic, so it runs under a single lock; otherwise
    * several threads could all observe `size() < maxRequests` and together
    * exceed the limit.
    *
    * @return `true` if the request fits in the current window, `false` otherwise
    */
  def tryAcquire(): Boolean = timestamps.synchronized {
    val now = System.currentTimeMillis()
    val cutoff = now - windowMs
    // Drop timestamps that have fallen out of the window.
    while !timestamps.isEmpty && timestamps.peek() < cutoff do
      timestamps.poll()
    if timestamps.size() < maxRequests then
      timestamps.offer(now)
      true
    else false
  }
end RateLimiter
/** Bounded producer/consumer channel backed by a `LinkedBlockingQueue`.
  *
  * @param capacity maximum number of messages held at once
  * @tparam T message type
  */
class MessageBus[T](capacity: Int):
  private val buffer = new LinkedBlockingQueue[T](capacity)

  /** Enqueues `msg`, blocking while the bus is full. */
  def publish(msg: T): Unit =
    buffer.put(msg)

  /** Dequeues the next message, blocking while the bus is empty. */
  def consume(): T =
    buffer.take()

  /** Dequeues the next message without blocking; `None` when the bus is empty. */
  def tryConsume(): Option[T] =
    val head = buffer.poll() // null when nothing is queued
    Option(head)

  /** Number of messages currently waiting to be consumed. */
  def pending: Int =
    buffer.size()
end MessageBus
/** Demo entry point: exercises `Registry`, `RateLimiter` and `MessageBus`
  * using a four-thread pool and prints the results.
  *
  * The pool is shut down in a `finally` block, and every task counts its latch
  * down in a `finally` block, so a failing task can neither leave the main
  * thread blocked in `await()` nor leave the pool's threads running.
  */
@main def run(): Unit =
  val registry = Registry[String]()
  val pool = Executors.newFixedThreadPool(4)
  try
    // TrieMap demo: four workers register 25 distinct services each.
    val latch = CountDownLatch(4)
    for i <- 0 until 4 do
      pool.submit(new Runnable:
        def run(): Unit =
          try
            for j <- 1 to 25 do
              registry.register(s"service-${i * 25 + j}", s"http://host-${i}:${8000 + j}")
          finally latch.countDown()
      )
    latch.await()
    println(s"Registry size: ${registry.size}") // 100
    println(s"Lookup: ${registry.lookup("service-1")}")

    // GetOrRegister: the key already exists, so the default block is not evaluated.
    val cached = registry.getOrRegister("service-1", {
      println("Won't compute"); "fallback"
    })
    println(s"Cached: $cached")

    // Rate limiter: 5 requests per second, 10 back-to-back attempts.
    val limiter = RateLimiter(5, 1000)
    val allowed = (1 to 10).count(_ => limiter.tryAcquire())
    println(s"Allowed: $allowed / 10 requests")

    // Message bus: one producer and one consumer, each counted by the latch.
    val bus = MessageBus[String](10)
    val workersDone = CountDownLatch(2)
    // Producer
    pool.submit(new Runnable:
      def run(): Unit =
        try
          for i <- 1 to 5 do bus.publish(s"msg-$i")
        finally workersDone.countDown()
    )
    // Consumer
    pool.submit(new Runnable:
      def run(): Unit =
        try
          for _ <- 1 to 5 do
            val msg = bus.consume()
            println(s" Consumed: $msg")
        finally workersDone.countDown()
    )
    workersDone.await()
    println(s"Pending: ${bus.pending}")
  finally pool.shutdown()

Use Cases
- Thread-safe service registries
- Rate limiting with concurrent queues
- Producer-consumer patterns
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
scala · advanced
Concurrent Programming with Atomic Refs
Use atomic references and concurrent data structures for thread-safe state management.
Best for: Lock-free data structures
#scala #concurrent
scala · beginner
Collections Map Filter Fold Operations
Master Scala collections: map, flatMap, filter, fold, groupBy, partition, and zip operations.
Best for: Data transformation and aggregation
#scala #collections
scala · beginner
Map and HashMap Operations
Work with Scala Maps: create, update, merge, transform, and use default values.
Best for: Configuration management
#scala #map
scala · beginner
Set Operations and Algorithms
Perform set operations: union, intersection, difference, subsets, and practical set algorithms.
Best for: Deduplication and uniqueness
#scala #set