Scala (intermediate)
Apache Spark DataFrame Operations
Process big data with Spark DataFrames: read, filter, aggregate, join, and write operations.
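import org.apache.spark.sql.{SparkSession, DataFrame, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.*
import org.apache.spark.sql.types.*
import scala.jdk.CollectionConverters.*

// Scala 3 syntax. Spark is published only for Scala 2.x, so a Scala 3 build uses the
// Scala 2.13 artifacts (in sbt: .cross(CrossVersion.for3Use2_13)). Spark's implicit tuple
// encoders (spark.implicits + Seq(...).toDF) need Scala 2 TypeTags, which Scala 3 cannot
// derive, so the DataFrames below are built from Rows plus an explicit schema instead.
object SparkDataFrameOps:
  def main(args: Array[String]): Unit =
    val spark = SparkSession.builder()
      .appName("DataFrameOps")
      .master("local[*]")
      .getOrCreate()

    // Create a DataFrame from a sequence of Rows and an explicit schema
    val employeeSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("department", StringType, nullable = false),
      StructField("salary", IntegerType, nullable = false)
    ))
    val employees: DataFrame = spark.createDataFrame(
      Seq(
        Row(1, "Alice", "Engineering", 120000),
        Row(2, "Bob", "Engineering", 110000),
        Row(3, "Carol", "Sales", 95000),
        Row(4, "Dave", "Sales", 105000),
        Row(5, "Eve", "Engineering", 130000),
        Row(6, "Frank", "HR", 90000)
      ).asJava,
      employeeSchema
    )

    // Show schema and data
    employees.printSchema()
    employees.show()

    // Select and filter: salaries strictly above 100,000, highest first
    employees
      .select("name", "department", "salary")
      .filter(col("salary") > 100000)
      .orderBy(col("salary").desc)
      .show()

    // Aggregation: one output row per department
    employees
      .groupBy("department")
      .agg(
        count("*").as("count"),
        avg("salary").as("avg_salary"),
        max("salary").as("max_salary"),
        min("salary").as("min_salary")
      )
      .orderBy(col("avg_salary").desc)
      .show()

    // Add computed columns (bonus is a Double because 0.1 is a Double literal)
    val withBonus = employees
      .withColumn("bonus", col("salary") * 0.1)
      .withColumn("total_comp", col("salary") + col("bonus"))
    withBonus.show()

    // Join (inner by default) on the department name
    val departmentSchema = StructType(Seq(
      StructField("dept_name", StringType, nullable = false),
      StructField("location", StringType, nullable = false)
    ))
    val departments: DataFrame = spark.createDataFrame(
      Seq(
        Row("Engineering", "Building A"),
        Row("Sales", "Building B"),
        Row("HR", "Building C")
      ).asJava,
      departmentSchema
    )
    employees
      .join(departments, employees("department") === departments("dept_name"))
      .select("name", "department", "salary", "location")
      .show()

    // Window functions: salary rank within each department plus the department-wide average
    // (named so it does not shadow functions.window, which the wildcard import brings in)
    val byDeptSalaryDesc = Window.partitionBy("department").orderBy(col("salary").desc)
    employees
      .withColumn("rank", rank().over(byDeptSalaryDesc))
      .withColumn("dept_avg", avg("salary").over(Window.partitionBy("department")))
      .show()

    // Read/Write CSV (commented out; adjust the paths before enabling)
    // mode("overwrite") allows reruns; the default mode fails if the output directory exists.
    // employees.write.mode("overwrite").option("header", true).csv("output/employees")
    // Without a schema (or inferSchema), every CSV column is read as a string:
    // val df = spark.read.option("header", true).schema(employeeSchema).csv("data/employees.csv")

    // SQL queries against a temporary view
    employees.createOrReplaceTempView("emp")
    spark.sql("""
      SELECT department, COUNT(*) AS cnt, AVG(salary) AS avg_sal
      FROM emp
      GROUP BY department
      ORDER BY avg_sal DESC
    """).show()

    spark.stop()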
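To see what the `groupBy("department").agg(...)` step above should print, here is a minimal sketch that computes the same aggregation over the same six rows with plain Scala collections, no Spark involved. The names `AggregationSketch`, `Employee`, and `DeptStats` are illustrative and not part of Spark. The result types mirror Spark's: `count` produces a `Long` and `avg` a `Double`.

```scala
// Plain Scala 3 collections only; no Spark dependency.
object AggregationSketch:
  final case class Employee(id: Int, name: String, department: String, salary: Int)

  // One output row per department, like the agg(...) columns in the Spark example.
  final case class DeptStats(
      department: String,
      count: Long,      // count("*")
      avgSalary: Double, // avg("salary")
      maxSalary: Int,    // max("salary")
      minSalary: Int     // min("salary")
  )

  // Same six rows as the Spark example.
  val employees: List[Employee] = List(
    Employee(1, "Alice", "Engineering", 120000),
    Employee(2, "Bob", "Engineering", 110000),
    Employee(3, "Carol", "Sales", 95000),
    Employee(4, "Dave", "Sales", 105000),
    Employee(5, "Eve", "Engineering", 130000),
    Employee(6, "Frank", "HR", 90000)
  )

  val deptStats: List[DeptStats] =
    employees
      .groupBy(_.department) // groupBy("department")
      .map { case (dept, rows) =>
        val salaries = rows.map(_.salary)
        DeptStats(
          dept,
          salaries.size.toLong,
          salaries.map(_.toDouble).sum / salaries.size,
          salaries.max,
          salaries.min
        )
      }
      .toList
      .sortBy(-_.avgSalary) // orderBy(col("avg_salary").desc)

  def main(args: Array[String]): Unit =
    deptStats.foreach(println)
```

With this data the expected order is Engineering (average 120000.0), Sales (100000.0), then HR (90000.0). As in Spark, the grouping itself has no defined order, which is why the explicit sort is needed.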
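The window step above uses two different window specs, and the difference matters. Per Spark's `Window` documentation, a spec without an ordering uses a frame covering the whole partition, while a spec with an ordering defaults to a growing frame from the start of the partition to the current row. So `avg("salary").over(Window.partitionBy("department"))` gives every row its department's full average. The sketch below models both columns with plain Scala collections under that reading, including how `rank()` handles ties (tied rows share a rank and the next rank is skipped). `WindowRankSketch`, `Employee`, `Ranked`, and `rankWithin` are hypothetical names for illustration.

```scala
// Plain Scala 3 collections only; models rank() and a whole-partition avg().
object WindowRankSketch:
  final case class Employee(name: String, department: String, salary: Int)
  final case class Ranked(name: String, department: String, salary: Int, rank: Int, deptAvg: Double)

  val employees: List[Employee] = List(
    Employee("Alice", "Engineering", 120000),
    Employee("Bob", "Engineering", 110000),
    Employee("Carol", "Sales", 95000),
    Employee("Dave", "Sales", 105000),
    Employee("Eve", "Engineering", 130000),
    Employee("Frank", "HR", 90000)
  )

  // partitionBy("department"): each department is processed independently.
  // No ordering across departments is implied (the same holds in Spark).
  def rankWithin(rows: List[Employee]): List[Ranked] =
    rows.groupBy(_.department).toList.flatMap { case (_, partition) =>
      // avg over a spec with no orderBy: the frame is the whole partition.
      val deptAvg = partition.map(_.salary.toDouble).sum / partition.size
      // rank() ordered by salary desc: 1 + number of rows with a strictly higher salary,
      // so ties share a rank and leave a gap (1, 1, 3).
      partition.sortBy(-_.salary).map { e =>
        Ranked(e.name, e.department, e.salary, 1 + partition.count(_.salary > e.salary), deptAvg)
      }
    }

  def main(args: Array[String]): Unit =
    rankWithin(employees).foreach(println)
```

For the sample data, Engineering ranks Eve 1, Alice 2, and Bob 3, and every Engineering row gets `deptAvg` 120000.0. If gap-free numbering is wanted, Spark's `dense_rank()` is the usual alternative to `rank()`.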
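The join above passes only a join condition, so Spark performs an inner join: an employee whose `department` has no matching `dept_name` is dropped from the output. (The `join(right, joinExprs, joinType)` overload accepts a join type such as `"left"` to keep such rows.) The sketch below models the inner equi-join with plain Scala collections, indexing the department side by its key in the spirit of a hash join. It is not a claim about which physical join Spark chooses. `InnerJoinSketch`, `Department`, and `innerJoin` are illustrative names.

```scala
// Plain Scala 3 collections only; models an inner equi-join on department name.
object InnerJoinSketch:
  final case class Employee(name: String, department: String, salary: Int)
  final case class Department(deptName: String, location: String)

  val employees: List[Employee] = List(
    Employee("Alice", "Engineering", 120000),
    Employee("Bob", "Engineering", 110000),
    Employee("Carol", "Sales", 95000),
    Employee("Dave", "Sales", 105000),
    Employee("Eve", "Engineering", 130000),
    Employee("Frank", "HR", 90000)
  )

  val departments: List[Department] = List(
    Department("Engineering", "Building A"),
    Department("Sales", "Building B"),
    Department("HR", "Building C")
  )

  // Returns (name, department, salary, location), matching the select(...) after the join.
  def innerJoin(emps: List[Employee], depts: List[Department]): List[(String, String, Int, String)] =
    // Index the department side by its join key.
    val byName: Map[String, List[Department]] = depts.groupBy(_.deptName)
    for
      e <- emps
      d <- byName.getOrElse(e.department, Nil) // no match => no output row (inner join)
    yield (e.name, e.department, e.salary, d.location)

  def main(args: Array[String]): Unit =
    innerJoin(employees, departments).foreach(println)
```

Every sample employee has a matching department, so all six rows survive. An employee in an unlisted department would simply disappear from the result.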
Use Cases
- Big data processing with Spark
- Data aggregation and reporting
- ETL pipeline transformations
Related Snippets
Similar patterns you can reuse in the same workflow.
Scala (advanced)
Spark RDD Transformations and Actions
Process distributed data with Spark RDDs: map, reduce, aggregate, and pair RDD operations.
Best for: Distributed data processing
#scala #spark
Scala (advanced)
Spark Structured Streaming
Process real-time data with Spark Structured Streaming: sources, transformations, and sinks.
Best for: Real-time event processing
#scala #spark
Python (advanced)
Spark SQL Query Example
PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.
Best for: Processing large-scale datasets with Spark
#spark #pyspark
Python (advanced)
PySpark DataFrame — Filter and Aggregate
Common PySpark DataFrame operations: filter, group by, window functions, and write to Parquet.
Best for: Large-scale data aggregation on distributed clusters
#spark #pyspark