Quickstart¶
This guide walks you through creating your first transformation pipeline with TransformPlan.
Creating a Pipeline¶
A pipeline is a sequence of operations that transform a DataFrame. Operations are registered using method chaining and executed together when you call process().
import polars as pl
from transformplan import TransformPlan, Col
# Sample data
df = pl.DataFrame({
    "name": ["Alice", "Bob", "Charlie", "Diana"],
    "department": ["Engineering", "Sales", "Engineering", "Sales"],
    "salary": [75000, 65000, 80000, 70000],
    "years": [3, 5, 7, 2]
})
# Build a transformation plan
plan = (
    TransformPlan()
    .col_rename(column="name", new_name="employee")
    .math_multiply(column="salary", value=1.05)  # 5% raise
    .math_round(column="salary", decimals=0)
    .rows_filter(Col("years") >= 3)
)
Validating Before Execution¶
Before processing data, validate that all operations are compatible with the DataFrame schema:
result = plan.validate(df)
print(result) # ValidationResult(valid=True)
if result.is_valid:
    print("Pipeline is valid!")
else:
    for error in result.errors:
        print(f"Error: {error}")
Dry Run Preview¶
See what a pipeline will do without actually executing it:
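A minimal sketch, assuming the library exposes a dry-run method (the name dry_run here is hypothetical; check the API Reference for the exact call):
# Hypothetical method name: preview the planned steps without executing them
plan.dry_run(df)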
This shows each step with columns added, removed, or modified.
Processing Data¶
Execute the pipeline and get the transformed data along with an audit protocol:
df_result, protocol = plan.process(df)
print(df_result)
# shape: (3, 4)
# +----------+-------------+--------+-------+
# | employee | department  | salary | years |
# +----------+-------------+--------+-------+
# | Alice    | Engineering | 78750  | 3     |
# | Bob      | Sales       | 68250  | 5     |
# | Charlie  | Engineering | 84000  | 7     |
# +----------+-------------+--------+-------+
Viewing the Audit Protocol¶
The protocol captures complete transformation history:
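# Print the protocol returned by plan.process(df)
protocol.print()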
Output shows:
- Input/output hashes for reproducibility verification
- Each operation with parameters
- Row and column changes at each step
- Execution time per operation
Filtering Rows¶
Use the Col class to build filter expressions:
from transformplan import Col
# Simple comparison
plan = TransformPlan().rows_filter(Col("age") >= 18)
# Multiple conditions
plan = TransformPlan().rows_filter(
    (Col("status") == "active") & (Col("score") >= 50)
)
# String operations
plan = TransformPlan().rows_filter(
    Col("email").str_contains("@company.com")
)
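Conditions can also be combined with OR and negated. This is a sketch assuming Col expressions support the | and ~ operators alongside the & operator shown above (mirroring Polars expression operators):
# Assumed operators: | (or) and ~ (not), by analogy with & above
plan = TransformPlan().rows_filter(
    (Col("department") == "Sales") | (Col("years") >= 5)
)
plan = TransformPlan().rows_filter(~Col("email").str_contains("@test.com"))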
Saving and Loading Pipelines¶
Pipelines can be serialized to JSON for storage or sharing:
# Save to JSON
plan.to_json("pipeline.json")
# Load from JSON
loaded_plan = TransformPlan.from_json("pipeline.json")
# Or work with strings
json_str = plan.to_json()
plan_from_str = TransformPlan.from_json(json_str)
Processing Large Files¶
For Parquet files that exceed available RAM, use process_chunked():
# Process a large file in chunks
result, protocol = plan.process_chunked(
    source="large_dataset.parquet",
    chunk_size=100_000,  # Rows per chunk
)
protocol.print() # Shows per-chunk statistics
When using operations that need related rows together (like rows_unique), specify a partition key:
plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .rows_unique(columns=["patient_id"])  # Needs all patient rows together
)
result, protocol = plan.process_chunked(
    source="patients.parquet",
    partition_key="patient_id",  # Keep patient rows in same chunk
    chunk_size=50_000,
)
Validate compatibility before processing:
validation = plan.validate_chunked(
    schema=df.schema,
    partition_key="patient_id"
)
if not validation.is_valid:
    print(validation.errors)
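If the file is too large to load into memory, you can read its schema directly from the Parquet metadata with Polars and pass that instead of df.schema (assuming validate_chunked accepts the mapping returned by read_parquet_schema):
# Read only the Parquet schema; no data is loaded into memory
schema = pl.read_parquet_schema("patients.parquet")
validation = plan.validate_chunked(schema=schema, partition_key="patient_id")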
Operation Restrictions
Some operations cannot be used with chunked processing:
rows_sort, rows_pivot, rows_sample, rows_head, rows_tail.
See Chunked Processing for details.
Next Steps¶
- Explore the API Reference for all available operations
- Learn about Filters for complex row filtering
- Understand Protocols for audit trails
- Process large files with Chunked Processing