Scala · Advanced

Akka Streams Data Processing

Build reactive data pipelines with Akka Streams: sources, flows, sinks, and the graph DSL.

scala
import akka.actor.ActorSystem
import akka.stream.*
import akka.stream.scaladsl.*
import scala.concurrent.{Future, Await}
import scala.concurrent.duration.*

/** Runnable tour of common Akka Streams building blocks.
  *
  * Each private `demo...` method builds one small stream, runs it on the shared
  * actor system and blocks until the stream completes, so console output stays
  * in the order the demos are listed in `main`.
  */
object AkkaStreamApp:
  // The actor system is the implicit context every `run`/`runWith` call below relies on.
  given system: ActorSystem = ActorSystem("streams")
  import system.dispatcher

  /** Runs every demo in sequence and always shuts the actor system down afterwards.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit =
    try
      demoBasicPipeline()
      demoMaterializedFold()
      demoAsyncBoundaries()
      demoThrottle()
      demoBuffer()
      demoGrouped()
      demoMerge()
      demoBroadcast()
    finally
      // Terminate even when a demo fails or times out; with default settings a
      // running actor system would otherwise keep the JVM alive.
      system.terminate()
      Await.result(system.whenTerminated, 5.seconds)

  /** Basic Source -> Flow -> Sink: prints the doubled even numbers of 1..100 on one line. */
  private def demoBasicPipeline(): Unit =
    val source = Source(1 to 100)
    val flow = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
    val sink = Sink.foreach[Int](n => print(s"$n "))

    // `Done` lives in package `akka`, which the file's imports do not cover,
    // so it is referenced by its qualified name.
    val result: Future[akka.Done] = source.via(flow).runWith(sink)
    Await.result(result, 5.seconds)
    println()

  /** Materialized value: a folding sink hands back the sum of 1..100 as a Future. */
  private def demoMaterializedFold(): Unit =
    val sumSink = Sink.fold[Int, Int](0)(_ + _)
    val sumFuture = Source(1 to 100).runWith(sumSink)
    println(s"Sum: ${Await.result(sumFuture, 5.seconds)}")

  /** Async boundaries: splits the pipeline into sections that can run concurrently. */
  private def demoAsyncBoundaries(): Unit =
    val pipeline = Source(1 to 1000)
      .map(n => { /* CPU work */ n * n })
      .async  // async boundary: the stages above run separately from the stages below
      .filter(_ % 3 == 0)
      .async
      .map(_.toString)
      .take(10)

    val asyncResult = pipeline.runWith(Sink.seq)
    println(s"Async: ${Await.result(asyncResult, 5.seconds)}")

  /** Throttle (rate limiting): emits ten items at two elements per second. */
  private def demoThrottle(): Unit =
    val throttled = Source(1 to 10)
      .throttle(2, 1.second)  // 2 elements per second
      .map(n => s"item-$n")
      .runWith(Sink.foreach(println))
    Await.result(throttled, 10.seconds)

  /** Buffer with an overflow strategy in front of a deliberately slow stage. */
  private def demoBuffer(): Unit =
    val buffered = Source(1 to 100)
      .buffer(10, OverflowStrategy.dropHead)
      // Thread.sleep stands in for slow work; it blocks the thread running this
      // stage, which is acceptable in a demo but should be avoided in real pipelines.
      .map(n => { Thread.sleep(10); n })
      .take(20)
      .runWith(Sink.seq)
    println(s"Buffered: ${Await.result(buffered, 10.seconds)}")

  /** Grouped processing: sums 1..100 in batches of ten. */
  private def demoGrouped(): Unit =
    val batched = Source(1 to 100)
      .grouped(10)
      .map(group => group.sum)
      .runWith(Sink.seq)
    println(s"Batch sums: ${Await.result(batched, 5.seconds)}")

  /** Merge (fan-in): combines two sources into a single stream. */
  private def demoMerge(): Unit =
    val source1 = Source(List("a", "b", "c"))
    val source2 = Source(List("1", "2", "3"))
    val merged = source1.merge(source2)
      .runWith(Sink.seq)
    println(s"Merged: ${Await.result(merged, 5.seconds)}")

  /** Broadcast (fan-out): sends every element to two sinks through the graph DSL. */
  private def demoBroadcast(): Unit =
    val leftPrinter = Sink.foreach[String](s => print(s"L:$s "))
    val rightPrinter = Sink.foreach[String](s => print(s"R:$s "))

    // Both sinks are passed to `create` so their completion futures become the
    // graph's materialized value, paired up by the `(_, _)` combiner.
    val graph = RunnableGraph.fromGraph(GraphDSL.create(leftPrinter, rightPrinter)((_, _)) {
      implicit builder => (leftSink, rightSink) =>
        import GraphDSL.Implicits.*
        val broadcast = builder.add(Broadcast[String](2))
        Source(List("x", "y", "z")) ~> broadcast
        broadcast.out(0) ~> leftSink
        broadcast.out(1) ~> rightSink
        ClosedShape
    })

    val (f1, f2) = graph.run()
    Await.result(f1, 5.seconds)
    Await.result(f2, 5.seconds)
    println()

Use Cases

  • Reactive data pipeline processing
  • Backpressure-aware streaming
  • Fan-out/fan-in data processing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.