pythonadvanced

Spark SQL Query Example

PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.

python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("analytics").getOrCreate()

# Read data
df = spark.read.parquet("s3a://bucket/sales/")

# DataFrame API transformations
result = (
    df.filter(F.col("status") == "completed")
    .withColumn("year", F.year("order_date"))
    .withColumn("month", F.month("order_date"))
    .groupBy("year", "month", "region")
    .agg(
        F.sum("amount").alias("total_revenue"),
        F.count("order_id").alias("order_count"),
        F.avg("amount").alias("avg_order_value"),
    )
    .orderBy("year", "month")
)

# Window functions for running totals and ranking
window_spec = Window.partitionBy("region").orderBy("year", "month")

result = result.withColumn(
    "running_total", F.sum("total_revenue").over(window_spec)
).withColumn(
    "rank", F.dense_rank().over(
        Window.partitionBy("year", "month").orderBy(F.desc("total_revenue"))
    )
)

# SQL approach
df.createOrReplaceTempView("sales")

sql_result = spark.sql("""
    SELECT
        region,
        DATE_TRUNC('month', order_date) AS month,
        SUM(amount) AS revenue,
        COUNT(*) AS orders,
        SUM(amount) / COUNT(*) AS aov
    FROM sales
    WHERE status = 'completed'
    GROUP BY region, DATE_TRUNC('month', order_date)
    HAVING SUM(amount) > 1000
    ORDER BY revenue DESC
""")

result.write.mode("overwrite").partitionBy("year").parquet("s3a://bucket/output/")

spark.stop()

Sponsored

Databricks — Unified analytics platform

Use Cases

  • Processing large-scale datasets with Spark
  • Running analytics queries on data lake files
  • Building aggregated reports from raw event data

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.