scala intermediate

Apache Spark DataFrame Operations

Process big data with Spark DataFrames: read, filter, aggregate, join, and write operations.

scala
import org.apache.spark.sql.{SparkSession, DataFrame, Row}
import org.apache.spark.sql.functions.*
import org.apache.spark.sql.types.*

/** Walkthrough of common Spark DataFrame operations on a small in-memory data set.
  *
  * Every step prints its result to stdout through `show()` / `printSchema()`;
  * nothing is written to disk.
  */
object SparkDataFrameOps:

  /** Entry point: builds a local session, runs every example in order, and
    * always stops the session afterwards.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit =
    // The master is hard-coded to local mode, using all available cores.
    val spark = SparkSession.builder()
      .appName("DataFrameOps")
      .master("local[*]")
      .getOrCreate()

    // Stop the session even when one of the examples throws, so a failing
    // action does not leave the session running until JVM exit.
    try
      runExamples(spark)
    finally
      spark.stop()

  /** Builds the sample DataFrames and runs each example in the original order.
    *
    * @param spark active session used for the implicit encoders and SQL queries
    */
  private def runExamples(spark: SparkSession): Unit =
    // NOTE(review): `toDF` on a Seq of tuples needs the implicit encoders from
    // `spark.implicits`; confirm they resolve under the Scala version of this build.
    import spark.implicits.*

    // Create DataFrame from sequence
    val employees = Seq(
      (1, "Alice", "Engineering", 120000),
      (2, "Bob", "Engineering", 110000),
      (3, "Carol", "Sales", 95000),
      (4, "Dave", "Sales", 105000),
      (5, "Eve", "Engineering", 130000),
      (6, "Frank", "HR", 90000)
    ).toDF("id", "name", "department", "salary")

    // Lookup table keyed by department name; joined against `employees` below.
    val departments = Seq(
      ("Engineering", "Building A"),
      ("Sales", "Building B"),
      ("HR", "Building C")
    ).toDF("dept_name", "location")

    // Show schema and data
    employees.printSchema()
    employees.show()

    showHighEarners(employees)
    showDepartmentStats(employees)
    showCompensation(employees)
    showLocations(employees, departments)
    showDepartmentRanking(employees)

    // Read/Write CSV
    // employees.write.option("header", true).csv("output/employees")
    // val df = spark.read.option("header", true).csv("data/employees.csv")

    showSqlSummary(spark, employees)

  /** Select and filter: employees earning more than 100000, highest salary first. */
  private def showHighEarners(employees: DataFrame): Unit =
    employees
      .select("name", "department", "salary")
      .filter(col("salary") > 100000)
      .orderBy(col("salary").desc)
      .show()

  /** Aggregation: head count and average/max/min salary per department,
    * ordered by average salary descending.
    */
  private def showDepartmentStats(employees: DataFrame): Unit =
    employees
      .groupBy("department")
      .agg(
        count("*").as("count"),
        avg("salary").as("avg_salary"),
        max("salary").as("max_salary"),
        min("salary").as("min_salary")
      )
      .orderBy(col("avg_salary").desc)
      .show()

  /** Computed columns: a bonus of 10% of salary and the resulting total compensation. */
  private def showCompensation(employees: DataFrame): Unit =
    val withBonus = employees
      .withColumn("bonus", col("salary") * 0.1)
      .withColumn("total_comp", col("salary") + col("bonus"))
    withBonus.show()

  /** Join: attaches each employee's office location by matching
    * `department` against `dept_name`.
    */
  private def showLocations(employees: DataFrame, departments: DataFrame): Unit =
    employees
      .join(departments, employees("department") === departments("dept_name"))
      .select("name", "department", "salary", "location")
      .show()

  /** Window functions: salary rank within each department, plus the
    * department-wide average salary on every row.
    */
  private def showDepartmentRanking(employees: DataFrame): Unit =
    import org.apache.spark.sql.expressions.Window
    val byDepartment = Window.partitionBy("department")
    // Ranking needs an ordering; the average is taken over the unordered partition.
    val bySalaryDesc = byDepartment.orderBy(col("salary").desc)
    employees
      .withColumn("rank", rank().over(bySalaryDesc))
      .withColumn("dept_avg", avg("salary").over(byDepartment))
      .show()

  /** SQL queries: registers `employees` as the temp view `emp` and prints
    * per-department count and average salary.
    */
  private def showSqlSummary(spark: SparkSession, employees: DataFrame): Unit =
    employees.createOrReplaceTempView("emp")
    spark.sql("""
      SELECT department, COUNT(*) as cnt, AVG(salary) as avg_sal
      FROM emp
      GROUP BY department
      ORDER BY avg_sal DESC
    """).show()

Sponsored

Databricks

Use Cases

  • Big data processing with Spark
  • Data aggregation and reporting
  • ETL pipeline transformations

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.