sql · advanced

SQL Data Lineage Tracking

Track data lineage across ETL stages with metadata logging for debugging and audit trails.

sql
-- Data lineage tracking table: one row per ETL stage execution within a
-- pipeline run. A row is inserted when the stage starts and updated when it
-- finishes with the final row counts, completion time and outcome.
CREATE TABLE data_lineage (
    id BIGSERIAL PRIMARY KEY,
    pipeline_run_id UUID NOT NULL,        -- shared by every stage row of one run
    stage TEXT NOT NULL,                  -- stage label, e.g. 'transform'
    source_table TEXT,                    -- nullable: a source is not required
    target_table TEXT NOT NULL,
    -- Counters start at 0 and may never be NULL, so sums and comparisons over
    -- them do not need NULL handling.
    rows_read BIGINT NOT NULL DEFAULT 0,
    rows_written BIGINT NOT NULL DEFAULT 0,
    rows_rejected BIGINT NOT NULL DEFAULT 0,
    started_at TIMESTAMPTZ NOT NULL,
    completed_at TIMESTAMPTZ,             -- NULL until the stage has finished
    -- NOT NULL is required here: a CHECK constraint is satisfied when its
    -- expression evaluates to NULL, so without it a NULL status would slip
    -- past the allowed-values list.
    status TEXT NOT NULL DEFAULT 'running'
        CONSTRAINT data_lineage_status_check
        CHECK (status IN ('running', 'success', 'failed')),
    error_message TEXT,                   -- presumably set when status = 'failed'; confirm with writers
    metadata JSONB DEFAULT '{}'           -- free-form stage details
);

-- Supports fetching every stage row that belongs to one pipeline run.
CREATE INDEX idx_lineage_pipeline
    ON data_lineage USING btree (pipeline_run_id);

-- Supports per-target lookups, with rows for a target ordered by start time.
CREATE INDEX idx_lineage_target
    ON data_lineage USING btree (target_table, started_at);

-- Record the start of an ETL stage. Only the identifying columns and the start
-- time are supplied; the row counters, status and metadata take their column
-- defaults.
INSERT INTO data_lineage (
    pipeline_run_id,
    stage,
    source_table,
    target_table,
    started_at
)
VALUES (
    CAST('a1b2c3d4-e5f6-7890-abcd-ef1234567890' AS UUID),
    'transform',
    'raw.orders',
    'staging.orders_clean',
    CURRENT_TIMESTAMP
);

-- Update on completion: stamp the finish time, final row counts, outcome and
-- stage details onto the row that was logged when the stage started.
UPDATE data_lineage
SET
    completed_at = NOW(),
    rows_read = 50000,
    rows_written = 48500,
    rows_rejected = 1500,
    status = 'success',
    metadata = '{"filter": "status != cancelled", "dedup_key": "order_id"}'::jsonb
WHERE pipeline_run_id = 'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
  AND stage = 'transform'
  -- (pipeline_run_id, stage) is not unique, so narrow the match to the target
  -- this stage wrote to ...
  AND target_table = 'staging.orders_clean'
  -- ... and touch only the in-flight row, so rows already marked 'success' or
  -- 'failed' (e.g. an earlier attempt in the same run) are never overwritten.
  AND status = 'running';

-- Query: trace a table's data sources, i.e. the ten most recently completed
-- stage rows that wrote to the given target table.
SELECT
    stage,
    source_table,
    rows_read,
    rows_written,
    completed_at
FROM data_lineage
WHERE target_table = 'warehouse.fct_orders'
ORDER BY
    -- DESC sorts NULLs first by default in PostgreSQL; without NULLS LAST the
    -- unfinished rows (completed_at IS NULL) would fill the LIMIT ahead of
    -- completed ones.
    completed_at DESC NULLS LAST,
    -- Tiebreaker on the primary key keeps the result deterministic.
    id DESC
LIMIT 10;

Use Cases

  • Tracing data flow across pipeline stages
  • Debugging data quality issues to their source
  • Audit compliance for data governance

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.