Scala · Advanced

Streaming with fs2

Build composable streaming pipelines with fs2: chunks, transformations, concurrency, and resource safety.

Code (Scala 3, fs2 with cats-effect):
import fs2.{Stream, Pipe, Chunk}
import cats.effect.{IO, IOApp}
import scala.concurrent.duration.*

/** Tour of core fs2 features driven by cats-effect `IO`.
  *
  * Every `val`/`def` below is only a stream description; nothing executes
  * until `run` compiles and drains it. `run` prints the output of each demo,
  * including the concurrent (`merge`) and time-based (`groupWithin`) ones.
  */
object StreamingApp extends IOApp.Simple:

  // Basic streams

  /** The integers 1 to 10 (`range` excludes its upper bound). */
  val numbers: Stream[IO, Int] = Stream.range(1, 11)

  /** A single-element stream. */
  val hello: Stream[IO, String] = Stream.emit("Hello, fs2!")

  /** An unbounded stream of "tick", truncated to 5 elements by `take`. */
  val repeated: Stream[IO, String] = Stream.constant("tick").take(5)

  // Stream from effects

  /** Emits the result of a single effect. */
  val effectful: Stream[IO, String] = Stream.eval(IO.pure("computed"))

  /** Emits the current real time in milliseconds, read each time the stream runs. */
  val timed: Stream[IO, Long] = Stream.eval(IO.realTime.map(_.toMillis))

  // Transformations

  /** Squares of the even values in `numbers`, rendered as "val=4", "val=16", ..., "val=100". */
  val pipeline: Stream[IO, String] = numbers
    .filter(_ % 2 == 0)
    .map(n => n * n)
    .map(n => s"val=$n")

  // Pipes (reusable transformations): a `Pipe[F, I, O]` is a `Stream[F, I] => Stream[F, O]`,
  // so these work for any effect type `F`.

  /** Doubles every element. */
  def doubler[F[_]]: Pipe[F, Int, Int] = _.map(_ * 2)

  /** Renders every element with `toString`. */
  def stringify[F[_]]: Pipe[F, Int, String] = _.map(_.toString)

  /** Keeps at most the first 10 elements of any stream. */
  def take10[F[_], A]: Pipe[F, A, A] = _.take(10)

  /** `numbers` doubled and then stringified: "2", "4", ..., "20". */
  val piped: Stream[IO, String] = numbers.through(doubler).through(stringify)

  // Chunked processing

  /** 1 to 100 regrouped into `Chunk`s of 10 elements each. */
  val chunked: Stream[IO, Chunk[Int]] = Stream
    .range(1, 101)
    .chunkN(10)

  // Merge streams (concurrent)

  /** Emits "A" every 100 ms, 5 times. */
  val stream1: Stream[IO, String] = Stream.awakeEvery[IO](100.millis).map(_ => "A").take(5)

  /** Emits "B" every 150 ms, 5 times. */
  val stream2: Stream[IO, String] = Stream.awakeEvery[IO](150.millis).map(_ => "B").take(5)

  /** Runs `stream1` and `stream2` concurrently and interleaves their outputs as they arrive. */
  val merged: Stream[IO, String] = stream1.merge(stream2)

  // Scan (running accumulator)

  /** Running totals of `numbers`; `scan` also emits the seed 0, so this has 11 elements. */
  val runningSum: Stream[IO, Int] = numbers.scan(0)(_ + _)

  // GroupWithin

  /** 1 to 20 emitted at one element per 50 ms and grouped into chunks of at most 5.
    * A chunk is emitted early when 200 ms elapse before it fills up.
    */
  val batched: Stream[IO, Chunk[Int]] = Stream.range(1, 21)
    .covary[IO]
    .metered(50.millis)
    .groupWithin(5, 200.millis)

  // Handle errors

  /** Emits 1, 2, 3 and then fails with "boom"; the trailing 4, 5 are never reached. */
  val withErrors: Stream[IO, Int] = Stream(1, 2, 3) ++ Stream.raiseError[IO](
    RuntimeException("boom")
  ) ++ Stream(4, 5)

  /** On failure, logs the message and continues with a single fallback -1, yielding 1, 2, 3, -1. */
  val recovered: Stream[IO, Int] = withErrors.handleErrorWith { e =>
    Stream.eval(IO.println(s"Error: ${e.getMessage}")) >> Stream(-1)
  }

  // Resource-safe stream

  /** Demonstrates `Stream.bracket`: the release action ("Closing ...") runs once the inner
    * stream completes, fails or is interrupted.
    *
    * NOTE: `filename` is only printed; no file is read. The three emitted lines are placeholders.
    */
  def lines(filename: String): Stream[IO, String] =
    Stream.bracket(
      IO.println(s"Opening $filename") >> IO.pure(filename)
    )(
      name => IO.println(s"Closing $name")
    ).flatMap { _ =>
      Stream("line1", "line2", "line3")
    }

  /** Runs every demo in sequence and prints its output. */
  def run: IO[Unit] = for
    _ <- IO.println("=== Basics ===")
    _ <- (hello ++ repeated ++ effectful).evalMap(s => IO.println(s)).compile.drain
    _ <- timed.evalMap(t => IO.println(s"Now (ms): $t")).compile.drain

    _ <- IO.println("\n=== Pipeline ===")
    _ <- pipeline.evalMap(s => IO.println(s)).compile.drain

    _ <- IO.println("\n=== Piped ===")
    _ <- piped.evalMap(s => IO.println(s)).compile.drain

    _ <- IO.println("\n=== Running Sum ===")
    sums <- runningSum.compile.toList
    _ <- IO.println(s"Sums: $sums")

    _ <- IO.println("\n=== Chunks ===")
    _ <- chunked.evalMap(c => IO.println(s"Chunk: ${c.toList}")).compile.drain

    _ <- IO.println("\n=== Recovered ===")
    r <- recovered.compile.toList
    _ <- IO.println(s"Recovered: $r")

    _ <- IO.println("\n=== Resource ===")
    _ <- lines("test.txt").evalMap(l => IO.println(s"  $l")).compile.drain

    // The timed demos below take roughly 1 s each because of their sleeps and tick intervals.
    _ <- IO.println("\n=== Merged ===")
    _ <- merged.evalMap(s => IO.println(s)).compile.drain

    _ <- IO.println("\n=== Batched ===")
    _ <- batched.evalMap(c => IO.println(s"Batch: ${c.toList}")).compile.drain
  yield ()

Use Cases

  • Streaming data processing
  • Real-time event pipelines
  • Resource-safe file processing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.