pythonadvanced

Databricks Notebook Data Pipeline

Databricks notebook with Delta Lake reads, transformations, merge operations, and table optimization.

python
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Read from Delta Lake
df = spark.read.table("bronze.raw_events")

# Transform: clean and enrich
silver_df = (
    df.filter(F.col("event_type").isNotNull())
    .withColumn("event_date", F.to_date("timestamp"))
    .withColumn("hour", F.hour("timestamp"))
    .withColumn("processed_at", F.current_timestamp())
    .dropDuplicates(["event_id"])
)

# Write to silver layer
silver_df.write.format("delta").mode("append").partitionBy("event_date").saveAsTable(
    "silver.events"
)

# Merge / Upsert into gold table
agg_df = (
    silver_df.groupBy("event_date", "event_type")
    .agg(
        F.count("event_id").alias("event_count"),
        F.countDistinct("user_id").alias("unique_users"),
    )
)

gold_table = DeltaTable.forName(spark, "gold.event_metrics")

gold_table.alias("target").merge(
    agg_df.alias("source"),
    "target.event_date = source.event_date AND target.event_type = source.event_type"
).whenMatchedUpdate(
    set={
        "event_count": "source.event_count",
        "unique_users": "source.unique_users",
    }
).whenNotMatchedInsertAll().execute()

# Optimize table performance
spark.sql("OPTIMIZE gold.event_metrics ZORDER BY (event_date)")
spark.sql("VACUUM gold.event_metrics RETAIN 168 HOURS")

print(f"Processed {silver_df.count()} events")

Sponsored

Databricks — Unified analytics platform

Use Cases

  • Medallion architecture data pipelines on Databricks
  • Delta Lake merge operations for SCD tables
  • Automated data lakehouse ETL workflows

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.