Databricks Notebook Data Pipeline
A Databricks notebook that reads from Delta Lake, applies transformations, upserts with a MERGE, and optimizes the resulting table.
from pyspark.sql import functions as F
from delta.tables import DeltaTable
# Read from Delta Lake
df = spark.read.table("bronze.raw_events")
# Transform: clean and enrich
silver_df = (
    df.filter(F.col("event_type").isNotNull())
    .withColumn("event_date", F.to_date("timestamp"))
    .withColumn("hour", F.hour("timestamp"))
    .withColumn("processed_at", F.current_timestamp())
    .dropDuplicates(["event_id"])
)
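The silver step filters out rows with a null event_type, derives a date and hour from the timestamp, stamps a processing time, and deduplicates on event_id. The same row-level logic, sketched over plain dicts so it can run without a Spark cluster (field names mirror the snippet; this is an illustration, not part of the pipeline):

```python
from datetime import datetime, timezone

def to_silver(rows):
    """Filter, enrich, and dedupe raw event dicts (mirrors the Spark chain)."""
    seen = set()
    out = []
    for row in rows:
        if row.get("event_type") is None:   # F.col("event_type").isNotNull()
            continue
        if row["event_id"] in seen:         # dropDuplicates(["event_id"])
            continue
        seen.add(row["event_id"])
        ts = datetime.fromisoformat(row["timestamp"])
        out.append({
            **row,
            "event_date": ts.date().isoformat(),  # F.to_date("timestamp")
            "hour": ts.hour,                      # F.hour("timestamp")
            "processed_at": datetime.now(timezone.utc).isoformat(),
        })
    return out

rows = [
    {"event_id": "e1", "event_type": "click", "timestamp": "2024-05-01T10:15:00"},
    {"event_id": "e1", "event_type": "click", "timestamp": "2024-05-01T10:15:00"},  # duplicate id
    {"event_id": "e2", "event_type": None, "timestamp": "2024-05-01T11:00:00"},     # null type
]
silver = to_silver(rows)
# → one row: e1 with event_date "2024-05-01" and hour 10
```

One difference to keep in mind: Spark's dropDuplicates keeps an arbitrary row per key, whereas this sketch keeps the first one seen.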
# Write to silver layer
silver_df.write.format("delta").mode("append").partitionBy("event_date").saveAsTable(
    "silver.events"
)
# Merge / Upsert into gold table
agg_df = silver_df.groupBy("event_date", "event_type").agg(
    F.count("event_id").alias("event_count"),
    F.countDistinct("user_id").alias("unique_users"),
)
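The aggregation counts non-null event_ids and distinct user_ids per (event_date, event_type) group. A plain-Python equivalent, for illustration only, where a counter mirrors F.count and a set mirrors F.countDistinct:

```python
from collections import defaultdict

def aggregate(silver_rows):
    """Group by (event_date, event_type); count events and distinct users."""
    counts = defaultdict(int)
    users = defaultdict(set)
    for row in silver_rows:
        key = (row["event_date"], row["event_type"])
        if row.get("event_id") is not None:  # F.count ignores nulls
            counts[key] += 1
        if row.get("user_id") is not None:   # F.countDistinct ignores nulls
            users[key].add(row["user_id"])
    return {
        key: {"event_count": counts[key], "unique_users": len(users[key])}
        for key in counts
    }

silver = [
    {"event_date": "2024-05-01", "event_type": "click", "event_id": "e1", "user_id": "u1"},
    {"event_date": "2024-05-01", "event_type": "click", "event_id": "e2", "user_id": "u1"},
]
metrics = aggregate(silver)
# → {("2024-05-01", "click"): {"event_count": 2, "unique_users": 1}}
```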
gold_table = DeltaTable.forName(spark, "gold.event_metrics")
gold_table.alias("target").merge(
    agg_df.alias("source"),
    "target.event_date = source.event_date AND target.event_type = source.event_type",
).whenMatchedUpdate(
    set={
        "event_count": "source.event_count",
        "unique_users": "source.unique_users",
    }
).whenNotMatchedInsertAll().execute()
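The MERGE updates the two metric columns when the (event_date, event_type) key already exists in gold and inserts the whole row otherwise. The whenMatchedUpdate / whenNotMatchedInsertAll semantics, sketched with an in-memory dict standing in for the gold table (illustration only):

```python
def merge_into_gold(gold, source_rows):
    """Upsert keyed on (event_date, event_type), like the Delta MERGE."""
    for row in source_rows:
        key = (row["event_date"], row["event_type"])
        if key in gold:
            # whenMatchedUpdate: overwrite only the metric columns
            gold[key]["event_count"] = row["event_count"]
            gold[key]["unique_users"] = row["unique_users"]
        else:
            # whenNotMatchedInsertAll: insert the full source row
            gold[key] = dict(row)
    return gold

gold = {
    ("2024-05-01", "click"): {"event_date": "2024-05-01", "event_type": "click",
                              "event_count": 10, "unique_users": 4},
}
source = [
    {"event_date": "2024-05-01", "event_type": "click", "event_count": 12, "unique_users": 5},
    {"event_date": "2024-05-01", "event_type": "view", "event_count": 3, "unique_users": 2},
]
merge_into_gold(gold, source)
# → click row updated to 12/5, view row inserted
```

Note that a matched update replaces the stored counts with the current batch's counts rather than adding to them; since agg_df is built from this run's silver_df only, that is correct when each run covers a full day's events, but reruns over partial days would need incrementing logic instead.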
# Optimize: compact small files and co-locate rows by event_date, then remove stale files
spark.sql("OPTIMIZE gold.event_metrics ZORDER BY (event_date)")
spark.sql("VACUUM gold.event_metrics RETAIN 168 HOURS")  # 168 hours = the 7-day default retention
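VACUUM deletes data files that fell out of the table and are older than the retention window, which also bounds how far back time travel can reach. The cutoff arithmetic can be sketched as follows (illustrative only; real VACUUM also checks that a file is unreferenced by the current snapshot before deleting it):

```python
from datetime import datetime, timedelta, timezone

def vacuum_candidates(file_mtimes, retain_hours=168, now=None):
    """Return file paths older than the retention cutoff (illustration only)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=retain_hours)
    return [path for path, mtime in file_mtimes.items() if mtime < cutoff]

now = datetime(2024, 5, 8, tzinfo=timezone.utc)
files = {
    "part-old.parquet": datetime(2024, 4, 20, tzinfo=timezone.utc),  # beyond 7 days
    "part-new.parquet": datetime(2024, 5, 7, tzinfo=timezone.utc),   # within retention
}
candidates = vacuum_candidates(files, now=now)
# → ["part-old.parquet"]
```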
print(f"Processed {silver_df.count()} events")
Use Cases
- Medallion architecture data pipelines on Databricks
- Delta Lake merge operations for SCD tables
- Automated data lakehouse ETL workflows
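For the SCD use case, a Type 2 merge does not update the matched row in place the way the gold MERGE above does; it closes out the current version and inserts a new one. A minimal dict-based sketch of that pattern (the is_current / end_date column names are assumptions for illustration, not part of the snippet):

```python
def scd2_upsert(history, key, new_attrs, effective_date):
    """Type 2 upsert: expire the current row for `key`, append a new version."""
    for row in history:
        if row["key"] == key and row["is_current"]:
            if row["attrs"] == new_attrs:
                return history               # no change: keep the current version
            row["is_current"] = False        # close out the old version
            row["end_date"] = effective_date
    history.append({"key": key, "attrs": new_attrs,
                    "start_date": effective_date, "end_date": None,
                    "is_current": True})
    return history

history = [{"key": "u1", "attrs": {"plan": "free"},
            "start_date": "2024-01-01", "end_date": None, "is_current": True}]
scd2_upsert(history, "u1", {"plan": "pro"}, "2024-05-01")
# → two rows: the "free" version closed on 2024-05-01, the "pro" version current
```

In Delta Lake this is typically done with a MERGE whose matched branch updates only the expiry columns, plus an insert of the new version rows.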