scala · advanced

ZIO Fibers and Concurrency

Use ZIO fibers for lightweight concurrency: fork, join, race, parallel operations, and supervision.

scala
import zio.*
import zio.Console.*

/** Tour of ZIO 2 concurrency primitives: fork/join, racing, parallel
  * collection operators, scoped fibers, timeouts, retries, queues and
  * semaphores.
  *
  * Every simulated delay uses ZIO's non-blocking `delay`/`sleep` rather than
  * `Thread.sleep`, so no executor thread is parked and every delayed effect
  * stays interruptible.
  */
object ZIOFibers extends ZIOAppDefault:

  /** Forks a delayed computation onto its own fiber, does other work on the
    * current fiber, then joins the forked fiber to obtain its result.
    */
  val forked: ZIO[Any, Nothing, String] = for
    // `delay` suspends the fiber semantically; a `Thread.sleep` here would
    // block an executor thread for the whole 100 ms.
    fiber  <- ZIO.succeed("computed").delay(100.millis).fork
    _      <- printLine("Doing other work...").orDie
    result <- fiber.join
  yield result

  /** Simulated slow lookup: yields "db-result" after 200 ms. */
  def fetchFromDB: ZIO[Any, Nothing, String] =
    ZIO.succeed("db-result").delay(200.millis)

  /** Simulated fast lookup: yields "cache-result" after 50 ms. */
  def fetchFromCache: ZIO[Any, Nothing, String] =
    ZIO.succeed("cache-result").delay(50.millis)

  /** Runs both lookups concurrently and keeps the first one to succeed. */
  val raced = fetchFromCache.race(fetchFromDB)

  /** Simulated user lookup: yields `User-<id>` after 100 ms.
    *
    * @param id identifier embedded in the returned string
    */
  def fetchUser(id: Int): ZIO[Any, Nothing, String] =
    ZIO.succeed(s"User-$id").delay(100.millis)

  /** Fetches users 1 to 5 in parallel and collects all results. */
  val parallel = ZIO.collectAllPar(
    (1 to 5).map(fetchUser)
  )

  /** Runs two lookups in parallel and pairs their results. */
  val zipped = fetchUser(1).zipPar(fetchUser(2))

  /** Fetches users 1 to 20 in parallel with at most 4 running concurrently. */
  val limited = ZIO.foreachPar(1 to 20) { i =>
    fetchUser(i)
  }.withParallelism(4)  // max 4 concurrent

  /** Starts a never-ending fiber inside a scope, then interrupts it.
    *
    * `forkScoped` ties the fiber's lifetime to the enclosing `ZIO.scoped`
    * region, so the fiber is also cleaned up if the region ends before the
    * explicit `interrupt` is reached. A plain `fork` would leave the scope
    * with nothing to manage.
    */
  val supervised = ZIO.scoped {
    for
      fiber <- ZIO.never.forkScoped  // long-running fiber owned by the scope
      _     <- printLine("Supervised fiber running").orDie
      _     <- ZIO.sleep(100.millis)
      _     <- fiber.interrupt  // explicitly interrupt
      _     <- printLine("Fiber interrupted").orDie
    yield ()
  }

  /** A 5-second computation cut off after 1 second; yields `None` on timeout.
    *
    * The delay is expressed with `delay` so the timeout can interrupt it
    * promptly. A blocking `Thread.sleep` wrapped in `ZIO.succeed` cannot be
    * interrupted mid-sleep, which would make the timeout wait the full 5 s.
    */
  val withTimeout = ZIO.succeed("slow result").delay(5.seconds).timeout(1.second)

  // Attempt counter shared by every execution of `flaky`.
  // NOTE(review): plain `var`, not thread-safe; adequate here because retries
  // of a single effect run one after another. Prefer `Ref` if `flaky` is ever
  // run from several fibers at once.
  var attempts = 0

  /** Fails with a `RuntimeException` on the first two executions and succeeds
    * from the third onwards, based on the shared `attempts` counter.
    */
  val flaky = ZIO.attempt {
    attempts += 1
    if attempts < 3 then throw RuntimeException(s"Attempt $attempts failed")
    else s"Success on attempt $attempts"
  }

  /** Retries `flaky` up to 5 times with exponential backoff starting at 100 ms. */
  val retried = flaky.retry(
    Schedule.recurs(5) && Schedule.exponential(100.millis)
  )

  /** Producer/consumer hand-off of five items through a bounded queue.
    *
    * Both sides run on their own fibers; the effect completes once the
    * producer has offered all five items and the consumer has taken five.
    */
  val queueDemo = for
    queue <- Queue.bounded[String](10)
    // Producer
    producer <- ZIO.foreach(1 to 5) { i =>
      queue.offer(s"item-$i") *> printLine(s"  Produced: item-$i").orDie
    }.fork
    // Consumer: `take` suspends the fiber until an item is available
    consumer <- ZIO.foreach(1 to 5) { _ =>
      queue.take.flatMap(item => printLine(s"  Consumed: $item").orDie)
    }.fork
    _ <- producer.join
    _ <- consumer.join
  yield ()

  /** Runs five tasks in parallel while a 2-permit semaphore lets at most two
    * of them execute their body at the same time.
    */
  val semaphoreDemo = for
    sem <- Semaphore.make(2)  // max 2 concurrent
    _   <- ZIO.foreachPar(1 to 5) { i =>
      sem.withPermit {
        printLine(s"  Task $i running").orDie *>
          ZIO.sleep(100.millis) *>
          printLine(s"  Task $i done").orDie
      }
    }
  yield ()

  /** Entry point: runs the fork/join, race, parallel, queue and semaphore
    * demos in order, printing a header before each one.
    */
  def run = for
    _ <- printLine("=== Fork/Join ===")
    r <- forked
    _ <- printLine(s"Result: $r")

    _ <- printLine("\n=== Race ===")
    r <- raced
    _ <- printLine(s"Winner: $r")

    _ <- printLine("\n=== Parallel ===")
    r <- parallel
    _ <- printLine(s"All: $r")

    _ <- printLine("\n=== Queue ===")
    _ <- queueDemo

    _ <- printLine("\n=== Semaphore ===")
    _ <- semaphoreDemo
  yield ()

Use Cases

  • Lightweight concurrent task execution
  • Parallel data fetching
  • Concurrency control with semaphores

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.