Python · Advanced
Database Sync Script in Python
Sync a table between two PostgreSQL databases with upsert logic, batch processing, and change detection.
Python — press ⌘/Ctrl + Shift + C to copy
import logging
from datetime import datetime
from contextlib import contextmanager
from typing import Generator, Any
import psycopg2
from psycopg2.extras import execute_values
# Module-level logger named after this module, so its level and handlers can
# be configured through the standard logging hierarchy.
log = logging.getLogger(__name__)
@contextmanager
def connect(dsn: str) -> Generator:
    """Yield a psycopg2 connection scoped to a ``with`` block.

    On normal exit the transaction is committed. If the block (or the commit
    itself) raises an ``Exception``, the transaction is rolled back and the
    exception propagates. The connection is closed on every exit path.
    """
    connection = psycopg2.connect(dsn)
    try:
        try:
            yield connection
            connection.commit()
        except Exception:
            # Undo partial work before letting the error escape.
            connection.rollback()
            raise
    finally:
        connection.close()
def fetch_changed_records(
    conn: Any,
    table: str,
    since: datetime,
    batch_size: int = 5000,
    after_id: Any = None,
) -> list[dict]:
    """Return one batch of rows from *table* whose ``updated_at`` is after *since*.

    Rows are ordered by ``id`` and at most *batch_size* are returned. To page
    through every changed row, pass the ``id`` of the last row of the previous
    batch as *after_id* (keyset pagination). With ``None`` (the default),
    paging starts from the lowest ``id``.

    Args:
        conn: An open psycopg2 connection to the source database.
        table: Table name. It is interpolated directly into the SQL text, so it
            must come from trusted configuration, never from user input.
        since: Only rows with ``updated_at > since`` are returned.
        batch_size: Maximum number of rows to return (SQL ``LIMIT``).
        after_id: If not ``None``, only rows with ``id > after_id`` are returned.

    Returns:
        A list of ``RealDictRow`` objects (dict subclasses keyed by column name).
    """
    filters = "updated_at > %s"
    params: list[Any] = [since]
    if after_id is not None:
        filters += " AND id > %s"
        params.append(after_id)
    params.append(batch_size)
    query = f"SELECT * FROM {table} WHERE {filters} ORDER BY id LIMIT %s"
    # ``psycopg2.extras`` is loaded by the module-level
    # ``from psycopg2.extras import execute_values``, so this attribute exists.
    # The ``with`` closes the cursor even if execute() raises.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, tuple(params))
        return cursor.fetchall()


def upsert_records(conn: Any, table: str, records: list[dict]) -> int:
    """Insert *records* into *table*, updating rows whose ``id`` already exists.

    The column list is taken from the keys of the first record. Every record
    must contain at least those keys (a missing key raises ``KeyError``).
    ``ON CONFLICT (id)`` requires a unique constraint or index on ``id`` in
    the target table. When ``id`` is the only column there is nothing to
    update, so conflicting rows are left untouched (``DO NOTHING``). This
    avoids generating an invalid empty ``SET`` clause.

    Args:
        conn: An open psycopg2 connection to the target database. Nothing is
            committed here; the caller owns the transaction.
        table: Target table name, interpolated into the SQL text. Like the
            column names, it must come from a trusted source.
        records: Rows to write, as mappings of column name to value.

    Returns:
        The number of records submitted (``len(records)``), or 0 for an
        empty list.
    """
    if not records:
        return 0
    columns = list(records[0].keys())
    cols_str = ", ".join(columns)
    update_cols = [c for c in columns if c != "id"]
    if update_cols:
        assignments = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_cols)
        conflict_action = f"DO UPDATE SET {assignments}"
    else:
        conflict_action = "DO NOTHING"
    # execute_values expands the single ``VALUES %s`` placeholder into
    # multi-row VALUES lists.
    query = (
        f"INSERT INTO {table} ({cols_str}) VALUES %s "
        f"ON CONFLICT (id) {conflict_action}"
    )
    values = [tuple(r[c] for c in columns) for r in records]
    with conn.cursor() as cursor:
        execute_values(cursor, query, values)
    return len(records)


def sync(
    source_dsn: str,
    target_dsn: str,
    table: str,
    since: datetime,
    batch_size: int = 5000,
) -> None:
    """Copy every row of *table* changed after *since* from source to target.

    Rows are read in batches of *batch_size* using keyset pagination on
    ``id``, so every matching row is processed rather than just the first
    batch. All batches are written inside the single target connection
    opened by ``connect()``. That connection commits when the ``with``
    block exits normally and rolls back if an exception escapes.

    Args:
        source_dsn: libpq connection string for the source database.
        target_dsn: libpq connection string for the target database.
        table: Table to sync (trusted name; it is interpolated into SQL).
        since: Lower bound (exclusive) on ``updated_at``.
        batch_size: Rows fetched and upserted per round trip.

    Raises:
        ValueError: If *batch_size* is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    log.info("Syncing %s since %s", table, since)
    total = 0
    last_id = None
    with connect(source_dsn) as src, connect(target_dsn) as tgt:
        while True:
            records = fetch_changed_records(
                src, table, since, batch_size, after_id=last_id
            )
            if not records:
                break
            count = upsert_records(tgt, table, records)
            total += count
            log.info("Upserted %d records into %s", count, table)
            if len(records) < batch_size:
                break  # a short batch means no further rows match
            # Rows are ordered by id, so the last one is the pagination cursor.
            last_id = records[-1]["id"]
        if total == 0:
            log.info("No new records to sync")
if __name__ == "__main__":
sync(
source_dsn="postgresql://user:pass@source-host:5432/sourcedb",
target_dsn="postgresql://user:pass@target-host:5432/targetdb",
table="customers",
since=datetime(2024, 1, 1),
)Use Cases
- Replicating data between databases
- Incremental data warehouse loading
- Cross-environment data synchronization
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Python · Advanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV files into a warehouse
#etl #pipeline
Python · Intermediate
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Best for: Processing large CSV files that don't fit in memory
#batch-processing #python
SQL · Advanced
ETL Pipeline - Technique 39
Extract, transform, load (ETL)
Best for: Database operations
#sql #database
Python · Beginner
Pandas DataFrame Transformations
Common pandas DataFrame transformations, including column operations, type casting, and string methods.
Best for: Cleaning raw data files for analysis
#pandas #dataframe