Scala · Advanced
Spark RDD Transformations and Actions
Process distributed data with Spark RDDs: map, reduce, aggregate, and pair RDD operations.
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkRDDOps:
  def main(args: Array[String]): Unit =
    val conf = SparkConf()
      .setAppName("RDD Operations")
      .setMaster("local[*]") // run locally with one worker thread per logical core
    val sc = SparkContext(conf)
    sc.setLogLevel("WARN")

    // Create an RDD from a local collection
    val numbers: RDD[Int] = sc.parallelize(1 to 100)

    // Transformations (lazy): they only record lineage; nothing executes yet
    val doubled = numbers.map(_ * 2)
    val evens = numbers.filter(_ % 2 == 0)
    val pairs = numbers.map(n => (n % 10, n)) // key = last digit

    // Actions (trigger computation)
    println(s"Count: ${numbers.count()}")
    println(s"Sum: ${numbers.reduce(_ + _)}")
    println(s"First 5: ${doubled.take(5).mkString(", ")}")
    println(s"Even count: ${evens.count()}")
    println(s"Multiples of 10: ${pairs.lookup(0).mkString(", ")}")

    // Word count
    val lines = sc.parallelize(List(
      "hello world hello",
      "world foo bar",
      "hello foo baz"
    ))
    val wordCounts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
    println("Word counts:")
    wordCounts.collect().foreach { (word, count) =>
      println(s" $word: $count")
    }

    // Pair RDD operations
    val sales = sc.parallelize(List(
      ("Electronics", 1200.0),
      ("Books", 45.0),
      ("Electronics", 800.0),
      ("Books", 30.0),
      ("Clothing", 150.0),
      ("Electronics", 2000.0),
      ("Clothing", 200.0)
    ))

    // reduceByKey: sum per category; values are combined within each
    // partition before the shuffle (groupByKey would shuffle every value)
    val totals = sales.reduceByKey(_ + _)
    println("\nSales totals:")
    totals.collect().foreach { (cat, total) =>
      println(f" $cat: $$$total%.2f")
    }

    // aggregateByKey: the zero value (0.0, 0) seeds a (sum, count) accumulator,
    // seqOp folds values within a partition, combOp merges partition results
    val stats = sales.aggregateByKey((0.0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).mapValues { (total, count) =>
      f"avg: $$${total / count}%.2f, total: $$$total%.2f, count: $count"
    }
    println("\nSales stats:")
    stats.collect().foreach { (cat, stat) =>
      println(s" $cat -> $stat")
    }

    // Join: inner join on category
    val inventory = sc.parallelize(List(
      ("Electronics", 500),
      ("Books", 1000),
      ("Clothing", 300)
    ))
    val joined = totals.join(inventory)
    println("\nJoined:")
    // Nested tuples need a `case` pattern; parameter untupling only covers flat tuples
    joined.collect().foreach { case (cat, (salesTotal, stock)) =>
      println(f" $cat: sales=$$$salesTotal%.2f, inventory=$stock")
    }

    // Cache and unpersist
    val cached = numbers.cache()
    cached.count() // the first action materializes the cached partitions
    println(s"Cached sum: ${cached.sum().toLong}")
    cached.unpersist()

    sc.stop()
```
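You can check what the word-count pipeline in the program above should print without starting a SparkContext. The sketch below models the same steps on plain Scala collections and is not Spark itself. The standard-library `groupMapReduce` (Scala 2.13+) stands in for `reduceByKey`, and the object `LocalWordCount` is a hypothetical helper. Spark's `sortBy(_._2, ascending = false)` orders only by count, so you should not rely on the relative order of tied words. The sketch adds an assumed alphabetical tie-break to keep its own output deterministic.

```scala
// Hypothetical helper object (not part of Spark): a local model of the
// flatMap -> map -> reduceByKey -> sortBy word-count pipeline.
object LocalWordCount {
  def wordCounts(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.split(" "))             // like flatMap: one element per word
      .map(word => (word, 1))            // like map: pair each word with a count of 1
      .groupMapReduce(_._1)(_._2)(_ + _) // local stand-in for reduceByKey(_ + _)
      .toSeq
      .sortBy { case (word, count) => (-count, word) } // count descending, then word (assumed tie-break)

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world hello", "world foo bar", "hello foo baz")
    wordCounts(lines).foreach { case (word, count) => println(s" $word: $count") }
  }
}
```

With the three sample lines, the result is hello: 3, then foo: 2 and world: 2, then bar: 1 and baz: 1.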
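The join step in the program is an inner join. A category appears in `joined` only if it is present in both `totals` and `inventory`, and a key repeated on either side yields one output row per matching pair of values. The sketch below models that behavior on plain Scala collections; it does not show how Spark executes a join. The object `LocalInnerJoin`, its `innerJoin` method, and the extra "Toys" inventory row are hypothetical. "Toys" is added only to show an unmatched key being dropped.

```scala
// Hypothetical helper object (not part of Spark): a local model of
// pair-RDD inner-join semantics.
object LocalInnerJoin {
  // Emits one (key, (left, right)) row for every matching pair of values;
  // keys that appear on only one side produce no rows.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    // Index the right side by key, keeping every value for repeated keys.
    val rightByKey: Map[K, Seq[B]] = right.groupMap(_._1)(_._2)
    for {
      (k, a) <- left
      b <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  def main(args: Array[String]): Unit = {
    val totals = Seq(("Electronics", 4000.0), ("Books", 75.0), ("Clothing", 350.0))
    // The snippet's inventory plus a hypothetical "Toys" row with no sales total.
    val inventory = Seq(("Electronics", 500), ("Books", 1000), ("Clothing", 300), ("Toys", 40))
    innerJoin(totals, inventory).foreach { case (cat, (salesTotal, stock)) =>
      println(f" $cat: sales=$$$salesTotal%.2f, inventory=$stock")
    }
  }
}
```

If unmatched keys must be kept, `PairRDDFunctions` also provides `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`. These wrap the possibly missing side in an `Option`.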
Use Cases
- Distributed data processing
- Word count and text analysis
- Sales data aggregation
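In the sales-aggregation case, the subtle part of `aggregateByKey` is where the zero value is used. It seeds the accumulator for each key separately in each partition. `seqOp` then folds that partition's values into the accumulator, and `combOp` merges the per-partition accumulators. The sketch below models this contract on plain Scala collections, simulating partitions as a `Seq` of `Seq`s. The object `LocalAggregateByKey`, its method signature, and the two-way split of the snippet's sales data are assumptions made for illustration, not Spark's implementation.

```scala
// Hypothetical helper object (not part of Spark): a local model of the
// aggregateByKey contract, with partitions simulated as a Seq of Seqs.
object LocalAggregateByKey {
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U
  ): Map[K, U] = {
    // Step 1, per partition: each key starts from `zero`; seqOp folds in that partition's values.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Step 2, across partitions: combOp merges the accumulators for the same key.
    perPartition.foldLeft(Map.empty[K, U]) { (merged, partial) =>
      partial.foldLeft(merged) { case (acc, (k, u)) =>
        acc.updated(k, acc.get(k).fold(u)(prev => combOp(prev, u)))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical two-partition split of the snippet's sales data.
    val partitions = Seq(
      Seq(("Electronics", 1200.0), ("Books", 45.0), ("Electronics", 800.0)),
      Seq(("Books", 30.0), ("Clothing", 150.0), ("Electronics", 2000.0), ("Clothing", 200.0))
    )
    // Same zero value, seqOp, and combOp as the snippet's (sum, count) aggregation.
    val stats = aggregateByKey(partitions, (0.0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    stats.toSeq.sortBy(_._1).foreach { case (cat, (total, count)) =>
      println(f" $cat: avg=$$${total / count}%.2f, total=$$$total%.2f, count=$count")
    }
  }
}
```

The zero value is applied once per key per partition, so it should be neutral for the accumulation, as `(0.0, 0)` is here. A non-neutral zero would be counted once for every partition that contains the key, which would make the result depend on how the data happens to be partitioned.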
Related Snippets
Similar patterns you can reuse in the same workflow.
Scala · Intermediate
Apache Spark DataFrame Operations
Process big data with Spark DataFrames: read, filter, aggregate, join, and write operations.
Best for: Big data processing with Spark
#scala #spark
Scala · Advanced
Spark Structured Streaming
Process real-time data with Spark Structured Streaming: sources, transformations, and sinks.
Best for: Real-time event processing
#scala #spark
Python · Advanced
Spark SQL Query Example
PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.
Best for: Processing large-scale datasets with Spark
#spark #pyspark
Python · Advanced
PySpark DataFrame — Filter and Aggregate
Common PySpark DataFrame operations: filter, group by, window functions, and write to Parquet.
Best for: Large-scale data aggregation on distributed clusters
#spark #pyspark