# Database Sync Script in Python
Synchronize changed rows between two PostgreSQL databases using upsert logic, batched fetching, and `updated_at`-based change detection.
import logging
from datetime import datetime
from contextlib import contextmanager
from typing import Generator, Any
import psycopg2
from psycopg2.extras import execute_values
log = logging.getLogger(__name__)
@contextmanager
def connect(dsn: str) -> Generator:
    """Yield an open psycopg2 connection as a transactional context.

    Commits when the managed block finishes normally, rolls back and
    re-raises on any exception, and always closes the connection.
    """
    connection = psycopg2.connect(dsn)
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()
def fetch_changed_records(
    conn: Any, table: str, since: datetime, batch_size: int = 5000
) -> list[dict]:
    """Return up to ``batch_size`` rows of ``table`` modified after ``since``.

    Rows come back as dicts (RealDictCursor), ordered by ``id`` so callers
    can page through changes deterministically.

    Args:
        conn: Open psycopg2 connection to the source database.
        table: Table to scan; composed as a quoted identifier rather than
            interpolated into the SQL string, preventing SQL injection.
        since: Only rows with ``updated_at`` strictly after this timestamp
            are returned.
        batch_size: Maximum number of rows fetched in one call.

    Returns:
        A list of row dicts; empty when nothing has changed.
    """
    # Local import keeps this fix self-contained; psycopg2.sql composes
    # identifiers safely instead of f-string interpolation of the table name.
    from psycopg2 import sql

    query = sql.SQL(
        "SELECT * FROM {} WHERE updated_at > %s ORDER BY id LIMIT %s"
    ).format(sql.Identifier(table))
    # Context-manage the cursor so it is closed even if execute() raises
    # (the original leaked the cursor).
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, (since, batch_size))
        return cursor.fetchall()
def upsert_records(conn: Any, table: str, records: list[dict]) -> int:
    """Insert ``records`` into ``table``, updating existing rows on id conflict.

    Column order is taken from the first record; all records are assumed to
    share the same keys (a record missing a key raises ``KeyError``).

    Args:
        conn: Open psycopg2 connection to the target database.
        table: Target table name; composed as a quoted identifier rather
            than interpolated, preventing SQL injection via table/column names.
        records: Row dicts to upsert; must include an ``id`` key used as the
            conflict target.

    Returns:
        The number of records written (0 for an empty input).
    """
    if not records:
        return 0

    # Local import keeps this fix self-contained; psycopg2.sql composes
    # identifiers safely instead of f-string interpolation.
    from psycopg2 import sql

    columns = list(records[0].keys())
    query = sql.SQL(
        "INSERT INTO {table} ({cols}) VALUES %s "
        "ON CONFLICT (id) DO UPDATE SET {updates}"
    ).format(
        table=sql.Identifier(table),
        cols=sql.SQL(", ").join(sql.Identifier(c) for c in columns),
        updates=sql.SQL(", ").join(
            sql.SQL("{col} = EXCLUDED.{col}").format(col=sql.Identifier(c))
            for c in columns
            if c != "id"
        ),
    )
    values = [tuple(r[c] for c in columns) for r in records]
    # Context-manage the cursor so it is closed even if execute_values raises
    # (the original leaked the cursor). execute_values accepts a Composable.
    with conn.cursor() as cursor:
        execute_values(cursor, query, values)
    return len(records)
def sync(
    source_dsn: str, target_dsn: str, table: str, since: datetime
) -> None:
    """Copy rows changed after ``since`` from the source table to the target.

    Fetches a single batch (up to fetch_changed_records' default of 5000
    rows) and upserts it into the target; both connections commit on success
    via the connect() context manager.

    Args:
        source_dsn: DSN of the database to read changed rows from.
        target_dsn: DSN of the database to upsert into.
        table: Table name, assumed identical on both sides.
        since: Lower bound (exclusive) on ``updated_at`` for changed rows.
    """
    # Lazy %-args: the message is only formatted if the level is enabled.
    log.info("Syncing %s since %s", table, since)
    with connect(source_dsn) as src, connect(target_dsn) as tgt:
        records = fetch_changed_records(src, table, since)
        if not records:
            log.info("No new records to sync")
            return
        count = upsert_records(tgt, table, records)
        log.info("Upserted %s records into %s", count, table)
if __name__ == "__main__":
sync(
source_dsn="postgresql://user:pass@source-host:5432/sourcedb",
target_dsn="postgresql://user:pass@target-host:5432/targetdb",
table="customers",
since=datetime(2024, 1, 1),
    )

## Use Cases
- Replicating data between databases
- Incremental data warehouse loading
- Cross-environment data synchronization
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.
Nested JSON Flattening in Python
Flatten deeply nested JSON structures into flat dictionaries suitable for DataFrames or CSV export.