TransformPlan

The main class for building and executing transformation pipelines.

Overview

TransformPlan uses a deferred execution model: operations are registered via method chaining and executed together only when you call process(). Before executing, the pipeline can be checked with validate() or previewed with dry_run().

from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_drop("temp_column")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
)

# Execute
df_result, protocol = plan.process(df)

Class Reference

TransformPlan

TransformPlan()

Bases: TransformPlanBase, ColumnOps, DatetimeOps, MapOps, MathOps, RowOps, StrOps

Data processor with tracked transformations.

Usage

result, protocol = (
    TransformPlan()
    .col_drop("temp")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
    .process(df)
)

Source code in transformplan/core.py
def __init__(self) -> None:
    """Initialize an empty TransformPlanBase."""
    self._operations: list[tuple[Callable[..., pl.DataFrame], dict[str, Any]]] = []

Execution Methods

process

Execute all registered operations and return transformed data with an audit protocol.

df_result, protocol = plan.process(df)
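
The second return value is an audit protocol recording the operations that were applied. Mirroring the process_chunked example below, it can be printed for review (a sketch, assuming the protocol returned by process() exposes the same print() method):

df_result, protocol = plan.process(df)

# Human-readable audit trail of every applied operation
protocol.print()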

validate

Validate operations against the DataFrame schema without executing.

result = plan.validate(df)
if not result.is_valid:
    for error in result.errors:
        print(error)
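
A common pattern is to gate execution on validation so schema problems surface before any work is done. This sketch combines only the methods documented on this page:

result = plan.validate(df)
if result.is_valid:
    df_result, protocol = plan.process(df)
else:
    for error in result.errors:
        print(error)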

dry_run

Preview what the pipeline will do without executing it.

preview = plan.dry_run(df)
preview.print()

Chunked Processing

For large Parquet files that exceed available RAM, use chunked processing methods.

process_chunked

Process a large Parquet file in chunks, optionally keeping related rows together.

result, protocol = plan.process_chunked(
    source="large_file.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)
protocol.print()

See Chunked Processing for details on operation compatibility.

validate_chunked

Validate that a pipeline is compatible with chunked processing before executing.

validation = plan.validate_chunked(
    schema={"id": pl.Int64, "name": pl.Utf8},
    partition_key="id"
)
if not validation.is_valid:
    print(validation.errors)
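
The two chunked methods combine naturally: validate the pipeline against the file's schema first, then process. A minimal sketch; the column names and dtypes here are illustrative, not part of the API:

import polars as pl

schema = {"patient_id": pl.Int64, "price": pl.Float64}  # hypothetical columns
validation = plan.validate_chunked(schema=schema, partition_key="patient_id")
if validation.is_valid:
    result, protocol = plan.process_chunked(
        source="large_file.parquet",
        partition_key="patient_id",
        chunk_size=100_000,
    )
else:
    print(validation.errors)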

Serialization

Pipelines can be saved and loaded as JSON:

# Save
plan.to_json("pipeline.json")

# Load
loaded = TransformPlan.from_json("pipeline.json")
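
A round trip restores an executable pipeline, so plans can be versioned alongside the data they transform (sketch, assuming from_json() reconstructs every registered operation):

plan.to_json("pipeline.json")
restored = TransformPlan.from_json("pipeline.json")

# The restored plan executes like the original
df_result, protocol = restored.process(df)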

Or generate executable Python code:

print(plan.to_python())