scalaadvanced
ZIO Fibers and Concurrency
Use ZIO fibers for lightweight concurrency: fork, join, race, parallel operations, and supervision.
scalaPress ⌘/Ctrl + Shift + C to copy
import zio.*
import zio.Console.*
object ZIOFibers extends ZIOAppDefault:
// Basic fiber: fork and join
val forked: ZIO[Any, Nothing, String] = for
fiber <- ZIO.succeed {
Thread.sleep(100)
"computed"
}.fork
_ <- printLine("Doing other work...").orDie
result <- fiber.join
yield result
// Race: first to complete wins
def fetchFromDB: ZIO[Any, Nothing, String] =
ZIO.succeed("db-result").delay(200.millis)
def fetchFromCache: ZIO[Any, Nothing, String] =
ZIO.succeed("cache-result").delay(50.millis)
val raced = fetchFromCache.race(fetchFromDB)
// Parallel operations
def fetchUser(id: Int): ZIO[Any, Nothing, String] =
ZIO.succeed(s"User-$id").delay(100.millis)
val parallel = ZIO.collectAllPar(
(1 to 5).map(fetchUser)
)
// zipPar: run two effects in parallel
val zipped = fetchUser(1).zipPar(fetchUser(2))
// foreachPar with parallelism limit
val limited = ZIO.foreachPar(1 to 20) { i =>
fetchUser(i)
}.withParallelism(4) // max 4 concurrent
// Fiber supervision
val supervised = ZIO.scoped {
for
fiber <- ZIO.never.fork // long-running fiber
_ <- printLine("Supervised fiber running").orDie
_ <- ZIO.sleep(100.millis)
_ <- fiber.interrupt // explicitly interrupt
_ <- printLine("Fiber interrupted").orDie
yield ()
}
// Timeout
val withTimeout = ZIO.succeed {
Thread.sleep(5000)
"slow result"
}.timeout(1.second)
// Retry with schedule
var attempts = 0
val flaky = ZIO.attempt {
attempts += 1
if attempts < 3 then throw RuntimeException(s"Attempt $attempts failed")
else s"Success on attempt $attempts"
}
val retried = flaky.retry(
Schedule.recurs(5) && Schedule.exponential(100.millis)
)
// Concurrent queue
val queueDemo = for
queue <- Queue.bounded[String](10)
// Producer
producer <- ZIO.foreach(1 to 5) { i =>
queue.offer(s"item-$i") *> printLine(s" Produced: item-$i").orDie
}.fork
// Consumer
consumer <- ZIO.foreach(1 to 5) { _ =>
queue.take.flatMap(item => printLine(s" Consumed: $item").orDie)
}.fork
_ <- producer.join
_ <- consumer.join
yield ()
// Semaphore for limiting concurrency
val semaphoreDemo = for
sem <- Semaphore.make(2) // max 2 concurrent
_ <- ZIO.foreachPar(1 to 5) { i =>
sem.withPermit {
printLine(s" Task $i running").orDie *>
ZIO.sleep(100.millis) *>
printLine(s" Task $i done").orDie
}
}
yield ()
def run = for
_ <- printLine("=== Fork/Join ===")
r <- forked
_ <- printLine(s"Result: $r")
_ <- printLine("\n=== Race ===")
r <- raced
_ <- printLine(s"Winner: $r")
_ <- printLine("\n=== Parallel ===")
r <- parallel
_ <- printLine(s"All: $r")
_ <- printLine("\n=== Queue ===")
_ <- queueDemo
_ <- printLine("\n=== Semaphore ===")
_ <- semaphoreDemo
yield ()Use Cases
- Lightweight concurrent task execution
- Parallel data fetching
- Concurrency control with semaphores
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
scalaintermediate
Futures and Async Programming
Use Scala Futures for async operations: map, flatMap, recover, sequence, and race patterns.
Best for: Asynchronous API calls
#scala#futures
scalaintermediate
Parallel Processing with Collections
Process data in parallel using parallel collections, Future.traverse, and batched execution.
Best for: Speeding up CPU-bound operations
#scala#parallel
scalaadvanced
Akka Actor System Basics
Build concurrent applications with Akka actors: message passing, behavior switching, and supervision.
Best for: Concurrent message-passing systems
#scala#akka
scalaadvanced
ZIO Effect System Basics
Build programs with ZIO: effects, error handling, layers, and concurrent operations.
Best for: Typed error handling with ZIO
#scala#zio