Chunked Processing¶
Process large Parquet files that exceed available RAM by reading and transforming data in chunks.
Polars Only
Chunked processing is designed for Polars DataFrames and Parquet files. DuckDB handles large datasets natively through its out-of-core execution engine — no chunking needed.
Overview¶
When working with files larger than available memory, use process_chunked() instead of process(). This method:
- Reads Parquet files in configurable chunks using lazy evaluation
- Optionally keeps related rows together using a partition key
- Validates that operations are compatible with chunked processing
- Returns a ChunkedProtocol with per-chunk statistics
```python
from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .rows_filter(Col("age") >= 18)
    .rows_unique(columns=["patient_id", "visit_date"])
)

# Process a large file in chunks
result, protocol = plan.process_chunked(
    source="patients_10gb.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)
protocol.print()
```
Operation Compatibility¶
Not all operations can be used with chunked processing. Operations are classified into three categories:
Chunkable Operations¶
These operations process each row independently and work with any chunk:
| Category | Operations |
|---|---|
| Column | col_drop, col_rename, col_cast, col_reorder, col_select, col_duplicate, col_fill_null, col_drop_null, col_drop_zero, col_add, col_add_uuid, col_hash, col_coalesce |
| Math | math_add, math_subtract, math_multiply, math_divide, math_clamp, math_abs, math_round, math_set_min, math_set_max, math_add_columns, math_subtract_columns, math_multiply_columns, math_divide_columns, math_percent_of |
| String | str_replace, str_slice, str_truncate, str_lower, str_upper, str_strip, str_pad, str_split, str_concat, str_extract |
| Datetime | dt_year, dt_month, dt_day, dt_week, dt_quarter, dt_year_month, dt_quarter_year, dt_calendar_week, dt_parse, dt_format, dt_diff_days, dt_age_years, dt_is_between, dt_truncate |
| Map | map_values, map_discretize, map_bool_to_int, map_null_to_value, map_value_to_null, map_case, map_from_column |
| Rows | rows_filter, rows_drop, rows_flag, rows_explode, rows_drop_nulls, rows_melt |
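For example, a plan built entirely from chunkable operations can be processed chunk by chunk with no partition key, since the partition key is optional. The sketch below reuses the operation signatures shown elsewhere on this page; the file name and column names are purely illustrative.

```python
from transformplan import TransformPlan, Col

# Every operation here acts on rows independently, so any chunk
# boundary is safe and no partition_key is required.
plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .dt_age_years(birth_column="date_of_birth", new_column="age")
    .rows_filter(Col("age") >= 18)
    .col_drop("date_of_birth")
)

result, protocol = plan.process_chunked(
    source="visits.parquet",  # illustrative file name
    chunk_size=200_000,
)
```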
Group-Dependent Operations¶
These operations need all rows of a group to be in the same chunk. They work with chunked processing only when partition_key includes their grouping columns:
| Operation | Group Parameter | Requirement |
|---|---|---|
| rows_unique | columns | partition_key must contain columns |
| rows_deduplicate | columns | partition_key must contain columns |
| math_cumsum | group_by | partition_key must contain group_by |
| math_rank | group_by | partition_key must contain group_by |
| math_diff_from_agg | group_by | partition_key must contain group_by |
Example: To use rows_unique(columns=["patient_id"]), you must set partition_key="patient_id" (or a list containing it).
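As a quick sketch of this rule (validate_chunked is covered in the Validation section below; the column names and schema here are illustrative):

```python
import polars as pl
from transformplan import TransformPlan

plan = TransformPlan().rows_unique(columns=["patient_id"])
schema = {"patient_id": pl.Utf8, "visit_date": pl.Date}

# Partition key covers the grouping column -> compatible
ok = plan.validate_chunked(schema=schema, partition_key="patient_id")

# Partition key does not cover "patient_id" -> incompatible
bad = plan.validate_chunked(schema=schema, partition_key="visit_date")

print(ok.is_valid, bad.is_valid)  # expected: True False
```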
Global Operations (Blocked)¶
These operations require the full dataset and cannot be used with chunked processing:
- rows_sort - Requires global ordering
- rows_pivot - Needs all values to determine output columns
- rows_sample - Random sampling requires full dataset
- rows_head - Requires global ordering
- rows_tail - Requires global ordering
Attempting to use these operations will raise a ChunkingError.
Validation¶
Before processing, validate that your pipeline is compatible with chunked processing:
```python
import polars as pl

# Validate without processing
validation = plan.validate_chunked(
    schema={"patient_id": pl.Utf8, "age": pl.Int64, "visit_date": pl.Date},
    partition_key="patient_id",
)
print(validation)
# Pipeline is compatible with chunked processing.

# Or validate with a sample DataFrame
validation = plan.validate_chunked(data=sample_df, partition_key="patient_id")
if not validation.is_valid:
    for error in validation.errors:
        print(f"Error: {error}")
```
ChunkedProtocol Class¶
ChunkedProtocol¶
Protocol for tracking chunked processing with per-chunk information. Tracks the overall processing as well as individual chunk statistics. The constructor takes no arguments and initializes an empty ChunkedProtocol.
Attributes:
| Name | Type | Description |
|---|---|---|
| VERSION | | Protocol version string. |
Properties¶
| Property | Type | Description |
|---|---|---|
| chunks | list[ChunkInfo] | List of chunk information as ChunkInfo instances. |
| total_input_rows | int | Total rows across all input chunks. |
| total_output_rows | int | Total rows across all output chunks. |
| total_elapsed_seconds | float | Total processing time across all chunks, in seconds. |
| num_chunks | int | Number of chunks processed. |
| metadata | dict[str, Any] | Dictionary of protocol metadata. |
Methods¶
set_source¶
Set source file information.
set_operations¶
set_metadata¶
add_chunk¶
output_hash¶
Compute a combined hash of all output chunk hashes.
Returns:
| Type | Description |
|---|---|
| str | A 16-character hex hash of all chunk output hashes combined. |
to_dict¶
Serialize protocol to a dictionary.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of the protocol. |
from_dict classmethod¶
`from_dict(data: dict[str, Any]) -> ChunkedProtocol`
Deserialize protocol from a dictionary.
Returns:
| Type | Description |
|---|---|
| ChunkedProtocol | ChunkedProtocol instance. |
to_json¶
Serialize protocol to JSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path \| None | Optional file path to write to. | None |
| indent | int | JSON indentation level. | 2 |
Returns:
| Type | Description |
|---|---|
| str | JSON string. |
from_json classmethod¶
`from_json(source: str | Path) -> ChunkedProtocol`
Deserialize protocol from JSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| Path | Either a JSON string or a path to a JSON file. | required |
Returns:
| Type | Description |
|---|---|
| ChunkedProtocol | ChunkedProtocol instance. |
summary¶
Generate a human-readable summary of the chunked processing.
Returns:
| Type | Description |
|---|---|
| str | Formatted string summary of the protocol. |
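Putting these members together, a protocol can be inspected and persisted after a chunked run. A minimal sketch, assuming ChunkedProtocol is importable from the package root the same way ChunkingError is, and reusing the protocol object returned by process_chunked:

```python
from transformplan import ChunkedProtocol  # import path assumed

print(protocol.num_chunks, protocol.total_input_rows, protocol.total_output_rows)
print(f"{protocol.total_elapsed_seconds:.2f}s, hash {protocol.output_hash()}")

# Human-readable report
print(protocol.summary())

# Persist and reload the audit trail
protocol.to_json("chunked_audit.json")
restored = ChunkedProtocol.from_json("chunked_audit.json")
assert restored.output_hash() == protocol.output_hash()
```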
ChunkValidationResult Class¶
ChunkValidationResult dataclass¶

```python
ChunkValidationResult(
    is_valid: bool,
    errors: list[str] = list(),
    warnings: list[str] = list(),
    global_operations: list[str] = list(),
    group_dependent_ops: list[tuple[str, list[str] | None]] = list(),
)
```

Result of validating a pipeline for chunked processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| is_valid | bool | Whether the pipeline can be processed in chunks. |
| errors | list[str] | List of error messages explaining incompatibilities. |
| warnings | list[str] | List of warning messages (non-blocking). |
| global_operations | list[str] | Names of operations that require the full dataset. |
| group_dependent_ops | list[tuple[str, list[str] \| None]] | List of (operation, columns) pairs for group-dependent operations. |
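A short sketch of acting on these fields after validation (plan and sample_df as in the Validation section above):

```python
validation = plan.validate_chunked(data=sample_df, partition_key="patient_id")

if not validation.is_valid:
    print("Blocked (global) operations:", validation.global_operations)
    for op, columns in validation.group_dependent_ops:
        print(f"{op} groups on {columns}; partition_key must cover these columns")
    for message in validation.errors:
        print("error:", message)

for message in validation.warnings:
    print("warning:", message)
```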
ChunkingError Exception¶
ChunkingError¶

```python
ChunkingError(
    message: str, validation_result: ChunkValidationResult | None = None
)
```

Bases: Exception
Raised when a pipeline is incompatible with chunked processing. Initialized with a message and an optional validation result.
Attributes:
| Name | Type | Description |
|---|---|---|
| validation_result | ChunkValidationResult \| None | The validation result containing error details. |
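If a plan is processed without validating first, the same details are available on the raised exception. A minimal sketch (plan as defined earlier; the file path is illustrative):

```python
from transformplan import ChunkingError

try:
    result, protocol = plan.process_chunked(
        source="patients_archive.parquet",
        partition_key="patient_id",
        chunk_size=100_000,
    )
except ChunkingError as err:
    print(err)
    # validation_result mirrors what validate_chunked() reports up front
    if err.validation_result is not None:
        for message in err.validation_result.errors:
            print(" -", message)
```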
Example: Processing Patient Records¶
```python
import polars as pl
from transformplan import TransformPlan, Col, ChunkingError

# Build pipeline with group-dependent operation
plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .dt_age_years(birth_column="date_of_birth", new_column="age")
    .rows_filter(Col("age") >= 18)
    .rows_unique(columns=["patient_id", "visit_date"])  # Needs partition key
    .col_drop("date_of_birth")
)

# Validate first
validation = plan.validate_chunked(
    schema={
        "PatientID": pl.Utf8,
        "date_of_birth": pl.Date,
        "visit_date": pl.Date,
        "diagnosis": pl.Utf8,
    },
    partition_key="PatientID",
)

if validation.is_valid:
    # Process the large file
    result, protocol = plan.process_chunked(
        source="patients_archive.parquet",
        partition_key="PatientID",
        chunk_size=50_000,
    )

    # View processing summary
    protocol.print()

    # Save audit trail
    protocol.to_json("chunked_audit.json")
else:
    print("Pipeline incompatible with chunking:")
    for error in validation.errors:
        print(f"  - {error}")
```
Example Output¶
```text
======================================================================
CHUNKED PROCESSING PROTOCOL
======================================================================
Source: patients_archive.parquet
Partition key: ['PatientID']
Target chunk size: 50,000
----------------------------------------------------------------------
Chunks processed: 24
Total input rows: 1,187,432
Total output rows: 892,156
Row change: -295,276
Total time: 12.4523s
Avg time per chunk: 0.5188s
Output hash: 7a3b2c1d4e5f6789
----------------------------------------------------------------------
#       Input     Output     Change       Time  Hash
----------------------------------------------------------------------
0      49,832     37,291    -12,541    0.4821s  a1b2c3d4e5f67890
1      50,127     38,456    -11,671    0.5123s  b2c3d4e5f6789012
2      49,956     37,892    -12,064    0.4956s  c3d4e5f678901234
...
======================================================================
```