SQL Incremental Load Pattern
Incremental data load that uses a watermark (the newest updated_at value already loaded) to process only new and updated records instead of reloading the whole table.
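The watermark idea is small: remember the newest updated_at already loaded, pick up only rows strictly newer than it, then advance it to the newest value in that batch. The following is a minimal in-memory Python sketch of that logic. The function name incremental_batch and the dict-based record shape are hypothetical stand-ins for the SQL tables, not part of the original snippet.

```python
from datetime import datetime, timezone


def incremental_batch(records, watermark):
    """Return (changed_records, new_watermark) for one incremental run.

    `records` is a list of dicts with an 'updated_at' datetime; this shape
    is a hypothetical stand-in for rows in source.orders.
    """
    # Same filter as the SQL: strictly newer than the stored watermark
    batch = [r for r in records if r["updated_at"] > watermark]
    # Advance to the newest timestamp loaded; keep the old one if nothing changed
    new_watermark = max((r["updated_at"] for r in batch), default=watermark)
    return batch, new_watermark


if __name__ == "__main__":
    def at(hour):
        return datetime(2024, 1, 1, hour, tzinfo=timezone.utc)

    rows = [{"id": 1, "updated_at": at(9)},
            {"id": 2, "updated_at": at(10)},
            {"id": 3, "updated_at": at(11)}]
    batch, wm = incremental_batch(rows, at(9))
    print([r["id"] for r in batch], wm.isoformat())  # [2, 3] 2024-01-01T11:00:00+00:00
    batch, wm = incremental_batch(rows, wm)
    print([r["id"] for r in batch], wm.isoformat())  # [] 2024-01-01T11:00:00+00:00
```

The strict > comparison is what stops the second call from loading row 3 again, and falling back to the old watermark on an empty batch mirrors the COALESCE in the SQL watermark update.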
-- Watermark table: one row per source table, holding the newest updated_at already loaded
CREATE TABLE IF NOT EXISTS etl_watermarks (
  table_name TEXT PRIMARY KEY,
  last_loaded_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
  rows_loaded BIGINT NOT NULL DEFAULT 0,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Register a table once; re-running this is a no-op
INSERT INTO etl_watermarks (table_name)
VALUES ('orders')
ON CONFLICT (table_name) DO NOTHING;
-- Run the load, merge, and watermark update in one transaction so the
-- watermark only advances if every step succeeds
BEGIN;
-- Incremental load: copy rows changed since the last watermark into staging
INSERT INTO staging.orders
SELECT s.*
FROM source.orders s
JOIN etl_watermarks w ON w.table_name = 'orders'
WHERE s.updated_at > w.last_loaded_at;
-- Merge staging into target (upsert). Assumes warehouse.orders has the same
-- column order as staging.orders and a unique constraint on id
INSERT INTO warehouse.orders
SELECT * FROM staging.orders
ON CONFLICT (id) DO UPDATE SET
  status = EXCLUDED.status,
  amount = EXCLUDED.amount,
  updated_at = EXCLUDED.updated_at;
-- Advance the watermark to the newest updated_at in this batch
-- (unchanged if staging is empty) and add the batch size to rows_loaded
UPDATE etl_watermarks
SET
  last_loaded_at = COALESCE(
    (SELECT MAX(updated_at) FROM staging.orders),
    last_loaded_at
  ),
  rows_loaded = rows_loaded + (SELECT COUNT(*) FROM staging.orders),
  updated_at = NOW()
WHERE table_name = 'orders';
-- Empty staging for the next run
TRUNCATE staging.orders;
COMMIT;
-- Verify load
SELECT table_name, last_loaded_at, rows_loaded
FROM etl_watermarks
WHERE table_name = 'orders';
Use Cases
- Efficient warehouse loading without full reloads
- Change data capture for analytics tables
- Tracking ETL pipeline progress per table
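To try the whole flow end to end without a PostgreSQL server, the sketch below replays the same steps with Python's built-in sqlite3 module. It assumes SQLite 3.24 or newer (for ON CONFLICT ... DO UPDATE). It also uses prefixed table names (source_orders, staging_orders, warehouse_orders) in place of PostgreSQL schemas, stores ISO-8601 UTC text timestamps instead of TIMESTAMPTZ, and uses DELETE FROM because SQLite has no TRUNCATE. SCHEMA and load_orders are hypothetical names for illustration.

```python
import sqlite3

# Hypothetical single-database layout: SQLite has no schemas, so
# source.orders / staging.orders / warehouse.orders become prefixed tables.
# Timestamps are ISO-8601 UTC strings, which compare correctly as text.
SCHEMA = """
CREATE TABLE IF NOT EXISTS etl_watermarks (
    table_name TEXT PRIMARY KEY,
    last_loaded_at TEXT NOT NULL DEFAULT '1970-01-01T00:00:00Z',
    rows_loaded INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS source_orders (
    id INTEGER PRIMARY KEY, status TEXT, amount REAL, updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS staging_orders (
    id INTEGER, status TEXT, amount REAL, updated_at TEXT
);
CREATE TABLE IF NOT EXISTS warehouse_orders (
    id INTEGER PRIMARY KEY, status TEXT, amount REAL, updated_at TEXT
);
INSERT INTO etl_watermarks (table_name) VALUES ('orders')
    ON CONFLICT (table_name) DO NOTHING;
"""


def load_orders(conn: sqlite3.Connection) -> int:
    """Run one incremental load of orders; return the number of rows merged."""
    with conn:  # one transaction: commit on success, roll back on any error
        conn.execute("DELETE FROM staging_orders")  # SQLite has no TRUNCATE
        conn.execute("""
            INSERT INTO staging_orders (id, status, amount, updated_at)
            SELECT s.id, s.status, s.amount, s.updated_at
            FROM source_orders s
            JOIN etl_watermarks w ON w.table_name = 'orders'
            WHERE s.updated_at > w.last_loaded_at
        """)
        # SQLite needs a WHERE clause on INSERT ... SELECT before an upsert
        # clause, otherwise ON CONFLICT is parsed as a join constraint.
        conn.execute("""
            INSERT INTO warehouse_orders (id, status, amount, updated_at)
            SELECT id, status, amount, updated_at FROM staging_orders WHERE 1
            ON CONFLICT (id) DO UPDATE SET
                status = excluded.status,
                amount = excluded.amount,
                updated_at = excluded.updated_at
        """)
        loaded = conn.execute("SELECT COUNT(*) FROM staging_orders").fetchone()[0]
        # Advance the watermark; keep the old value if staging was empty
        conn.execute("""
            UPDATE etl_watermarks
            SET last_loaded_at = COALESCE(
                    (SELECT MAX(updated_at) FROM staging_orders), last_loaded_at),
                rows_loaded = rows_loaded + ?
            WHERE table_name = 'orders'
        """, (loaded,))
        conn.execute("DELETE FROM staging_orders")
    return loaded


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    with conn:
        conn.executemany(
            "INSERT INTO source_orders VALUES (?, ?, ?, ?)",
            [(1, "new", 10.0, "2024-01-01T09:00:00Z"),
             (2, "new", 20.0, "2024-01-01T10:00:00Z")],
        )
    print(load_orders(conn))  # 2: first run picks up every row
    print(load_orders(conn))  # 0: nothing is newer than the watermark
    with conn:
        conn.execute("UPDATE source_orders SET status = 'paid', "
                     "updated_at = '2024-01-01T11:00:00Z' WHERE id = 1")
    print(load_orders(conn))  # 1: only the updated row
```

The three calls print 2, 0, and 1. The second run finds nothing newer than the watermark, and the third picks up only the row whose updated_at moved. Because every step shares one transaction, an exception anywhere rolls back the staging rows, the warehouse upsert, and the watermark together.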
Related Snippets
Similar patterns you can reuse in the same workflow.
SQL Data Deduplication Techniques
Remove duplicate records using ROW_NUMBER, DISTINCT ON, and self-join deduplication strategies.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Spark SQL Query Example
PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.