sql | intermediate
SQL Incremental Load Pattern
Incremental data load using watermark tracking to process only new and updated records efficiently.
-- Watermark table to track the last loaded timestamp per source table
CREATE TABLE IF NOT EXISTS etl_watermarks (
    table_name     TEXT PRIMARY KEY,
    -- Explicit UTC epoch so the default does not depend on the session time zone
    last_loaded_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
    rows_loaded    BIGINT NOT NULL DEFAULT 0,
    updated_at     TIMESTAMPTZ DEFAULT NOW()
);
-- Initialize watermark for a table
INSERT INTO etl_watermarks (table_name)
VALUES ('orders')
ON CONFLICT (table_name) DO NOTHING;
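-- Incremental load: stage rows changed since the last watermark.
-- Clear staging first so rows left by a failed earlier run are not merged twice.
TRUNCATE staging.orders;
INSERT INTO staging.orders
SELECT s.*
FROM source.orders s
JOIN etl_watermarks w ON w.table_name = 'orders'
WHERE s.updated_at > w.last_loaded_at;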
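-- Merge staging into target (upsert).
-- Assumes staging.orders and warehouse.orders share the same column order,
-- and that warehouse.orders has a unique constraint on id.
INSERT INTO warehouse.orders
SELECT * FROM staging.orders
ON CONFLICT (id) DO UPDATE SET
    status     = EXCLUDED.status,
    amount     = EXCLUDED.amount,
    updated_at = EXCLUDED.updated_at;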
-- Update watermark after a successful load.
-- If staging is empty, MAX() is NULL and the watermark stays unchanged.
UPDATE etl_watermarks
SET
    last_loaded_at = COALESCE(
        (SELECT MAX(updated_at) FROM staging.orders),
        last_loaded_at
    ),
    rows_loaded = rows_loaded + (
        SELECT COUNT(*) FROM staging.orders
    ),
    updated_at = NOW()
WHERE table_name = 'orders';
-- Truncate staging after merge
TRUNCATE staging.orders;
-- Verify load
SELECT table_name, last_loaded_at, rows_loaded
FROM etl_watermarks
WHERE table_name = 'orders';
Use Cases
- Efficient warehouse loading without full reloads
- Change data capture for analytics tables
- Tracking ETL pipeline progress per table
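For the per-table progress tracking use case, the watermark table can also be queried as a simple freshness report. The following is a minimal sketch, assuming PostgreSQL and the etl_watermarks table defined above; the column aliases data_lag and since_last_run are hypothetical names chosen for illustration, not part of the original pattern.

```sql
-- Hypothetical monitoring query: how stale is each tracked table?
SELECT
    table_name,
    last_loaded_at,
    rows_loaded,
    -- Age of the newest source change that has been loaded so far
    NOW() - last_loaded_at AS data_lag,
    -- Time since the watermark row itself was last updated by a load
    NOW() - updated_at     AS since_last_run
FROM etl_watermarks
ORDER BY last_loaded_at ASC;  -- least recently advanced watermarks first
```

A table that was initialized but never loaded still carries the 1970-01-01 default, so it sorts to the top with a very large data_lag, which makes it easy to spot.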