python · advanced

PySpark DataFrame — Filter and Aggregate

A small PySpark ETL job: filter events, aggregate revenue by region and month, add a running total with a window function, and write the result to partitioned Parquet.

python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("etl").getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/events/")

# Filter and transform
active = (
    df
    .filter(F.col("status") == "active")
    .filter(F.col("created_at") >= "2024-01-01")
    .withColumn("year_month", F.date_format("created_at", "yyyy-MM"))
    .withColumn("amount_usd", F.col("amount") * F.col("exchange_rate"))
)

# Group by and aggregate
summary = (
    active
    .groupBy("region", "year_month")
    .agg(
        F.sum("amount_usd").alias("total_revenue"),
        F.countDistinct("user_id").alias("unique_users"),
        F.avg("amount_usd").alias("avg_order_value"),
    )
    .orderBy("region", "year_month")
)

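# Window: running total per region, ordered by month.
# The explicit frame spells out "all rows up to and including the current one".
window = (
    Window.partitionBy("region")
    .orderBy("year_month")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
summary = summary.withColumn(
    "cumulative_revenue",
    F.sum("total_revenue").over(window)
)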

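# Write output partitioned by region (one region=<value>/ directory per region).
# With default settings, mode("overwrite") replaces everything under the path.
summary.write.partitionBy("region").mode("overwrite").parquet(
    "s3://bucket/output/revenue-summary/"
)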

spark.stop()
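
To sanity-check the job on a small sample, it can help to have a plain-Python reference for what the group-by and running-total steps compute. The sketch below uses no Spark at all. The function name `reference_summary` and the sample rows are hypothetical, and it assumes the rows are already filtered, converted to `amount_usd`, and free of nulls (Spark's aggregates skip nulls, this sketch does not).

```python
from collections import defaultdict


def reference_summary(rows):
    """Plain-Python equivalent of the groupBy/agg and window steps (sketch)."""
    # Group rows by (region, year_month), like groupBy("region", "year_month").
    groups = defaultdict(list)
    for r in rows:
        groups[(r["region"], r["year_month"])].append(r)

    # Aggregate each group: sum, distinct user count, mean.
    summary = []
    for (region, year_month), members in sorted(groups.items()):
        amounts = [m["amount_usd"] for m in members]
        summary.append({
            "region": region,
            "year_month": year_month,
            "total_revenue": sum(amounts),
            "unique_users": len({m["user_id"] for m in members}),
            "avg_order_value": sum(amounts) / len(amounts),
        })

    # Running total per region; summary is already sorted by (region, year_month).
    running = defaultdict(float)
    for row in summary:
        running[row["region"]] += row["total_revenue"]
        row["cumulative_revenue"] = running[row["region"]]
    return summary


# Hypothetical sample rows, shaped like the `active` DataFrame.
sample = [
    {"region": "eu", "year_month": "2024-01", "user_id": 1, "amount_usd": 10.0},
    {"region": "eu", "year_month": "2024-01", "user_id": 1, "amount_usd": 30.0},
    {"region": "eu", "year_month": "2024-02", "user_id": 2, "amount_usd": 5.0},
    {"region": "us", "year_month": "2024-01", "user_id": 3, "amount_usd": 7.0},
]
result = reference_summary(sample)
for row in result:
    print(row["region"], row["year_month"], row["cumulative_revenue"])
```

Comparing this output against the collected rows of `summary` for the same input is one way to catch a wrong partition key or ordering in the window definition.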

Use Cases

  • Large-scale data aggregation on distributed clusters
  • ETL pipelines reading from and writing to S3
  • Window function analytics on big data
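
For the ETL use case, an incremental run often should replace only the partitions it recomputed rather than the whole output directory. A minimal sketch, assuming Spark 2.4 or later, where the `partitionOverwriteMode` writer option is available; the helper name `write_summary` and its parameters are hypothetical, and `df` is expected to be a DataFrame such as `summary`.

```python
def write_summary(df, path, partition_col="region"):
    """Overwrite only the partitions present in `df` (hypothetical helper)."""
    (
        df.write
        .partitionBy(partition_col)
        .mode("overwrite")
        # Per-write counterpart of spark.sql.sources.partitionOverwriteMode=dynamic:
        # partitions that do not appear in `df` are left untouched.
        .option("partitionOverwriteMode", "dynamic")
        .parquet(path)
    )
```

A call would look like `write_summary(summary, "s3://bucket/output/revenue-summary/")`. Whether dynamic overwrite is the right choice depends on the job: a full rebuild may still want the default behaviour so that stale partitions are removed.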
