Validation

Schema validation and dry-run preview for TransformPlan pipelines.

Overview

TransformPlan validates operations against DataFrame schemas before execution. This catches errors like:

  • Referencing non-existent columns
  • Applying string operations to numeric columns
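  • Creating columns that already exist

import polars as pl
from transformplan import TransformPlan

# Illustrative frame; any DataFrame without a "nonexistent" column behaves the same
df = pl.DataFrame({"name": ["Alice", "Bob"], "age": [25, 30]})

plan = TransformPlan().col_drop("nonexistent")
result = plan.validate(df)

if not result.is_valid:
    for error in result.errors:
        print(error)
    # Step 1 (col_drop): Column 'nonexistent' does not exist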

ValidationResult

ValidationResult()

Result of schema validation.

Initialize an empty validation result.

Source code in transformplan/validation.py
def __init__(self) -> None:
    """Initialize an empty validation result."""
    self._errors: list[ValidationError] = []

is_valid property

is_valid: bool

Check if validation passed.

Returns:

Type  Description
bool  True if no errors, False otherwise.

errors property

errors: list[ValidationError]

Get list of validation errors.

Returns:

Type                   Description
list[ValidationError]  List of ValidationError instances.

add_error

add_error(step: int, operation: str, message: str) -> None

Add a validation error.

Source code in transformplan/validation.py
def add_error(self, step: int, operation: str, message: str) -> None:
    """Add a validation error."""
    self._errors.append(ValidationError(step, operation, message))

raise_if_invalid

raise_if_invalid() -> None

Raise SchemaValidationError if validation failed.

Raises:

Type                   Description
SchemaValidationError  If validation failed with errors.

Source code in transformplan/validation.py
def raise_if_invalid(self) -> None:
    """Raise SchemaValidationError if validation failed.

    Raises:
        SchemaValidationError: If validation failed with errors.
    """
    if not self.is_valid:
        error_messages = "\n".join(f"  - {e}" for e in self._errors)
        msg = f"Schema validation failed with {len(self._errors)} error(s):\n{error_messages}"
        raise SchemaValidationError(msg)

ValidationError

ValidationError dataclass

ValidationError(step: int, operation: str, message: str)

A single validation error.

__str__

__str__() -> str

Return error message string.

Returns:

Type  Description
str   Formatted error message.

Source code in transformplan/validation.py
def __str__(self) -> str:
    """Return error message string.

    Returns:
        Formatted error message.
    """
    return f"Step {self.step} ({self.operation}): {self.message}"

SchemaValidationError

Bases: Exception

Raised when schema validation fails.
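The three classes above can be exercised without a DataFrame. The following standalone sketch re-creates them from the source listings on this page to show the message that raise_if_invalid produces. The body of is_valid is an assumption based on its description ("True if no errors"); this is an illustration, not the library code itself.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ValidationError:
    step: int
    operation: str
    message: str

    def __str__(self) -> str:
        return f"Step {self.step} ({self.operation}): {self.message}"


class SchemaValidationError(Exception):
    """Raised when schema validation fails."""


class ValidationResult:
    def __init__(self) -> None:
        self._errors: list[ValidationError] = []

    @property
    def is_valid(self) -> bool:
        # Assumed implementation: valid means no recorded errors.
        return not self._errors

    @property
    def errors(self) -> list[ValidationError]:
        return self._errors

    def add_error(self, step: int, operation: str, message: str) -> None:
        self._errors.append(ValidationError(step, operation, message))

    def raise_if_invalid(self) -> None:
        if not self.is_valid:
            error_messages = "\n".join(f"  - {e}" for e in self._errors)
            msg = f"Schema validation failed with {len(self._errors)} error(s):\n{error_messages}"
            raise SchemaValidationError(msg)


result = ValidationResult()
result.add_error(1, "col_drop", "Column 'nonexistent' does not exist")

try:
    result.raise_if_invalid()
except SchemaValidationError as exc:
    raised_message = str(exc)
    print(raised_message)
# Schema validation failed with 1 error(s):
#   - Step 1 (col_drop): Column 'nonexistent' does not exist
```

Catching SchemaValidationError this way suits callers that prefer an exception over checking is_valid by hand.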

DryRunResult

DryRunResult(
    input_schema: dict[str, DataType],
    steps: list[DryRunStep],
    validation: ValidationResult,
)

Result of a dry run showing what a pipeline will do.

Initialize DryRunResult.

Parameters:

Name          Type                 Description                                      Default
input_schema  dict[str, DataType]  Initial schema as column name to dtype mapping.  required
steps         list[DryRunStep]     List of dry run steps.                           required
validation    ValidationResult     Validation result with any errors.               required

Source code in transformplan/validation.py
def __init__(
    self,
    input_schema: dict[str, pl.DataType],
    steps: list[DryRunStep],
    validation: ValidationResult,
) -> None:
    """Initialize DryRunResult.

    Args:
        input_schema: Initial schema as column name to dtype mapping.
        steps: List of dry run steps.
        validation: Validation result with any errors.
    """
    self._input_schema = input_schema
    self._steps = steps
    self._validation = validation

is_valid property

is_valid: bool

Whether the pipeline passed validation.

Returns:

Type  Description
bool  True if validation passed, False otherwise.

errors property

errors: list[ValidationError]

Validation errors.

Returns:

Type                   Description
list[ValidationError]  List of validation errors.

steps property

steps: list[DryRunStep]

List of dry run steps.

Returns:

Type              Description
list[DryRunStep]  List of DryRunStep instances.

input_schema property

input_schema: dict[str, DataType]

Input schema.

Returns:

Type                 Description
dict[str, DataType]  Dictionary mapping column names to dtypes.

output_schema property

output_schema: dict[str, str]

Predicted output schema after all operations.

Returns:

Type            Description
dict[str, str]  Dictionary mapping column names to dtype names.

input_columns property

input_columns: list[str]

Input column names.

Returns:

Type       Description
list[str]  List of input column names.

output_columns property

output_columns: list[str]

Predicted output column names.

Returns:

Type       Description
list[str]  List of predicted output column names.

summary

summary(*, show_params: bool = True, show_schema: bool = False) -> str

Generate a human-readable summary.

Parameters:

Name         Type  Description                                Default
show_params  bool  Whether to show operation parameters.      True
show_schema  bool  Whether to show full schema at each step.  False

Returns:

Type  Description
str   Formatted string.

Source code in transformplan/validation.py
def summary(self, *, show_params: bool = True, show_schema: bool = False) -> str:  # noqa: C901
    """Generate a human-readable summary.

    Args:
        show_params: Whether to show operation parameters.
        show_schema: Whether to show full schema at each step.

    Returns:
        Formatted string.
    """
    lines = []

    # Header
    lines.extend(("=" * 70, "DRY RUN PREVIEW", "=" * 70))

    # Validation status
    if self.is_valid:
        lines.append("✓ Validation: PASSED")
    else:
        lines.append(f"✗ Validation: FAILED ({len(self.errors)} errors)")
        lines.extend(f"  - {err}" for err in self.errors)

    lines.extend(["-" * 70, f"Input: {len(self._input_schema)} columns"])
    if show_schema:
        for col, dtype in self._input_schema.items():
            lines.append(f"  {col}: {dtype_name(dtype)}")

    lines.extend(
        [
            "-" * 70,
            "",
            f"{'#':<4} {'Operation':<20} {'Columns':<15} {'Changes':<30}",
            "-" * 70,
        ]
    )

    for step in self._steps:
        step_num = str(step.step)
        op = step.operation
        col_count = len(step.schema_after)

        # Build changes string
        changes = []
        if step.columns_added:
            changes.append(f"+{step.columns_added}")
        if step.columns_removed:
            changes.append(f"-{step.columns_removed}")
        if step.columns_modified:
            changes.append(f"~{step.columns_modified}")
        changes_str = " ".join(changes) if changes else "-"

        # Error marker
        err_marker = " ✗" if step.error else ""

        lines.append(
            f"{step_num:<4} {op:<20} {col_count:<15} {changes_str:<30}{err_marker}"
        )

        # Params
        if show_params and step.params:
            params_str = _format_params_short(step.params)
            lines.append(f"     └─ {params_str}")

        # Error detail
        if step.error:
            lines.append(f"     └─ ERROR: {step.error}")

        # Full schema
        if show_schema:
            lines.append(f"     Schema: {step.schema_after}")

    lines.extend(["=" * 70, f"Output: {len(self.output_schema)} columns"])
    if show_schema:
        for col, dtype in self.output_schema.items():
            lines.append(f"  {col}: {dtype}")

    return "\n".join(lines)
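The Changes column in the summary is built from the three column lists on each step. As a small standalone sketch of that logic (the helper name format_changes is hypothetical; the library builds the string inline, as in the listing above):

```python
from __future__ import annotations


def format_changes(added: list[str], removed: list[str], modified: list[str]) -> str:
    """Mirror the '+', '-', '~' markers used in the summary table."""
    changes = []
    if added:
        changes.append(f"+{added}")
    if removed:
        changes.append(f"-{removed}")
    if modified:
        changes.append(f"~{modified}")
    # A lone "-" marks a step that changes no columns.
    return " ".join(changes) if changes else "-"


print(format_changes(["bonus"], [], []))         # +['bonus']
print(format_changes([], ["temp"], ["salary"]))  # -['temp'] ~['salary']
print(format_changes([], [], []))                # -
```

Because each list is interpolated directly into the f-string, the column names appear in Python list notation, which is why the preview shows entries such as +['bonus'].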

print

print(*, show_params: bool = True, show_schema: bool = False) -> None

Print the dry run summary.

Source code in transformplan/validation.py
def print(self, *, show_params: bool = True, show_schema: bool = False) -> None:
    """Print the dry run summary."""
    print(self.summary(show_params=show_params, show_schema=show_schema))  # noqa: T201

DryRunStep

DryRunStep dataclass

DryRunStep(
    step: int,
    operation: str,
    params: dict[str, Any],
    schema_before: dict[str, str],
    schema_after: dict[str, str],
    columns_added: list[str],
    columns_removed: list[str],
    columns_modified: list[str],
    error: str | None = None,
)

A single step in a dry run.
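The columns_added, columns_removed, and columns_modified fields describe how schema_after differs from schema_before. How the library computes them is not shown on this page. One plausible approach, sketched here with a hypothetical diff_schemas helper over plain dtype-name dictionaries, is a key-by-key comparison. The sketch treats a column as modified when its dtype name changes, which is an assumption; the library may also flag columns whose values change while the dtype stays the same.

```python
from __future__ import annotations


def diff_schemas(
    before: dict[str, str], after: dict[str, str]
) -> tuple[list[str], list[str], list[str]]:
    """Return (added, removed, modified) column names between two schemas."""
    added = [col for col in after if col not in before]
    removed = [col for col in before if col not in after]
    # Assumption: "modified" means the dtype name changed.
    modified = [col for col in after if col in before and before[col] != after[col]]
    return added, removed, modified


# Dtype names are illustrative strings, not real dtype objects.
before = {"name": "Utf8", "salary": "Int64", "temp": "Float64"}
after = {"name": "Utf8", "salary": "Float64", "bonus": "Int64"}

added, removed, modified = diff_schemas(before, after)
print(added, removed, modified)  # ['bonus'] ['temp'] ['salary']
```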

Example: Validation

import polars as pl
from transformplan import TransformPlan, Col

df = pl.DataFrame({
    "name": ["Alice", "Bob"],
    "age": [25, 30],
    "salary": [50000, 60000]
})

plan = (
    TransformPlan()
    .col_drop("age")
    .rows_filter(Col("age") > 18)  # Error: age was dropped!
)

result = plan.validate(df)
print(result)
# ValidationResult(valid=False, errors=1)

for error in result.errors:
    print(error)
# Step 2 (rows_filter): Column 'age' does not exist
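
The error is reported at step 2 because validation follows the schema as it would look after each step, not only the input schema. A toy illustration of that idea, independent of the library (the check_plan function and the tuple-based plan format are hypothetical, and only column existence and col_drop are modelled):

```python
from __future__ import annotations


def check_plan(schema: dict[str, str], plan: list[tuple[str, str]]) -> list[str]:
    """Walk a plan of (operation, column) pairs and collect schema errors."""
    current = dict(schema)  # copy so the caller's schema is untouched
    errors = []
    for step, (operation, column) in enumerate(plan, start=1):
        if column not in current:
            errors.append(f"Step {step} ({operation}): Column '{column}' does not exist")
            continue
        if operation == "col_drop":
            del current[column]  # later steps no longer see this column
    return errors


schema = {"name": "Utf8", "age": "Int64", "salary": "Int64"}
plan = [("col_drop", "age"), ("rows_filter", "age")]

for error in check_plan(schema, plan):
    print(error)
# Step 2 (rows_filter): Column 'age' does not exist
```

Reordering the plan so that the filter runs before the drop leaves no errors, since the column still exists when the filter is checked.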

Example: Dry Run

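import polars as pl
from transformplan import TransformPlan

# Assumed input: three columns, including "temp" and "salary"
df = pl.DataFrame({"name": ["Alice", "Bob"], "salary": [50000, 60000], "temp": [0, 1]})

plan = (
    TransformPlan()
    .col_drop("temp")
    .col_add("bonus", value=1000)
    .math_multiply("salary", 1.1)
)

preview = plan.dry_run(df)
preview.print()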

Output:

======================================================================
DRY RUN PREVIEW
======================================================================
✓ Validation: PASSED
----------------------------------------------------------------------
Input: 3 columns
----------------------------------------------------------------------

#    Operation            Columns         Changes
----------------------------------------------------------------------
1    col_drop             2               -['temp']
     └─ column='temp'
2    col_add              3               +['bonus']
     └─ new_column='bonus', value=1000
3    math_multiply        3               ~['salary']
     └─ column='salary', value=1.1
======================================================================
Output: 3 columns

Type Checking

Validation includes type checking for operations that require specific types:

Operation Type  Required Column Type
math_*          Numeric (Int, Float)
str_*           String (Utf8)
dt_*            Datetime (Date, Datetime, Time)
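
As a rough sketch of how a prefix-based rule like this could be checked: the REQUIRED_TYPES mapping and check_type function below are hypothetical and compare dtype names as plain strings. The operation name str_upper is made up to match the str_* pattern. The library's actual rules may cover more dtypes and report errors differently.

```python
from __future__ import annotations

# Operation prefix -> dtype-name prefixes accepted for the target column.
# Assumption: names such as "Int64" or "Float32" identify the dtype family.
REQUIRED_TYPES: dict[str, tuple[str, ...]] = {
    "math_": ("Int", "Float"),
    "str_": ("Utf8",),
    "dt_": ("Date", "Datetime", "Time"),
}


def check_type(operation: str, dtype_name: str) -> str | None:
    """Return an error message if the dtype does not suit the operation."""
    for prefix, allowed in REQUIRED_TYPES.items():
        if operation.startswith(prefix):
            if not dtype_name.startswith(allowed):
                return f"{operation} requires one of {allowed}, got {dtype_name}"
            return None
    return None  # operations without a listed prefix are not type-checked here


print(check_type("math_multiply", "Int64"))  # None
print(check_type("str_upper", "Int64"))      # str_upper requires one of ('Utf8',), got Int64
```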