python · advanced

Database Sync Script in Python

Sync data between two PostgreSQL databases using upsert logic, batch processing, and timestamp-based change detection (an `updated_at` column).

python
import logging
from datetime import datetime
from contextlib import contextmanager
from typing import Generator, Any
import psycopg2
from psycopg2.extras import execute_values

# Module-level logger named after this module; sync() reports progress through it.
log = logging.getLogger(__name__)


@contextmanager
def connect(dsn: str) -> Generator:
    """Yield a psycopg2 connection for ``dsn`` wrapped in a single transaction.

    The transaction is committed when the ``with`` body finishes normally.
    If the body (or the commit itself) raises an ``Exception``, the
    transaction is rolled back and the exception is re-raised. The
    connection is closed in every case.
    """
    connection = psycopg2.connect(dsn)
    try:
        try:
            yield connection
            # Reached only when the caller's block completed without raising.
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()


def fetch_changed_records(
    conn: Any,
    table: str,
    since: datetime,
    batch_size: int = 5000,
    *,
    after_id: Any = None,
) -> list[dict]:
    """Fetch one batch of rows from ``table`` whose ``updated_at`` is after ``since``.

    Rows are returned ordered by ``id`` so that consecutive batches can be
    paged by passing the last ``id`` of the previous batch as ``after_id``.

    Args:
        conn: Open psycopg2 connection to read from.
        table: Table name. NOTE: it is interpolated into the SQL text
            verbatim, so it must come from trusted code, never from user
            input.
        since: Only rows with ``updated_at`` strictly greater than this
            value are returned.
        batch_size: Maximum number of rows to return.
        after_id: When not ``None``, only rows with ``id`` strictly greater
            than this value are returned. With the default, the query is
            the same as it was before this parameter existed.

    Returns:
        Up to ``batch_size`` rows as dict-like mappings keyed by column name.
    """
    query = f"SELECT * FROM {table} WHERE updated_at > %s"
    params: list[Any] = [since]
    if after_id is not None:
        query += " AND id > %s"
        params.append(after_id)
    query += " ORDER BY id LIMIT %s"
    params.append(batch_size)

    # The context manager closes the cursor even if the query fails.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()


def upsert_records(conn: Any, table: str, records: list[dict]) -> int:
    """Insert ``records`` into ``table``, updating rows whose ``id`` already exists.

    The column list is taken from the first record; every record must
    provide those same keys.

    Args:
        conn: Open psycopg2 connection to write to. The caller is
            responsible for committing.
        table: Table name. NOTE: table and column names are interpolated
            into the SQL text verbatim and must come from trusted code.
        records: Rows to write, each a mapping of column name to value.

    Returns:
        The number of records submitted (0 for an empty list, in which case
        the database is not touched).

    Raises:
        KeyError: If a record lacks one of the first record's columns.
    """
    if not records:
        return 0

    columns = list(records[0])
    cols_str = ", ".join(columns)
    update_str = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != "id")
    # When "id" is the only column there is nothing to update, and
    # "DO UPDATE SET" with an empty assignment list is invalid SQL.
    conflict_action = f"DO UPDATE SET {update_str}" if update_str else "DO NOTHING"
    query = f"""
        INSERT INTO {table} ({cols_str})
        VALUES %s
        ON CONFLICT (id) {conflict_action}
    """
    values = [tuple(r[c] for c in columns) for r in records]

    # The context manager closes the cursor even if the insert fails.
    with conn.cursor() as cursor:
        execute_values(cursor, query, values)
    return len(records)


def sync(
    source_dsn: str,
    target_dsn: str,
    table: str,
    since: datetime,
    batch_size: int = 5000,
) -> None:
    """Copy every row of ``table`` changed after ``since`` from source to target.

    Changed rows are read from the source in batches of ``batch_size``
    (paged by ``id``) and upserted into the target until a short or empty
    batch shows there is nothing left. Each connection commits once, when
    the ``with`` block exits without error.

    Args:
        source_dsn: Connection string of the database to read from.
        target_dsn: Connection string of the database to write to.
        table: Name of the table to sync; it must exist on both sides.
        since: Only rows with ``updated_at`` after this value are synced.
        batch_size: Number of rows fetched and upserted per round trip.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    log.info("Syncing %s since %s", table, since)
    total = 0
    last_id = None
    with connect(source_dsn) as src, connect(target_dsn) as tgt:
        while True:
            records = fetch_changed_records(
                src, table, since, batch_size, after_id=last_id
            )
            if not records:
                break
            total += upsert_records(tgt, table, records)
            # Batches are ordered by id, so the last row carries the cursor
            # position for the next page.
            last_id = records[-1]["id"]
            if len(records) < batch_size:
                # A short batch means the source is exhausted; skip the
                # extra empty query.
                break
        if total:
            log.info("Upserted %s records into %s", total, table)
        else:
            log.info("No new records to sync")


if __name__ == "__main__":
    # Without any logging configuration the INFO-level progress messages
    # emitted by sync() are discarded (the default threshold is WARNING),
    # so the script would run without printing anything.
    logging.basicConfig(level=logging.INFO)
    # Placeholder DSNs: substitute real connection strings, and keep real
    # credentials out of source control.
    sync(
        source_dsn="postgresql://user:pass@source-host:5432/sourcedb",
        target_dsn="postgresql://user:pass@target-host:5432/targetdb",
        table="customers",
        since=datetime(2024, 1, 1),
    )

Use Cases

  • Replicating data between databases
  • Incremental data warehouse loading
  • Cross-environment data synchronization

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.