sqladvanced

Change Data Capture Pattern in SQL

Implement change data capture with trigger-based auditing to track inserts, updates, and deletes.

sql
-- CDC audit log table
CREATE TABLE cdc_log (
    id BIGSERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    operation TEXT NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE')),
    record_id TEXT NOT NULL,
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMPTZ DEFAULT NOW(),
    changed_by TEXT DEFAULT current_user
);

CREATE INDEX idx_cdc_table ON cdc_log (table_name, changed_at);

-- Generic CDC trigger function
CREATE OR REPLACE FUNCTION fn_cdc_trigger()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO cdc_log (table_name, operation, record_id, new_data)
        VALUES (TG_TABLE_NAME, 'INSERT', NEW.id::text, to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO cdc_log (table_name, operation, record_id, old_data, new_data)
        VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id::text, to_jsonb(OLD), to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO cdc_log (table_name, operation, record_id, old_data)
        VALUES (TG_TABLE_NAME, 'DELETE', OLD.id::text, to_jsonb(OLD));
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Attach to any table
CREATE TRIGGER trg_orders_cdc
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION fn_cdc_trigger();

-- Query recent changes
SELECT * FROM cdc_log
WHERE table_name = 'orders' AND changed_at > NOW() - INTERVAL '1 hour'
ORDER BY changed_at DESC;

Use Cases

  • Tracking all data changes for audit compliance
  • Building event sourcing from database operations
  • Debugging data issues with change history

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.