Scala · Intermediate

Parallel Processing with Collections

Process data in parallel using parallel collections, Future.traverse, and batched execution.

scala
import scala.concurrent.{Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*
import scala.collection.parallel.CollectionConverters.*

/** Stand-in for an expensive computation.
  *
  * Pauses the calling thread for 50 ms and yields the square of `n`.
  *
  * @param n the value to square
  * @return `n * n`
  */
def heavyComputation(n: Int): Int =
  val squared = n * n
  Thread.sleep(50) // artificial delay standing in for real work
  squared

/** Simulates an asynchronous lookup that takes about 50 ms.
  *
  * The work runs on the implicit global `ExecutionContext`.
  *
  * @param id identifier embedded in the produced string
  * @return a future that completes with `"Data-<id>"`
  */
def fetchData(id: Int): Future[String] = Future {
  // Thread.sleep parks the pool thread. Marking the call as `blocking` lets the
  // global ExecutionContext start a compensating thread instead of starving
  // other futures that are waiting for a worker.
  scala.concurrent.blocking {
    Thread.sleep(50)
  }
  s"Data-$id"
}

/** Entry point for the demo.
  *
  * Walks through five techniques and prints the outcome of each one:
  * sequential vs. parallel `map` timing, `Future.traverse`, batched futures,
  * a parallel filter/map pipeline, and a parallel word-frequency count.
  */
@main def run(): Unit =
  // Evaluates `body` once and returns its result together with the elapsed
  // time in milliseconds. System.nanoTime is monotonic, so the measurement is
  // not skewed by wall-clock adjustments the way currentTimeMillis can be.
  def timed[A](body: => A): (A, Long) =
    val startedAt = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - startedAt) / 1000000L)

  // Parallel collections
  val items = (1 to 20).toList

  val (sequential, seqTime) = timed(items.map(heavyComputation))
  println(s"Sequential: ${seqTime}ms")

  val (parallel, parTime) = timed(items.par.map(heavyComputation).toList)
  println(s"Parallel: ${parTime}ms")
  println(s"Speedup: ${seqTime.toDouble / parTime}x")

  // Future.traverse: List[A] => Future[List[B]]
  val ids = (1 to 10).toList
  val allData = Future.traverse(ids)(fetchData)
  // Blocking here is acceptable only because this is the edge of the program.
  val results = Await.result(allData, 10.seconds)
  println(s"Fetched: ${results.take(3)}...")

  // Batched processing
  //
  // Applies `f` to every element of `items`, at most `batchSize` elements at a
  // time. A batch is started only after the previous batch has completed, and
  // the results keep the order of `items`.
  def processBatch[A, B](items: List[A], batchSize: Int)(f: A => Future[B]): Future[List[B]] =
    require(batchSize > 0, s"batchSize must be positive, got $batchSize")
    items
      .grouped(batchSize)
      // Accumulate in a Vector: appending a batch to a List would re-copy the
      // whole accumulator on every step.
      .foldLeft(Future.successful(Vector.empty[B])) { (accFuture, batch) =>
        for
          acc     <- accFuture
          results <- Future.traverse(batch)(f)
        yield acc ++ results
      }
      .map(_.toList)

  val batched = processBatch((1 to 20).toList, 5) { id =>
    Future {
      // Mark the sleep as blocking so the global pool can compensate for the
      // parked thread.
      scala.concurrent.blocking(Thread.sleep(50))
      s"Processed-$id"
    }
  }
  val batchResults = Await.result(batched, 30.seconds)
  println(s"Batched: ${batchResults.size} items")

  // Parallel filter + map
  val data = (1 to 100).toList
  val processed = data.par
    .filter(_ % 2 == 0)
    .map(n => n * n)
    .filter(_ < 1000)
    .toList
    .sorted
  println(s"Parallel filtered: ${processed.take(10)}...")

  // Aggregate with parallel
  val words = "the quick brown fox jumps over the lazy brown dog".split(" ").toList
  val freq = words.par
    .groupBy(identity)
    .map((word, occurrences) => word -> occurrences.size)
    .toMap
  println(s"Frequencies: $freq")

Use Cases

  • Speeding up CPU-bound operations
  • Batched API calls that cap how many requests are in flight at once
  • Parallel data aggregation

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.