scala · advanced

Effect Composition Patterns

Compose effects with traverse, sequence, parTraverse, and resilient error-handling patterns (attempt, redeemWith).

scala
import cats.syntax.all.*
import cats.effect.IO
import cats.{Applicative, Traverse}
import scala.concurrent.duration.*

/** Simulates a remote user lookup.
  *
  * The returned effect waits 50 milliseconds and then yields a label built
  * from the given id.
  *
  * @param id numeric identifier embedded in the resulting label
  * @return an effect producing `User-<id>` once the delay has elapsed
  */
def fetchUser(id: Int): IO[String] =
  IO.sleep(50.millis).as(s"User-$id")

/** Simulates loading the orders that belong to a user.
  *
  * The returned effect waits 50 milliseconds and then yields two order
  * labels, each prefixed with the given user id.
  *
  * @param userId identifier used as the prefix of every order label
  * @return an effect producing `<userId>-order1` and `<userId>-order2`
  */
def fetchOrders(userId: String): IO[List[String]] =
  val orders = List(s"$userId-order1", s"$userId-order2")
  IO.sleep(50.millis).as(orders)

/** Checks whether a string looks like an e-mail address.
  *
  * The only rule applied is that the input contains an `@` character.
  *
  * @param email candidate address
  * @return an already-computed effect holding `Right(email)` when the check
  *         passes, or `Left` with an `Invalid: ...` message otherwise
  */
def validateEmail(email: String): IO[Either[String, String]] =
  val outcome: Either[String, String] =
    Either.cond(email.contains("@"), email, s"Invalid: $email")
  IO.pure(outcome)

/** Runnable tour of cats / cats-effect combinators for composing `IO` values.
  *
  * Each member only describes an effect; `run` chains a subset of them
  * (`fetchAllPar`, `combined`, `allOrders`, `resilient`, `redeemed`,
  * `logIfDebug`) and prints their results. The remaining members are
  * standalone examples that `run` does not use.
  */
object EffectComposition extends cats.effect.IOApp.Simple:

  // Shared input for the collection-based examples below.
  val userIds: List[Int] = List(1, 2, 3, 4, 5)

  // traverse: List[A] => (A => F[B]) => F[List[B]]
  val fetchAll: IO[List[String]] =
    userIds.traverse(id => fetchUser(id))

  // parTraverse: same shape as traverse, but parallel
  val fetchAllPar: IO[List[String]] =
    userIds.parTraverse(id => fetchUser(id))

  // sequence: List[F[A]] => F[List[A]]
  val effects: List[IO[String]] =
    for id <- userIds yield fetchUser(id)
  val sequenced: IO[List[String]] =
    effects.sequence

  // parSequence: the parallel counterpart of sequence
  val parSequenced: IO[List[String]] =
    effects.parSequence

  // Applicative composition: the effects do not depend on each other,
  // so they can be combined with the parallel tuple combinators.
  val combined: IO[(String, String, String)] =
    val first = fetchUser(1)
    val second = fetchUser(2)
    val third = fetchUser(3)
    (first, second, third).parTupled

  val mapped: IO[String] =
    val left = fetchUser(1)
    val right = fetchUser(2)
    (left, right).parMapN((a, b) => s"$a and $b")

  // flatTraverse: traverse where each step yields a list, flattened at the end
  val allOrders: IO[List[String]] =
    userIds.flatTraverse { id =>
      for
        user <- fetchUser(id)
        orders <- fetchOrders(user)
      yield orders
    }

  // Conditional effects: pick one of two effects based on an effectful Boolean
  val conditional: IO[Option[String]] =
    val fetchEnabled = true
    IO.pure(fetchEnabled).ifM(
      fetchUser(1).map(user => Some(user)),
      IO.pure(Option.empty[String])
    )

  // whenA / unlessA: run a unit effect only when the flag allows it
  val logIfDebug: IO[Unit] =
    val debugEnabled = true
    IO.println("Debug info").whenA(debugEnabled)

  // Error handling on a list of effects: the middle one always fails
  val riskyOps: List[IO[Int]] =
    List(
      IO.pure(1),
      IO.raiseError[Int](new RuntimeException("Boom!")),
      IO.pure(3)
    )

  // Collect successes, ignore failures: attempt turns each failure into a
  // Left so the traversal keeps going, then only the Right values are kept.
  val resilient: IO[List[Int]] =
    riskyOps
      .traverse(op => op.attempt)
      .map(outcomes => outcomes.flatMap(_.toOption))

  // redeemWith: handle both success and failure with effectful handlers
  val redeemed: IO[String] =
    fetchUser(1).redeemWith(
      failure => IO.pure(s"Failed: ${failure.getMessage}"),
      name => IO.pure(s"Got: $name")
    )

  /** Entry point: runs the selected examples in order, printing each result. */
  def run: IO[Unit] =
    fetchAllPar.flatMap(fetched => IO.println(s"Users: $fetched")) *>
      combined.flatMap(triple => IO.println(s"Combined: $triple")) *>
      allOrders.flatMap(found => IO.println(s"All orders: $found")) *>
      resilient.flatMap(survivors => IO.println(s"Resilient: $survivors")) *>
      redeemed.flatMap(message => IO.println(s"Redeemed: $message")) *>
      logIfDebug

Use Cases

  • Parallel API calls
  • Batch processing with effects
  • Resilient error handling patterns

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.