python · intermediate

Data Quality Testing with Expectations

Define and run data quality expectations for automated validation in data pipelines.

python
import pandas as pd
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ExpectationResult:
    """Outcome of evaluating a single data quality expectation."""

    # Human-readable label of the expectation that was evaluated.
    name: str
    # True when the data satisfied the expectation.
    passed: bool
    # Short message describing what was found (e.g. a violation count).
    detail: str


def expect_not_null(df: pd.DataFrame, column: str) -> ExpectationResult:
    """Check that ``column`` contains no missing values.

    Args:
        df: Frame to validate.
        column: Name of the column to inspect.

    Returns:
        An ``ExpectationResult`` that passes when ``isna()`` flags no entry
        of the column; ``detail`` reports how many nulls were found.

    Raises:
        KeyError: If ``column`` is not present in ``df``.
    """
    # ``Series.sum()`` returns a NumPy integer. Converting it here makes the
    # comparison below produce a built-in ``bool`` rather than ``numpy.bool_``,
    # which matches the dataclass annotation and stays JSON-serializable.
    null_count = int(df[column].isna().sum())
    return ExpectationResult(
        name=f"{column} not null",
        passed=null_count == 0,
        detail=f"{null_count} nulls found",
    )


def expect_unique(df: pd.DataFrame, column: str) -> ExpectationResult:
    """Check that no value in ``column`` occurs more than once.

    Args:
        df: Frame to validate.
        column: Name of the column to inspect.

    Returns:
        An ``ExpectationResult`` that passes when ``duplicated()`` flags no
        row; ``detail`` reports the number of flagged rows.

    Raises:
        KeyError: If ``column`` is not present in ``df``.
    """
    # ``duplicated()`` marks every repeat after the first occurrence, and it
    # treats missing values as equal to each other, so repeated nulls are
    # counted here as duplicates too.
    # ``Series.sum()`` returns a NumPy integer; converting it keeps ``passed``
    # a built-in ``bool`` rather than ``numpy.bool_``.
    dup_count = int(df[column].duplicated().sum())
    return ExpectationResult(
        name=f"{column} unique",
        passed=dup_count == 0,
        detail=f"{dup_count} duplicates found",
    )


def _format_value_set(values: set) -> str:
    """Render ``values`` like a set literal, but in a stable (sorted) order."""
    if not values:
        return "set()"
    # Sorting the reprs (rather than the values) also works for mixed types.
    return "{" + ", ".join(sorted(repr(v) for v in values)) + "}"


def expect_values_in(df: pd.DataFrame, column: str, allowed: set) -> ExpectationResult:
    """Check that every non-null value of ``column`` is one of ``allowed``.

    Args:
        df: Frame to validate.
        column: Name of the column to inspect.
        allowed: Permitted values. A set is expected; any other iterable of
            hashable values (list, tuple, frozenset) is accepted as well.

    Returns:
        An ``ExpectationResult`` that passes when no value outside
        ``allowed`` is present. Missing values are dropped before the
        comparison. The name and detail list values in sorted order so the
        text is identical from one run to the next.

    Raises:
        KeyError: If ``column`` is not present in ``df``.
        TypeError: If ``allowed`` is a bare string or bytes object, which
            would otherwise be split into single characters.
    """
    if isinstance(allowed, (str, bytes)):
        raise TypeError("allowed must be a collection of values, not a string")
    allowed_set = set(allowed)
    invalid = set(df[column].dropna().unique()) - allowed_set
    if invalid:
        detail = f"Invalid values: {_format_value_set(invalid)}"
    else:
        detail = "All valid"
    return ExpectationResult(
        # A raw set repr changes order between interpreter runs for strings
        # (hash randomization), which would make this label unstable.
        name=f"{column} values in {_format_value_set(allowed_set)}",
        passed=not invalid,
        detail=detail,
    )


def expect_range(df: pd.DataFrame, column: str, min_val: float, max_val: float) -> ExpectationResult:
    """Check that the values of ``column`` lie within ``[min_val, max_val]``.

    Both bounds are inclusive: only values strictly below ``min_val`` or
    strictly above ``max_val`` are counted as violations. NaN compares False
    against both bounds, so NaN entries are not counted.

    Args:
        df: Frame to validate.
        column: Name of the column to inspect.
        min_val: Smallest accepted value.
        max_val: Largest accepted value.

    Returns:
        An ``ExpectationResult`` that passes when no row is out of range;
        ``detail`` reports the number of offending rows.
    """
    values = df[column]
    too_low = values < min_val
    too_high = values > max_val
    # Rows violating either bound.
    out = df.loc[too_low | too_high]
    label = f"{column} in [{min_val}, {max_val}]"
    message = f"{len(out)} out of range"
    return ExpectationResult(name=label, passed=len(out) == 0, detail=message)


def run_suite(df: pd.DataFrame) -> list[ExpectationResult]:
    """Run the fixed expectation suite against ``df``.

    The checks cover the ``id``, ``amount`` and ``status`` columns and are
    evaluated, and returned, in the order listed below.

    Args:
        df: Frame to validate.

    Returns:
        One ``ExpectationResult`` per expectation, in evaluation order.
    """
    id_present = expect_not_null(df, "id")
    id_distinct = expect_unique(df, "id")
    amount_present = expect_not_null(df, "amount")
    amount_bounded = expect_range(df, "amount", 0, 100000)
    status_known = expect_values_in(df, "status", {"active", "inactive", "pending"})
    return [id_present, id_distinct, amount_present, amount_bounded, status_known]


# Load the dataset and evaluate the whole suite against it.
df = pd.read_csv("data.csv")
results = run_suite(df)

# Print one report line per expectation.
for r in results:
    if r.passed:
        icon = "PASS"
    else:
        icon = "FAIL"
    print(f"[{icon}] {r.name}: {r.detail}")

# Abort with an error message when at least one expectation failed.
failed = list(filter(lambda res: not res.passed, results))
if failed:
    raise SystemExit(f"{len(failed)} expectations failed")

Use Cases

  • Automated data quality gates in pipelines
  • Pre-load validation before warehouse ingestion
  • Continuous data monitoring and alerting

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.