python · advanced
Build a Data Lineage Graph with NetworkX
Track and visualise data lineage across ETL pipeline stages using a directed graph.
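import networkx as nx

# Directed graph: nodes are datasets, edges are the transformations between them.
G = nx.DiGraph()
for node in ['raw_events', 'clean_events', 'enriched_events', 'fact_orders', 'dim_customer']:
    G.add_node(node)

# Each edge is (source, target, attributes); 'op' names the transformation.
edges = [
    ('raw_events', 'clean_events', {'op': 'deduplicate'}),
    ('clean_events', 'enriched_events', {'op': 'join customer'}),
    ('enriched_events', 'fact_orders', {'op': 'aggregate'}),
    ('dim_customer', 'enriched_events', {'op': 'join'}),
]
G.add_edges_from(edges)

# Every upstream dataset that feeds fact_orders (returned as an unordered set).
print('Ancestors:', nx.ancestors(G, 'fact_orders'))
# A valid build order: each dataset appears after all of its inputs.
print('Topological order:', list(nx.topological_sort(G)))
Use Cases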
- data governance
- impact analysis
- ETL documentation
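For the impact-analysis and documentation use cases, the same graph can be queried in the downstream direction. The sketch below is one possible approach, not part of the original snippet: the helper names `impacted_by` and `describe_edges` are hypothetical, and it assumes every edge carries an `op` attribute as in the example above.

```python
import networkx as nx

# Rebuild the lineage graph from the snippet (nodes are created implicitly by the edges).
G = nx.DiGraph()
G.add_edges_from([
    ("raw_events", "clean_events", {"op": "deduplicate"}),
    ("clean_events", "enriched_events", {"op": "join customer"}),
    ("enriched_events", "fact_orders", {"op": "aggregate"}),
    ("dim_customer", "enriched_events", {"op": "join"}),
])


def impacted_by(graph, dataset):
    """Hypothetical helper: every dataset downstream of `dataset`, sorted by name."""
    return sorted(nx.descendants(graph, dataset))


def describe_edges(graph):
    """Hypothetical helper: one 'source -[op]-> target' line per lineage edge."""
    return [
        f"{src} -[{attrs.get('op', 'unknown')}]-> {dst}"
        for src, dst, attrs in graph.edges(data=True)
    ]


# Impact analysis: what must be rebuilt if dim_customer changes?
print("Impacted by dim_customer:", impacted_by(G, "dim_customer"))

# ETL documentation: a plain-text listing of each transformation.
for line in describe_edges(G):
    print(line)
```

`nx.descendants` mirrors `nx.ancestors`: ancestors answer "where did this table come from?", descendants answer "what breaks if this table changes?". Sorting the result keeps the output stable, since both functions return unordered sets.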
Related Snippets
Similar patterns you can reuse in the same workflow.
sql · advanced
SQL Data Lineage Tracking
Track data lineage across ETL stages with metadata logging for debugging and audit trails.
Best for: Tracing data flow across pipeline stages
#lineage #metadata
python · advanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
#etl #pipeline
python · intermediate
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Best for: Processing large CSV files that don't fit in memory
#batch-processing #python
python · advanced
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
Best for: Replicating data between databases
#database #sync