scala · intermediate
Parallel Processing with Collections
Process data in parallel using parallel collections, Future.traverse, and batched execution.
scala — Press ⌘/Ctrl + Shift + C to copy
import scala.concurrent.{Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*
import scala.collection.parallel.CollectionConverters.*
/** Simulates an expensive computation by sleeping briefly and then squaring the input.
  *
  * The sleep stands in for real work so that the sequential and parallel
  * timings in the demo differ measurably.
  *
  * @param n value to square
  * @return `n * n`, computed with `Int` arithmetic
  */
def heavyComputation(n: Int): Int =
  Thread.sleep(50) // pretend this call costs ~50 ms
  n * n
/** Simulates an asynchronous fetch of the record with the given id.
  *
  * The work runs on the implicit execution context in scope and sleeps for
  * about 50 ms to imitate I/O latency before producing its result.
  *
  * @param id identifier of the record to "fetch"
  * @return a future completing with the string `Data-<id>`
  */
def fetchData(id: Int): Future[String] = Future {
  // Flag the sleep as blocking so the execution context may compensate with
  // extra threads instead of starving other futures while this one waits.
  scala.concurrent.blocking {
    Thread.sleep(50)
  }
  s"Data-$id"
}
/** Demo entry point showing several ways to process collections in parallel:
  * parallel collections (`.par`), `Future.traverse`, sequentially chained
  * batches of futures, and parallel filter/map/groupBy pipelines.
  *
  * Blocks on futures with `Await.result`; this is acceptable only because this
  * is the outermost edge of a demo program.
  */
@main def run(): Unit =
  // Parallel collections
  val items = (1 to 20).toList

  // System.nanoTime is monotonic, so elapsed-time measurements are not
  // distorted by wall-clock adjustments; convert to milliseconds for display.
  val start1 = System.nanoTime()
  val sequential = items.map(heavyComputation)
  val seqTime = (System.nanoTime() - start1) / 1000000L
  println(s"Sequential: ${seqTime}ms")

  val start2 = System.nanoTime()
  val parallel = items.par.map(heavyComputation).toList
  val parTime = (System.nanoTime() - start2) / 1000000L
  println(s"Parallel: ${parTime}ms")
  println(s"Speedup: ${seqTime.toDouble / parTime}x")

  // Future.traverse: List[A] => Future[List[B]]
  val ids = (1 to 10).toList
  val allData = Future.traverse(ids)(fetchData)
  val results = Await.result(allData, 10.seconds)
  println(s"Fetched: ${results.take(3)}...")

  // Batched processing

  /** Applies `f` to `items` in consecutive groups of `batchSize`.
    *
    * Batches run one after another: the next batch's futures are only created
    * once the accumulated future of all previous batches has completed.
    * Within a batch, `Future.traverse` starts `f` for every element.
    *
    * @param items     elements to process
    * @param batchSize number of elements per batch; must be positive
    *                  (`grouped` rejects sizes below 1)
    * @param f         asynchronous operation applied to each element
    * @return a future of all results, in the original element order
    */
  def processBatch[A, B](items: List[A], batchSize: Int)(f: A => Future[B]): Future[List[B]] =
    items.grouped(batchSize).foldLeft(Future.successful(List.empty[B])) {
      (accFuture, batch) =>
        for
          acc <- accFuture
          results <- Future.traverse(batch)(f)
        yield acc ++ results
    }

  val batched = processBatch((1 to 20).toList, 5) { id =>
    Future {
      // Mark the simulated latency as blocking so the pool can compensate.
      scala.concurrent.blocking {
        Thread.sleep(50)
      }
      s"Processed-$id"
    }
  }
  val batchResults = Await.result(batched, 30.seconds)
  println(s"Batched: ${batchResults.size} items")

  // Parallel filter + map
  val data = (1 to 100).toList
  val processed = data.par
    .filter(_ % 2 == 0)
    .map(n => n * n)
    .filter(_ < 1000)
    .toList
    .sorted // sort after leaving the parallel collection
  println(s"Parallel filtered: ${processed.take(10)}...")

  // Aggregate with parallel
  val words = "the quick brown fox jumps over the lazy brown dog".split(" ").toList
  val freq = words.par
    .groupBy(identity)
    .map((word, occurrences) => word -> occurrences.size)
    .toMap
  println(s"Frequencies: $freq")
Use Cases
- Speeding up CPU-bound operations
- Batched API calls with rate limiting
- Parallel data aggregation
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
scala · intermediate
Futures and Async Programming
Use Scala Futures for async operations: map, flatMap, recover, sequence, and race patterns.
Best for: Asynchronous API calls
#scala #futures
scala · advanced
Akka Actor System Basics
Build concurrent applications with Akka actors: message passing, behavior switching, and supervision.
Best for: Concurrent message-passing systems
#scala #akka
scala · intermediate
Collection Views for Lazy Operations
Use collection views for efficient lazy transformations: avoid intermediate collections and improve performance.
Best for: Memory-efficient data processing
#scala #views
scala · intermediate
Future Async Patterns
Work with Scala Futures: composition, error handling, timeout, retry, and parallel execution.
Best for: Asynchronous API calls
#scala #future