sql · intermediate

SQL Incremental Load Pattern

Incremental data load using watermark tracking to process only new and updated records efficiently.

sql
-- Watermark table: one row per loaded table, recording how far the
-- incremental load has progressed.
CREATE TABLE IF NOT EXISTS etl_watermarks (
    -- Name of the table being tracked; one watermark row per table.
    table_name     TEXT        PRIMARY KEY,
    -- Rows whose updated_at is greater than this value are picked up by the
    -- next load. The default carries an explicit UTC offset so the starting
    -- point does not shift with the session's TimeZone setting.
    last_loaded_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
    -- Running total of staged rows. NOT NULL because a NULL here would make
    -- every later `rows_loaded + n` evaluate to NULL.
    rows_loaded    BIGINT      NOT NULL DEFAULT 0,
    -- When this watermark row was last written.
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Seed the watermark row for 'orders'. Safe to re-run: when the row already
-- exists the conflict clause leaves it untouched, and the remaining columns
-- fall back to their column defaults on first insert.
INSERT INTO etl_watermarks (table_name)
SELECT 'orders'
ON CONFLICT (table_name) DO NOTHING;

-- Incremental load: stage every source row changed since the last watermark.
--
-- Staging is emptied first. The script only truncates staging at the very
-- end, so a run that fails part-way leaves its rows behind; staging them a
-- second time would put the same id in staging twice and make the following
-- upsert abort.
TRUNCATE TABLE staging.orders;

-- NOTE(review): `s.*` with no column list relies on staging.orders having the
-- same columns, in the same order, as source.orders -- confirm against the DDL.
INSERT INTO staging.orders
SELECT s.*
FROM source.orders AS s
INNER JOIN etl_watermarks AS w
    ON w.table_name = 'orders'
-- Strictly greater-than: rows stamped exactly at the watermark were already
-- covered by the run that set it.
WHERE s.updated_at > w.last_loaded_at;

-- Merge staging into target (upsert).
--
-- DISTINCT ON keeps only the newest staged version of each id. Without it, two
-- staged rows sharing an id (e.g. leftovers from an earlier failed run) make
-- the statement fail, because ON CONFLICT DO UPDATE may not touch the same
-- target row twice within one command.
--
-- NOTE(review): `*` with no column list relies on staging.orders and
-- warehouse.orders sharing the same column order, and on staging.orders
-- exposing `id` and `updated_at` -- confirm against the DDL.
INSERT INTO warehouse.orders
SELECT DISTINCT ON (id) *
FROM staging.orders
ORDER BY id, updated_at DESC NULLS LAST
ON CONFLICT (id) DO UPDATE SET
    status     = EXCLUDED.status,
    amount     = EXCLUDED.amount,
    updated_at = EXCLUDED.updated_at;

-- Advance the 'orders' watermark once the merge has succeeded.
-- The aggregate subquery always yields exactly one row, so the join below
-- touches the watermark row exactly once. With an empty staging table
-- MAX() is NULL and COUNT(*) is 0, so the watermark and counter keep their
-- current values and only updated_at is refreshed.
UPDATE etl_watermarks AS w
SET
    last_loaded_at = COALESCE(staged.newest_change, w.last_loaded_at),
    rows_loaded    = w.rows_loaded + staged.row_total,
    updated_at     = NOW()
FROM (
    SELECT
        MAX(updated_at) AS newest_change,
        COUNT(*)        AS row_total
    FROM staging.orders
) AS staged
WHERE w.table_name = 'orders';

-- Empty the staging table now that its rows have been merged and counted.
TRUNCATE TABLE staging.orders;

-- Sanity check: show the current watermark and running row total for 'orders'.
SELECT
    w.table_name,
    w.last_loaded_at,
    w.rows_loaded
FROM etl_watermarks AS w
WHERE w.table_name = 'orders';

Use Cases

  • Efficient warehouse loading without full reloads
  • Change data capture for analytics tables
  • Tracking ETL pipeline progress per table

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.