TransformPlan

The main class for building and executing transformation pipelines.

Overview

TransformPlan uses a deferred execution model: operations are registered via method chaining and only run when you call process(). Before running, a plan can be checked with validate() or previewed with dry_run(), neither of which executes it. The plan itself is backend-agnostic; the backend is chosen at execution time and defaults to PolarsBackend.

from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_drop("temp_column")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
)

# Execute
df_result, protocol = plan.process(df)

Backend Selection

The backend is passed at execution time, not at construction:

import duckdb

from transformplan.backends.duckdb import DuckDBBackend

plan = TransformPlan().col_drop("temp").math_add("age", 1)

# Default (Polars)
result, protocol = plan.process(polars_df)

# DuckDB
con = duckdb.connect()
result, protocol = plan.process(duckdb_rel, backend=DuckDBBackend(con))

See Backends for details on each backend.

Class Reference

TransformPlan

TransformPlan()

Bases: TransformPlanBase, ColumnOps, DatetimeOps, JoinOps, MapOps, MathOps, RowOps, StrOps

Data processor with tracked transformations.

Usage

result, protocol = (
    TransformPlan()
    .col_drop("temp")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
    .process(df)
)

Initialize TransformPlan.

Source code in transformplan/plan.py
def __init__(self) -> None:
    """Initialize TransformPlan."""
    super().__init__()

Execution Methods

process

Execute all registered operations and return the transformed data together with an audit protocol.

df_result, protocol = plan.process(df)
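
The returned protocol is an audit trail of every operation that ran and can be printed directly (the same print() method appears in the chunked example below):

# Human-readable summary of the applied operations
protocol.print()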

validate

Validate operations against the DataFrame schema without executing.

result = plan.validate(df)
if not result.is_valid:
    for error in result.errors:
        print(error)

dry_run

Preview what the pipeline will do without executing it.

preview = plan.dry_run(df)
preview.print()

Chunked Processing

For large Parquet files that exceed available RAM, use the chunked processing methods below.

process_chunked

Process a large Parquet file in chunks, optionally keeping related rows together.

result, protocol = plan.process_chunked(
    source="large_file.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)
protocol.print()

See Chunked Processing for details on operation compatibility.

validate_chunked

Validate that a pipeline is compatible with chunked processing before executing.

import polars as pl

validation = plan.validate_chunked(
    schema={"id": pl.Int64, "name": pl.Utf8},
    partition_key="id"
)
if not validation.is_valid:
    print(validation.errors)

Serialization

Pipelines can be saved and loaded as JSON:

# Save
plan.to_json("pipeline.json")

# Load
loaded = TransformPlan.from_json("pipeline.json")

Plans are backend-agnostic when serialized — a pipeline saved from a Polars workflow can be loaded and executed with DuckDB, and vice versa.
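
For example, a plan saved from a Polars workflow can be replayed against DuckDB; a minimal sketch, reusing the DuckDBBackend setup shown above (duckdb_rel stands in for any DuckDB relation):

import duckdb

from transformplan import TransformPlan
from transformplan.backends.duckdb import DuckDBBackend

# Load a plan originally built and saved in a Polars workflow
loaded = TransformPlan.from_json("pipeline.json")

# Replay the same operations against DuckDB
con = duckdb.connect()
result, protocol = loaded.process(duckdb_rel, backend=DuckDBBackend(con))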

Or generate executable Python code:

print(plan.to_python())
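
Since to_python() returns the generated source as a string (as the print() call above implies), it can be written straight to a script file; a minimal sketch:

from pathlib import Path

# Persist the generated pipeline as a standalone Python script
Path("pipeline.py").write_text(plan.to_python())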