TransformPlan¶
The main class for building and executing transformation pipelines.
Overview¶
TransformPlan uses a deferred execution model: operations are registered via method chaining, then executed together when you call process(), validate(), or dry_run(). The plan itself is backend-agnostic — the backend is chosen at execution time (defaults to PolarsBackend).
```python
from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_drop("temp_column")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
)

# Execute
df_result, protocol = plan.process(df)
```
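The deferred model itself is a small amount of machinery: each chained call appends an operation record to a queue, and nothing touches the data until an execute step replays that queue. A minimal pure-Python sketch of the idea (illustrative only, not the library's actual implementation):

```python
from dataclasses import dataclass, field

@dataclass
class MiniPlan:
    """Toy illustration of deferred, chainable operations."""
    ops: list = field(default_factory=list)

    def col_drop(self, name):
        # Registering only records the operation; no data is touched yet.
        self.ops.append(("col_drop", name))
        return self  # returning self is what enables method chaining

    def math_multiply(self, name, factor):
        self.ops.append(("math_multiply", name, factor))
        return self

    def process(self, rows):
        # Execution replays the recorded queue against the data.
        for op in self.ops:
            if op[0] == "col_drop":
                rows = [{k: v for k, v in r.items() if k != op[1]} for r in rows]
            elif op[0] == "math_multiply":
                rows = [{**r, op[1]: r[op[1]] * op[2]} for r in rows]
        return rows

rows = [{"price": 10.0, "temp_column": 1}]
result = MiniPlan().col_drop("temp_column").math_multiply("price", 1.1).process(rows)
```

Because the queue is just data, the same recorded plan can later be validated, previewed, or serialized without ever running it.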
Backend Selection¶
The backend is passed at execution time, not at construction:
```python
import duckdb

from transformplan.backends.duckdb import DuckDBBackend

plan = TransformPlan().col_drop("temp").math_add("age", 1)

# Default (Polars)
result, protocol = plan.process(polars_df)

# DuckDB
con = duckdb.connect()
result, protocol = plan.process(duckdb_rel, backend=DuckDBBackend(con))
```
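Because a plan only stores operation records, swapping backends is a strategy-pattern choice made at execution time. A toy sketch of that design (names are illustrative, not the library's real interface):

```python
class ListBackend:
    """Toy backend: applies an operation queue to a list of dicts."""
    def run(self, ops, rows):
        for name, col in ops:
            if name == "col_drop":
                rows = [{k: v for k, v in r.items() if k != col} for r in rows]
        return rows

class UpperListBackend(ListBackend):
    """A second toy backend, to show the plan itself never changes."""
    def run(self, ops, rows):
        rows = super().run(ops, rows)
        return [{k.upper(): v for k, v in r.items()} for r in rows]

ops = [("col_drop", "temp")]       # the "plan": pure data, backend-agnostic
rows = [{"temp": 1, "age": 30}]

default_result = ListBackend().run(ops, rows)        # analogous to the Polars default
alternate_result = UpperListBackend().run(ops, rows)  # analogous to passing DuckDBBackend
```

The same `ops` list drives both backends, which is why the choice can be deferred to `process()`.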
See Backends for details on each backend.
Class Reference¶
TransformPlan¶
Bases: TransformPlanBase, ColumnOps, DatetimeOps, JoinOps, MapOps, MathOps, RowOps, StrOps
Data processor with tracked transformations.
Usage
```python
result, protocol = (
    TransformPlan()
    .col_drop("temp")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
    .process(df)
)
```
Initialize TransformPlan.
Execution Methods¶
process¶
Execute all registered operations and return transformed data with an audit protocol.
validate¶
Validate operations against the DataFrame schema without executing.
dry_run¶
Preview what the pipeline will do without executing it.
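Both checks can run without data because the recorded operations declare which columns they need, so they can be compared against a schema alone. A minimal sketch of that idea, assuming a plain dict schema (hypothetical, not the library's internals):

```python
def validate_ops(ops, schema):
    """Check each recorded operation's column against a schema, touching no data."""
    errors = []
    for name, col in ops:
        if col not in schema:
            errors.append(f"{name}: column {col!r} not found in schema")
    return errors

schema = {"price": "float", "active": "bool"}
errors = validate_ops([("col_drop", "temp"), ("math_multiply", "price")], schema)
# One error: 'temp' is missing from the schema; 'price' is fine.
```

A dry run works the same way, except that instead of collecting errors it reports what each queued operation would do.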
Chunked Processing¶
For large Parquet files that exceed available RAM, use chunked processing methods.
process_chunked¶
Process a large Parquet file in chunks, optionally keeping related rows together.
```python
result, protocol = plan.process_chunked(
    source="large_file.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)
protocol.print()
```
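The `partition_key` idea, never splitting one entity's rows across a chunk boundary, can be sketched without the library. A toy chunker over in-memory rows (illustrative only; the real method streams from Parquet):

```python
from itertools import groupby

def chunk_by_key(rows, key, chunk_size):
    """Yield chunks of roughly chunk_size rows, never splitting a key group."""
    rows = sorted(rows, key=lambda r: r[key])  # groups must be contiguous
    chunk = []
    for _, group in groupby(rows, key=lambda r: r[key]):
        group = list(group)
        if chunk and len(chunk) + len(group) > chunk_size:
            yield chunk  # close the chunk before the group would overflow it
            chunk = []
        chunk.extend(group)
    if chunk:
        yield chunk

rows = [{"patient_id": i % 3, "value": i} for i in range(9)]
chunks = list(chunk_by_key(rows, "patient_id", chunk_size=4))
```

Chunk sizes are therefore approximate: a chunk grows past the boundary rather than split a group, which is why operations must be chunk-compatible.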
See Chunked Processing for details on operation compatibility.
validate_chunked¶
Validate that a pipeline is compatible with chunked processing before executing.
```python
import polars as pl

validation = plan.validate_chunked(
    schema={"id": pl.Int64, "name": pl.Utf8},
    partition_key="id",
)
if not validation.is_valid:
    print(validation.errors)
```
Serialization¶
Pipelines can be saved and loaded as JSON:
Plans are backend-agnostic when serialized — a pipeline saved from a Polars workflow can be loaded and executed with DuckDB, and vice versa.
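Backend portability falls out of the same design: the JSON records only operation names and arguments, nothing backend-specific. A toy round trip showing the principle (the structure here is illustrative, not the library's actual JSON format):

```python
import json

ops = [("col_drop", "temp"), ("math_add", "age")]

# Serialize: each entry is just an operation name plus its arguments.
payload = json.dumps([{"op": name, "column": col} for name, col in ops])

# Deserialize: the same operation queue comes back, ready for any backend.
restored = [(entry["op"], entry["column"]) for entry in json.loads(payload)]
```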
Plans can also be exported as executable Python code.