Quickstart

This guide walks you through creating your first transformation pipeline with TransformPlan.

Creating a Pipeline

A pipeline is a sequence of operations that transform a DataFrame. Operations are registered using method chaining and executed together when you call process().

import polars as pl
from transformplan import TransformPlan, Col

# Sample data
df = pl.DataFrame({
    "name": ["Alice", "Bob", "Charlie", "Diana"],
    "department": ["Engineering", "Sales", "Engineering", "Sales"],
    "salary": [75000, 65000, 80000, 70000],
    "years": [3, 5, 7, 2]
})

# Build a transformation plan
plan = (
    TransformPlan()
    .col_rename(column="name", new_name="employee")
    .math_multiply(column="salary", value=1.05)  # 5% raise
    .math_round(column="salary", decimals=0)
    .rows_filter(Col("years") >= 3)
)

Validating Before Execution

Before processing data, validate that all operations are compatible with the DataFrame schema:

result = plan.validate(df)
print(result)  # ValidationResult(valid=True)

if result.is_valid:
    print("Pipeline is valid!")
else:
    for error in result.errors:
        print(f"Error: {error}")

Dry Run Preview

See what a pipeline will do without actually executing it:

preview = plan.dry_run(df)
preview.print()

This shows each step with columns added, removed, or modified.
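
Dry runs pair naturally with validation: check the schema first, then preview the steps. A minimal sketch that reuses only the calls introduced above:

# Validate against the DataFrame schema, then preview each step
result = plan.validate(df)
if result.is_valid:
    plan.dry_run(df).print()
else:
    for error in result.errors:
        print(f"Error: {error}")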

Processing Data

Execute the pipeline and get the transformed data along with an audit protocol:

df_result, protocol = plan.process(df)

print(df_result)
# shape: (3, 4)
# +----------+-------------+--------+-------+
# | employee | department  | salary | years |
# +----------+-------------+--------+-------+
# | Alice    | Engineering | 78750  | 3     |
# | Bob      | Sales       | 68250  | 5     |
# | Charlie  | Engineering | 84000  | 7     |
# +----------+-------------+--------+-------+
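
Since TransformPlan operates on polars DataFrames, df_result can typically be used like any other DataFrame afterwards. For example, to persist the output (the file name here is just an illustration):

# Write the transformed data out with regular polars I/O
df_result.write_parquet("employees_processed.parquet")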

Viewing the Audit Protocol

The protocol captures the complete transformation history:

protocol.print()

Output shows:

  • Input/output hashes for reproducibility verification
  • Each operation with parameters
  • Row and column changes at each step
  • Execution time per operation

Filtering Rows

Use the Col class to build filter expressions:

from transformplan import Col

# Simple comparison
plan = TransformPlan().rows_filter(Col("age") >= 18)

# Multiple conditions
plan = TransformPlan().rows_filter(
    (Col("status") == "active") & (Col("score") >= 50)
)

# String operations
plan = TransformPlan().rows_filter(
    Col("email").str_contains("@company.com")
)
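
Filters compose with other operations in the same chain. For example, combining a filter with the math operations from earlier in this guide (only calls already shown above):

# Keep senior engineers, then apply a 10% raise to the remaining rows
plan = (
    TransformPlan()
    .rows_filter((Col("department") == "Engineering") & (Col("years") >= 3))
    .math_multiply(column="salary", value=1.10)
)
df_engineering, protocol = plan.process(df)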

Saving and Loading Pipelines

Pipelines can be serialized to JSON for storage or sharing:

# Save to JSON
plan.to_json("pipeline.json")

# Load from JSON
loaded_plan = TransformPlan.from_json("pipeline.json")

# Or work with strings
json_str = plan.to_json()
plan_from_str = TransformPlan.from_json(json_str)
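
A common pattern is to define a pipeline once and reuse it elsewhere: save it, load it in the other environment, and validate it against the new data before processing. A sketch using only the calls shown above:

# Reuse a saved pipeline on another DataFrame
loaded_plan = TransformPlan.from_json("pipeline.json")

result = loaded_plan.validate(df)
if result.is_valid:
    df_result, protocol = loaded_plan.process(df)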

Processing Large Files

For Parquet files that exceed available RAM, use process_chunked():

# Process a large file in chunks
result, protocol = plan.process_chunked(
    source="large_dataset.parquet",
    chunk_size=100_000,  # Rows per chunk
)

protocol.print()  # Shows per-chunk statistics

When using operations that need related rows together (like rows_unique), specify a partition key:

plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .rows_unique(columns=["patient_id"])  # Needs all patient rows together
)

result, protocol = plan.process_chunked(
    source="patients.parquet",
    partition_key="patient_id",  # Keep patient rows in same chunk
    chunk_size=50_000,
)

Validate that the plan is compatible with chunked execution before processing:

validation = plan.validate_chunked(
    schema=df.schema,
    partition_key="patient_id"
)

if not validation.is_valid:
    print(validation.errors)
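
For a large file you may not want to load the data just to obtain its schema. Polars can read a Parquet file's schema directly with pl.read_parquet_schema (a polars function, not part of TransformPlan); assuming validate_chunked accepts that schema mapping the same way it accepts df.schema, the two steps combine like this:

# Read only the schema, validate the plan, then process in chunks
schema = pl.read_parquet_schema("patients.parquet")

validation = plan.validate_chunked(schema=schema, partition_key="patient_id")
if validation.is_valid:
    result, protocol = plan.process_chunked(
        source="patients.parquet",
        partition_key="patient_id",
        chunk_size=50_000,
    )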

Operation Restrictions

Some operations cannot be used with chunked processing: rows_sort, rows_pivot, rows_sample, rows_head, rows_tail. See Chunked Processing for details.
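
If a plan includes one of these operations, validate_chunked should flag it before any data is read. A hedged sketch (the rows_sort signature below is hypothetical; check the operation reference for the actual parameters):

# A plan with a global sort cannot run chunk-by-chunk
sorted_plan = TransformPlan().rows_sort(columns=["salary"])  # hypothetical signature

validation = sorted_plan.validate_chunked(schema=df.schema, partition_key="department")
if not validation.is_valid:
    print(validation.errors)  # expected to mention the incompatible operation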

Next Steps