TransformPlan¶
The main class for building and executing transformation pipelines.
Overview¶
TransformPlan uses a deferred execution model: operations are registered via method chaining and nothing runs until you call process(). A registered pipeline can also be checked with validate() or previewed with dry_run() before executing.
```python
from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_drop("temp_column")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
)

# Execute
df_result, protocol = plan.process(df)
```
Class Reference¶
TransformPlan¶
Bases: TransformPlanBase, ColumnOps, DatetimeOps, MapOps, MathOps, RowOps, StrOps
Data processor with tracked transformations.
Usage

```python
result, protocol = (
    TransformPlan()
    .col_drop("temp")
    .math_multiply("price", 1.1)
    .rows_filter(Col("active") == True)
    .process(df)
)
```
Execution Methods¶
process¶
Execute all registered operations and return transformed data with an audit protocol.
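For example, the returned protocol can be printed as an audit trail, assuming it is the same protocol object shown with process_chunked() below:

```python
# Run the pipeline and inspect the audit protocol.
df_result, protocol = plan.process(df)
protocol.print()  # print the record of applied operations
```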
validate¶
Validate operations against the DataFrame schema without executing.
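A minimal sketch; the exact argument (the DataFrame itself or its schema) and the shape of the result are assumptions modeled on validate_chunked() below:

```python
# Assumed usage: the argument and the result's attributes (is_valid, errors)
# are modeled on validate_chunked() and not confirmed for validate().
validation = plan.validate(df)
if not validation.is_valid:
    print(validation.errors)
```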
dry_run¶
Preview what the pipeline will do without executing it.
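A minimal sketch, assuming dry_run() takes the DataFrame and returns a printable summary of the planned operations:

```python
# Assumed usage: the argument and return type of dry_run() are not confirmed.
preview = plan.dry_run(df)
print(preview)
```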
Chunked Processing¶
For large Parquet files that exceed available RAM, use chunked processing methods.
process_chunked¶
Process a large Parquet file in chunks, optionally keeping related rows together.
```python
result, protocol = plan.process_chunked(
    source="large_file.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)
protocol.print()
```
See Chunked Processing for details on operation compatibility.
validate_chunked¶
Validate that a pipeline is compatible with chunked processing before executing.
```python
import polars as pl

validation = plan.validate_chunked(
    schema={"id": pl.Int64, "name": pl.Utf8},
    partition_key="id",
)
if not validation.is_valid:
    print(validation.errors)
```
Serialization¶
Pipelines can be saved and loaded as JSON:
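A minimal sketch; the method names (to_json, from_json) are placeholders for illustration, not confirmed API:

```python
# Hypothetical method names (to_json / from_json) shown for illustration only.
json_str = plan.to_json()                     # serialize the registered operations
restored = TransformPlan.from_json(json_str)  # rebuild an equivalent pipeline
```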
Or generate executable Python code:
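Again a sketch; the method name (to_python) is a placeholder for whatever the code-generation API is called:

```python
# Hypothetical method name (to_python) shown for illustration only.
script = plan.to_python()
print(script)  # an equivalent standalone Python script
```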