Scala · Advanced
Streaming with fs2
Build composable streaming pipelines with fs2: chunks, transformations, concurrency, and resource safety.
Scala — press ⌘/Ctrl + Shift + C to copy
import fs2.{Stream, Pipe, Chunk}
import cats.effect.{IO, IOApp}
import scala.concurrent.duration.*
/** Runnable tour of core fs2 features on top of cats-effect `IO`.
  *
  * Every member below is only a stream *description*; nothing executes until
  * [[run]] compiles it. `run` exercises `pipeline`, `piped`, `runningSum`,
  * `chunked`, `recovered` and `lines`; the remaining members are definitions
  * shown for reference and are never evaluated by this program.
  */
object StreamingApp extends IOApp.Simple:

  // ---------------------------------------------------------------------------
  // Basic streams
  // ---------------------------------------------------------------------------

  /** The integers 1 to 10 (the second argument of `range` is exclusive). */
  val numbers: Stream[IO, Int] = Stream.range(1, 11).covary[IO]

  /** A single-element stream. */
  val hello: Stream[IO, String] = Stream.emit("Hello, fs2!").covary[IO]

  /** An infinite stream of "tick", truncated to its first five elements. */
  val repeated: Stream[IO, String] = Stream.constant("tick").covary[IO].take(5)

  // ---------------------------------------------------------------------------
  // Streams built from effects
  // ---------------------------------------------------------------------------

  /** Lifts one `IO` value into a one-element stream. */
  val effectful: Stream[IO, String] = Stream.eval(IO.pure("computed"))

  /** Emits the wall-clock time, in milliseconds, read when the stream runs. */
  val timed: Stream[IO, Long] = Stream.eval(IO.realTime).map(_.toMillis)

  // ---------------------------------------------------------------------------
  // Transformations
  // ---------------------------------------------------------------------------

  /** Keeps the even numbers, squares them, and renders each as `val=<n>`. */
  val pipeline: Stream[IO, String] =
    numbers
      .collect { case n if n % 2 == 0 => n * n }
      .map(n => s"val=$n")

  // ---------------------------------------------------------------------------
  // Pipes: reusable, effect-polymorphic stream transformations
  // ---------------------------------------------------------------------------

  /** Multiplies every element by two. */
  def doubler[F[_]]: Pipe[F, Int, Int] = in => in.map(n => n * 2)

  /** Renders every element with `toString`. */
  def stringify[F[_]]: Pipe[F, Int, String] = in => in.map(n => n.toString)

  /** Passes through at most the first ten elements. */
  def take10[F[_], A]: Pipe[F, A, A] = in => in.take(10)

  /** `numbers` doubled and then stringified. */
  val piped: Stream[IO, String] =
    numbers
      .through(doubler[IO])
      .through(stringify[IO])

  // ---------------------------------------------------------------------------
  // Chunked processing
  // ---------------------------------------------------------------------------

  /** 1 to 100 regrouped into chunks of ten elements. */
  val chunked: Stream[IO, Chunk[Int]] =
    Stream.range(1, 101).covary[IO].chunkN(10)

  // ---------------------------------------------------------------------------
  // Concurrent merge
  // ---------------------------------------------------------------------------

  /** Five "A"s, one per 100 ms tick. */
  val stream1: Stream[IO, String] = Stream.awakeEvery[IO](100.millis).as("A").take(5)

  /** Five "B"s, one per 150 ms tick. */
  val stream2: Stream[IO, String] = Stream.awakeEvery[IO](150.millis).as("B").take(5)

  /** Both tickers run concurrently; elements are emitted as they arrive. */
  val merged: Stream[IO, String] = stream1.merge(stream2)

  // ---------------------------------------------------------------------------
  // Scan (running accumulator)
  // ---------------------------------------------------------------------------

  /** Running total of `numbers`, starting with the seed value 0. */
  val runningSum: Stream[IO, Int] = numbers.scan(0)((acc, n) => acc + n)

  // ---------------------------------------------------------------------------
  // groupWithin (batch by size or by time)
  // ---------------------------------------------------------------------------

  /** 1 to 20 paced at one element per 50 ms, batched into chunks that close
    * after five elements or 200 ms, whichever comes first.
    */
  val batched: Stream[IO, Chunk[Int]] =
    val paced = Stream.range(1, 21).covary[IO].metered(50.millis)
    paced.groupWithin(5, 200.millis)

  // ---------------------------------------------------------------------------
  // Error handling
  // ---------------------------------------------------------------------------

  /** Emits 1, 2, 3 and then fails. The trailing `Stream(4, 5)` is never
    * reached because the error terminates the stream first.
    */
  val withErrors: Stream[IO, Int] =
    Stream(1, 2, 3) ++
      Stream.raiseError[IO](RuntimeException("boom")) ++
      Stream(4, 5)

  /** `withErrors` with the failure replaced by a logged message followed by
    * the sentinel value -1.
    */
  val recovered: Stream[IO, Int] =
    withErrors.handleErrorWith { e =>
      Stream.eval(IO.println(s"Error: ${e.getMessage}")).as(-1)
    }

  // ---------------------------------------------------------------------------
  // Resource safety
  // ---------------------------------------------------------------------------

  /** Simulates reading a file: prints an "Opening" message on acquisition,
    * emits three fixed lines, and prints a "Closing" message on release.
    * No real file is touched.
    *
    * @param filename name used only in the printed open/close messages
    * @return a stream of the three placeholder lines
    */
  def lines(filename: String): Stream[IO, String] =
    val acquire: IO[String] = IO.println(s"Opening $filename").as(filename)
    def release(name: String): IO[Unit] = IO.println(s"Closing $name")
    // bracket ties the release action to the lifetime of the stream that uses the resource.
    Stream.bracket(acquire)(release) >> Stream("line1", "line2", "line3")

  // ---------------------------------------------------------------------------
  // Entry point
  // ---------------------------------------------------------------------------

  /** Runs each demo in sequence, printing a header before every section. */
  def run: IO[Unit] =
    for
      _ <- IO.println("=== Pipeline ===")
      _ <- pipeline.foreach(s => IO.println(s)).compile.drain
      _ <- IO.println("\n=== Piped ===")
      _ <- piped.foreach(s => IO.println(s)).compile.drain
      _ <- IO.println("\n=== Running Sum ===")
      _ <- runningSum.compile.toList.flatMap(sums => IO.println(s"Sums: $sums"))
      _ <- IO.println("\n=== Chunks ===")
      _ <- chunked.foreach(c => IO.println(s"Chunk: ${c.toList}")).compile.drain
      _ <- IO.println("\n=== Recovered ===")
      _ <- recovered.compile.toList.flatMap(r => IO.println(s"Recovered: $r"))
      _ <- IO.println("\n=== Resource ===")
      _ <- lines("test.txt").foreach(l => IO.println(s" $l")).compile.drain
    yield ()
Use Cases
- Streaming data processing
- Real-time event pipelines
- Resource-safe file processing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Scala · Beginner
Collections Map Filter Fold Operations
Master Scala collections: map, flatMap, filter, fold, groupBy, partition, and zip operations.
Best for: Data transformation and aggregation
#scala #collections
Scala · Intermediate
For-Comprehensions and Monadic Composition
Use for-comprehensions with Option, Either, Future, and custom monads for elegant composition.
Best for: Chaining optional computations
#scala #for-comprehension
Scala · Advanced
Type Class Pattern Implementation
Implement the type class pattern in Scala 3: define, provide instances, and use with extension methods.
Best for: Ad-hoc polymorphism without inheritance
#scala #type-class
Scala · Advanced
Cats Effect IO Monad Basics
Use Cats Effect IO for pure functional effects: sequencing, error handling, resource management.
Best for: Pure functional effect management
#scala #cats-effect