scalaadvanced
Akka Streams Data Processing
Build reactive data pipelines with Akka Streams: sources, flows, sinks, and graph DSL.
scalaPress ⌘/Ctrl + Shift + C to copy
import akka.actor.ActorSystem
import akka.stream.*
import akka.stream.scaladsl.*
import scala.concurrent.{Future, Await}
import scala.concurrent.duration.*
object AkkaStreamApp:
given system: ActorSystem = ActorSystem("streams")
import system.dispatcher
def main(args: Array[String]): Unit =
// Basic: Source -> Flow -> Sink
val source = Source(1 to 100)
val flow = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
val sink = Sink.foreach[Int](n => print(s"$n "))
val result: Future[Done] = source.via(flow).runWith(sink)
Await.result(result, 5.seconds)
println()
// Materialized value (fold)
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val sumFuture = Source(1 to 100).runWith(sumSink)
println(s"Sum: ${Await.result(sumFuture, 5.seconds)}")
// Async boundaries for parallelism
val pipeline = Source(1 to 1000)
.map(n => { /* CPU work */ n * n })
.async // run next stage on different thread
.filter(_ % 3 == 0)
.async
.map(_.toString)
.take(10)
val asyncResult = pipeline.runWith(Sink.seq)
println(s"Async: ${Await.result(asyncResult, 5.seconds)}")
// Throttle (rate limiting)
val throttled = Source(1 to 10)
.throttle(2, 1.second) // 2 elements per second
.map(n => s"item-$n")
.runWith(Sink.foreach(println))
Await.result(throttled, 10.seconds)
// Buffer with overflow strategy
val buffered = Source(1 to 100)
.buffer(10, OverflowStrategy.dropHead)
.map(n => { Thread.sleep(10); n })
.take(20)
.runWith(Sink.seq)
println(s"Buffered: ${Await.result(buffered, 10.seconds)}")
// Grouped processing
val batched = Source(1 to 100)
.grouped(10)
.map(group => group.sum)
.runWith(Sink.seq)
println(s"Batch sums: ${Await.result(batched, 5.seconds)}")
// Merge multiple sources
val source1 = Source(List("a", "b", "c"))
val source2 = Source(List("1", "2", "3"))
val merged = source1.merge(source2)
.runWith(Sink.seq)
println(s"Merged: ${Await.result(merged, 5.seconds)}")
// Broadcast (fan-out)
val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.foreach[String](s => print(s"L:$s ")), Sink.foreach[String](s => print(s"R:$s ")))((_, _)) {
implicit builder => (leftSink, rightSink) =>
import GraphDSL.Implicits.*
val broadcast = builder.add(Broadcast[String](2))
Source(List("x", "y", "z")) ~> broadcast
broadcast.out(0) ~> leftSink
broadcast.out(1) ~> rightSink
ClosedShape
})
val (f1, f2) = graph.run()
Await.result(f1, 5.seconds)
Await.result(f2, 5.seconds)
println()
system.terminate()
Await.result(system.whenTerminated, 5.seconds)Use Cases
- Reactive data pipeline processing
- Backpressure-aware streaming
- Fan-out/fan-in data processing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
scalaadvanced
Akka Actor System Basics
Build concurrent applications with Akka actors: message passing, behavior switching, and supervision.
Best for: Concurrent message-passing systems
#scala#akka
scalaintermediate
Lazy Evaluation and LazyList Streams
Use lazy evaluation with LazyList for infinite sequences, memoization, and deferred computation.
Best for: Infinite sequence generation
#scala#lazy
javaadvanced
Java Flow API — Reactive Streams
Implement reactive publishers and subscribers with Java Flow API: backpressure, buffering, and transformation.
Best for: Reactive data processing with backpressure
#java#reactive
scalabeginner
Scala Hello World Application
Create a basic Scala application with main method, string interpolation, and val/var basics.
Best for: Getting started with Scala
#scala#basics