scala · advanced

Spark RDD Transformations and Actions

Process distributed data with Spark RDDs: map, reduce, aggregate, and pair RDD operations.

scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/** Walk-through of basic Spark RDD usage: transformations, actions, word count,
  * pair-RDD aggregation (`reduceByKey`, `aggregateByKey`), joins and caching.
  *
  * All data is built in-process with `parallelize`, and results are printed to
  * standard output.
  */
object SparkRDDOps:
  /** Runs every example in sequence against a SparkContext configured with
    * master `local[*]`.
    *
    * @param args command-line arguments; not read by this program
    */
  def main(args: Array[String]): Unit =
    val conf = SparkConf()
      .setAppName("RDD Operations")
      .setMaster("local[*]")
    val sc = SparkContext(conf)

    // Everything that uses the context runs inside try/finally so the context
    // is stopped even when one of the jobs below throws.
    try
      sc.setLogLevel("WARN")

      // Create RDD
      val numbers = sc.parallelize(1 to 100)

      // Transformations (lazy)
      val doubled = numbers.map(_ * 2)
      // `evens` and `pairs` are never used by an action below; they are kept
      // only to illustrate that declaring a transformation does not compute it.
      val evens = numbers.filter(_ % 2 == 0)
      val pairs = numbers.map(n => (n % 10, n))

      // Actions (trigger computation)
      println(s"Count: ${numbers.count()}")
      println(s"Sum: ${numbers.reduce(_ + _)}")
      println(s"First 5: ${doubled.take(5).mkString(", ")}")

      // Word count
      val lines = sc.parallelize(List(
        "hello world hello",
        "world foo bar",
        "hello foo baz"
      ))

      // Split into words, count occurrences per word, most frequent first.
      val wordCounts = lines
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)

      println("Word counts:")
      wordCounts.collect().foreach { (word, count) =>
        println(s"  $word: $count")
      }

      // Pair RDD operations
      val sales = sc.parallelize(List(
        ("Electronics", 1200.0),
        ("Books", 45.0),
        ("Electronics", 800.0),
        ("Books", 30.0),
        ("Clothing", 150.0),
        ("Electronics", 2000.0),
        ("Clothing", 200.0)
      ))

      // reduceByKey: sum the amounts per category
      val totals = sales.reduceByKey(_ + _)
      println("\nSales totals:")
      totals.collect().foreach { (cat, total) =>
        println(f"  $cat: $$$total%.2f")
      }

      // aggregateByKey: accumulate a (running total, element count) pair per key.
      // The first function folds one value into an accumulator; the second
      // merges two accumulators.
      val stats = sales.aggregateByKey((0.0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (a, b) => (a._1 + b._1, a._2 + b._2)
      ).mapValues { (total, count) =>
        f"avg: $$${total / count}%.2f, total: $$$total%.2f, count: $count"
      }

      println("\nSales stats:")
      stats.collect().foreach { (cat, stat) =>
        println(s"  $cat -> $stat")
      }

      // Join
      val inventory = sc.parallelize(List(
        ("Electronics", 500),
        ("Books", 1000),
        ("Clothing", 300)
      ))

      val joined = totals.join(inventory)
      println("\nJoined:")
      // Parameter untupling only handles flat tuples, so the nested
      // (key, (left, right)) shape produced by join needs a `case` pattern.
      joined.collect().foreach { case (cat, (revenue, stock)) =>
        println(f"  $cat: sales=$$$revenue%.2f, inventory=$stock")
      }

      // Cache and unpersist
      val cached = numbers.cache()
      cached.count()  // triggers caching
      println(s"Cached sum: ${cached.sum().toLong}")
      cached.unpersist()
    finally
      sc.stop()

Sponsored

Databricks

Use Cases

  • Distributed data processing
  • Word count and text analysis
  • Sales data aggregation

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.