sqladvanced
SQL Data Lineage Tracking
Track data lineage across ETL stages with metadata logging for debugging and audit trails.
sqlPress ⌘/Ctrl + Shift + C to copy
-- Data lineage tracking table
CREATE TABLE data_lineage (
id BIGSERIAL PRIMARY KEY,
pipeline_run_id UUID NOT NULL,
stage TEXT NOT NULL,
source_table TEXT,
target_table TEXT NOT NULL,
rows_read BIGINT DEFAULT 0,
rows_written BIGINT DEFAULT 0,
rows_rejected BIGINT DEFAULT 0,
started_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
status TEXT DEFAULT 'running' CHECK (status IN ('running', 'success', 'failed')),
error_message TEXT,
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_lineage_pipeline ON data_lineage (pipeline_run_id);
CREATE INDEX idx_lineage_target ON data_lineage (target_table, started_at);
-- Log ETL stage start
INSERT INTO data_lineage (pipeline_run_id, stage, source_table, target_table, started_at)
VALUES (
'a1b2c3d4-e5f6-7890-abcd-ef1234567890',
'transform',
'raw.orders',
'staging.orders_clean',
NOW()
);
-- Update on completion
UPDATE data_lineage
SET
completed_at = NOW(),
rows_read = 50000,
rows_written = 48500,
rows_rejected = 1500,
status = 'success',
metadata = '{"filter": "status != cancelled", "dedup_key": "order_id"}'::jsonb
WHERE pipeline_run_id = 'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
AND stage = 'transform';
-- Query: trace a table's data sources
SELECT stage, source_table, rows_read, rows_written, completed_at
FROM data_lineage
WHERE target_table = 'warehouse.fct_orders'
ORDER BY completed_at DESC
LIMIT 10;Use Cases
- Tracing data flow across pipeline stages
- Debugging data quality issues to their source
- Audit compliance for data governance
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
pythonadvanced
Build a Data Lineage Graph with NetworkX
Track and visualise data lineage across ETL pipeline stages using a directed graph.
Best for: data governance
#networkx#lineage
pythonadvanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
#etl#pipeline
pythonintermediate
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Best for: Processing large CSV files that don't fit in memory
#batch-processing#python
pythonadvanced
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
Best for: Replicating data between databases
#database#sync