Databricks Notebook Data Pipeline
Databricks notebook with Delta Lake reads, transformations, merge operations, and table optimization.
from pyspark.sql import functions as F
from delta.tables import DeltaTable
# Read from Delta Lake
df = spark.read.table("bronze.raw_events")
# Transform: clean and enrich
silver_df = (
    df.filter(F.col("event_type").isNotNull())
    .withColumn("event_date", F.to_date("timestamp"))
    .withColumn("hour", F.hour("timestamp"))
    .withColumn("processed_at", F.current_timestamp())
    .dropDuplicates(["event_id"])
)
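# Write to silver layer
# Note: append adds every row in silver_df, so re-running over the same bronze data duplicates events.
silver_df.write.format("delta").mode("append").partitionBy("event_date").saveAsTable(
    "silver.events"
)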
# Aggregate per day and event type, then merge / upsert into gold table
agg_df = (
    silver_df.groupBy("event_date", "event_type")
    .agg(
        F.count("event_id").alias("event_count"),
        F.countDistinct("user_id").alias("unique_users"),
    )
)
# Matched rows are overwritten with the source counts; new (event_date, event_type) pairs are inserted.
gold_table = DeltaTable.forName(spark, "gold.event_metrics")
(
    gold_table.alias("target")
    .merge(
        agg_df.alias("source"),
        "target.event_date = source.event_date AND target.event_type = source.event_type",
    )
    .whenMatchedUpdate(
        set={
            "event_count": "source.event_count",
            "unique_users": "source.unique_users",
        }
    )
    .whenNotMatchedInsertAll()
    .execute()
)
# Optimize table performance
spark.sql("OPTIMIZE gold.event_metrics ZORDER BY (event_date)")
spark.sql("VACUUM gold.event_metrics RETAIN 168 HOURS")
print(f"Processed {silver_df.count()} events")
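The silver write in the snippet appends unconditionally, so a re-run over the same bronze rows would insert them again. One possible way to make that step idempotent is an insert-only merge keyed on event_id. The sketch below is an assumption layered on the snippet, not part of it: the helper names build_merge_condition and upsert_by_keys are hypothetical, and the target table is assumed to exist already.

```python
from typing import Sequence


def build_merge_condition(
    keys: Sequence[str], target: str = "target", source: str = "source"
) -> str:
    """Build a MERGE join condition such as 'target.k = source.k AND ...'."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def upsert_by_keys(spark, source_df, table_name: str, keys: Sequence[str]) -> None:
    """Insert rows whose keys are new to table_name; leave existing rows untouched.

    Hypothetical helper: assumes table_name already exists as a Delta table.
    """
    # Imported lazily so build_merge_condition works without Delta installed.
    from delta.tables import DeltaTable

    (
        DeltaTable.forName(spark, table_name)
        .alias("target")
        .merge(source_df.alias("source"), build_merge_condition(keys))
        .whenNotMatchedInsertAll()  # no whenMatched clause: matched rows are skipped
        .execute()
    )
```

In the notebook this could replace the append with upsert_by_keys(spark, silver_df, "silver.events", ["event_id"]). The same condition builder reproduces the gold-table join string when called with ["event_date", "event_type"].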
Use Cases
- Medallion architecture data pipelines on Databricks
- Delta Lake merge operations for SCD tables
- Automated data lakehouse ETL workflows
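The merge in this snippet overwrites matched rows and keeps no history, which is the Type 1 flavour of a slowly changing dimension. To make those semantics concrete, here is a plain-Python model of whenMatchedUpdate plus whenNotMatchedInsertAll on rows keyed by (event_date, event_type). It is an illustration only, not how Delta Lake implements MERGE, and the sample values are made up.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]      # (event_date, event_type)
Metrics = Dict[str, int]   # {"event_count": ..., "unique_users": ...}


def merge_metrics(
    target: Dict[Key, Metrics], source: Dict[Key, Metrics]
) -> Dict[Key, Metrics]:
    """Model an upsert: overwrite matched keys, insert unmatched ones."""
    merged = {key: dict(row) for key, row in target.items()}  # copy; target is not mutated
    for key, row in source.items():
        if key in merged:
            # Matched: overwrite the listed columns with the source values.
            merged[key].update(row)
        else:
            # Not matched: insert the source row as-is.
            merged[key] = dict(row)
    return merged


# Made-up sample rows for illustration.
gold = {
    ("2024-01-01", "click"): {"event_count": 10, "unique_users": 4},
    ("2024-01-01", "view"): {"event_count": 7, "unique_users": 3},
}
batch = {
    ("2024-01-01", "click"): {"event_count": 12, "unique_users": 5},
    ("2024-01-02", "click"): {"event_count": 2, "unique_users": 2},
}
result = merge_metrics(gold, batch)
```

After the call, the ("2024-01-01", "click") row carries the batch values, the "view" row is unchanged, and the ("2024-01-02", "click") row is new. Counts are replaced rather than added, so the source aggregate needs to cover every event for each key it touches.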
Related Snippets
Similar patterns you can reuse in the same workflow.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
Spark SQL Query Example
PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.
Best for: Processing large-scale datasets with Spark
Retry Logic for Data Pipelines
Configurable retry decorator with exponential backoff and jitter for resilient data pipeline tasks.
Best for: Resilient API calls in data pipelines
PySpark DataFrame — Filter and Aggregate
Common PySpark DataFrame operations: filter, group by, window functions, and write to Parquet.
Best for: Large-scale data aggregation on distributed clusters