Chunked Processing

Process large Parquet files that exceed available RAM by reading and transforming data in chunks.

Overview

When working with files larger than available memory, use process_chunked() instead of process(). This method:

  • Reads Parquet files in configurable chunks using lazy evaluation
  • Optionally keeps related rows together using a partition key
  • Validates that operations are compatible with chunked processing
  • Returns the processed result together with a ChunkedProtocol of per-chunk statistics

from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .rows_filter(Col("age") >= 18)
    .rows_unique(columns=["patient_id", "visit_date"])
)

# Process a large file in chunks
result, protocol = plan.process_chunked(
    source="patients_10gb.parquet",
    partition_key="patient_id",  # Keep patient rows together
    chunk_size=100_000,
)

protocol.print()

Operation Compatibility

Not all operations can be used with chunked processing. Operations are classified into three categories:

Chunkable Operations

These operations process each row independently and work with any chunk:

Column: col_drop, col_rename, col_cast, col_reorder, col_select, col_duplicate, col_fill_null, col_drop_null, col_drop_zero, col_add, col_add_uuid, col_hash, col_coalesce
Math: math_add, math_subtract, math_multiply, math_divide, math_clamp, math_abs, math_round, math_set_min, math_set_max, math_add_columns, math_subtract_columns, math_multiply_columns, math_divide_columns, math_percent_of
String: str_replace, str_slice, str_truncate, str_lower, str_upper, str_strip, str_pad, str_split, str_concat, str_extract
Datetime: dt_year, dt_month, dt_day, dt_week, dt_quarter, dt_year_month, dt_quarter_year, dt_calendar_week, dt_parse, dt_format, dt_diff_days, dt_age_years, dt_is_between, dt_truncate
Map: map_values, map_discretize, map_bool_to_int, map_null_to_value, map_value_to_null, map_case, map_from_column
Rows: rows_filter, rows_drop, rows_flag, rows_explode, rows_drop_nulls, rows_melt
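
Because these operations act on each row in isolation, a pipeline built only from them can be chunked without any partitioning constraints. Below is a minimal sketch; it assumes partition_key can be omitted when no group-dependent operations are present, so only the chunk size matters:

from transformplan import TransformPlan, Col

# Chunkable-only pipeline: every operation acts on rows independently
plan = (
    TransformPlan()
    .col_rename(column="OrderID", new_name="order_id")
    .rows_filter(Col("amount") > 0)
    .col_drop("internal_notes")
)

# Assumption: partition_key may be left out because no operation groups rows
result, protocol = plan.process_chunked(
    source="orders_large.parquet",
    chunk_size=250_000,
)

protocol.print()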

Group-Dependent Operations

These operations need all rows of a group to be in the same chunk. They work with chunked processing only when partition_key includes their grouping columns:

rows_unique (group parameter: columns): partition_key must contain columns
rows_deduplicate (group parameter: columns): partition_key must contain columns
math_cumsum (group parameter: group_by): partition_key must contain group_by
math_rank (group parameter: group_by): partition_key must contain group_by

Example: To use rows_unique(columns=["patient_id"]), you must set partition_key="patient_id" (or a list containing it).
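
For instance, a per-patient cumulative sum is only correct when every row of a patient lands in the same chunk. A minimal sketch (the math_cumsum keyword names are assumptions, not the confirmed signature):

plan = (
    TransformPlan()
    .math_cumsum(column="charge", group_by=["patient_id"])  # keyword names assumed
)

# partition_key covers group_by, so no patient is ever split across chunks
result, protocol = plan.process_chunked(
    source="charges.parquet",
    partition_key="patient_id",
    chunk_size=100_000,
)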

Global Operations (Blocked)

These operations require the full dataset and cannot be used with chunked processing:

  • rows_sort - Requires global ordering
  • rows_pivot - Needs all values to determine output columns
  • rows_sample - Random sampling requires the full dataset
  • rows_head - Requires global ordering
  • rows_tail - Requires global ordering

Attempting to use these operations will raise a ChunkingError.
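
A sketch of that failure mode: a pipeline containing rows_sort is rejected before any data is read (the rows_sort signature below is illustrative):

from transformplan import TransformPlan, ChunkingError

plan = TransformPlan().rows_sort(columns=["visit_date"])  # signature illustrative

try:
    plan.process_chunked(
        source="patients.parquet",
        partition_key="patient_id",
        chunk_size=100_000,
    )
except ChunkingError as exc:
    print(exc)  # human-readable reason
    if exc.validation_result is not None:
        print(exc.validation_result.global_operations)  # e.g. ['rows_sort']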

Validation

Before processing, validate that your pipeline is compatible with chunked processing:

import polars as pl

# Validate without processing
validation = plan.validate_chunked(
    schema={"patient_id": pl.Utf8, "age": pl.Int64, "visit_date": pl.Date},
    partition_key="patient_id"
)

print(validation)
# Pipeline is compatible with chunked processing.

# Or validate with a sample DataFrame
validation = plan.validate_chunked(data=sample_df, partition_key="patient_id")

if not validation.is_valid:
    for error in validation.errors:
        print(f"Error: {error}")

ChunkedProtocol Class

ChunkedProtocol

ChunkedProtocol()

Protocol for tracking chunked processing with per-chunk information.

Tracks the overall processing as well as individual chunk statistics.

Attributes:

VERSION: Protocol version string.

Initialize an empty ChunkedProtocol.

Source code in transformplan/chunking.py
def __init__(self) -> None:
    """Initialize an empty ChunkedProtocol."""
    self._chunks: list[ChunkInfo] = []
    self._source_path: str | None = None
    self._partition_key: list[str] | None = None
    self._chunk_size: int | None = None
    self._created_at: str = datetime.now(timezone.utc).isoformat()
    self._metadata: dict[str, Any] = {}
    self._operations: list[dict[str, Any]] = []

chunks property

chunks: list[ChunkInfo]

List of chunk information.

Returns:

list[ChunkInfo]: List of ChunkInfo instances.

total_input_rows property

total_input_rows: int

Total rows across all input chunks.

Returns:

int: Sum of input rows.

total_output_rows property

total_output_rows: int

Total rows across all output chunks.

Returns:

int: Sum of output rows.

total_elapsed_seconds property

total_elapsed_seconds: float

Total processing time across all chunks.

Returns:

float: Sum of elapsed seconds.

num_chunks property

num_chunks: int

Number of chunks processed.

Returns:

int: Count of chunks.

metadata property

metadata: dict[str, Any]

Protocol metadata.

Returns:

dict[str, Any]: Dictionary of metadata.
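
Taken together, these properties make it easy to report on a finished run without parsing the text summary. A small sketch, assuming protocol came from a process_chunked() call like the one in the Overview:

rows_dropped = protocol.total_input_rows - protocol.total_output_rows
print(f"{protocol.num_chunks} chunks in {protocol.total_elapsed_seconds:.1f}s")
print(f"{rows_dropped:,} rows removed by the pipeline")

# Per-chunk inspection via the chunks property
slowest = max(protocol.chunks, key=lambda c: c.elapsed_seconds)
print(f"Slowest chunk: #{slowest.chunk_index} ({slowest.elapsed_seconds:.2f}s)")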

set_source

set_source(path: str, partition_key: list[str] | None, chunk_size: int) -> None

Set source file information.

Source code in transformplan/chunking.py
def set_source(
    self,
    path: str,
    partition_key: list[str] | None,
    chunk_size: int,
) -> None:
    """Set source file information."""
    self._source_path = path
    self._partition_key = partition_key
    self._chunk_size = chunk_size

set_operations

set_operations(operations: list[dict[str, Any]]) -> None

Record the operations that were applied.

Source code in transformplan/chunking.py
def set_operations(self, operations: list[dict[str, Any]]) -> None:
    """Record the operations that were applied."""
    self._operations = operations

set_metadata

set_metadata(**kwargs: Any) -> None

Set arbitrary metadata on the protocol.

Source code in transformplan/chunking.py
def set_metadata(self, **kwargs: Any) -> None:  # noqa: ANN401
    """Set arbitrary metadata on the protocol."""
    self._metadata.update(kwargs)
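
Metadata set this way is included in to_dict()/to_json() output and printed at the top of summary(), which makes it a convenient place for audit context. A small sketch, assuming protocol is a ChunkedProtocol returned by process_chunked(); the keys are arbitrary examples:

protocol.set_metadata(
    pipeline_name="patient_cleanup",
    run_id="2024-06-01T0830",
    operator="data-eng",
)
protocol.print()  # metadata keys appear above the source information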

add_chunk

add_chunk(chunk_info: ChunkInfo) -> None

Add information about a processed chunk.

Source code in transformplan/chunking.py
def add_chunk(self, chunk_info: ChunkInfo) -> None:
    """Add information about a processed chunk."""
    self._chunks.append(chunk_info)
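
process_chunked() populates the protocol for you; add_chunk() is only needed when driving chunking yourself. A sketch, assuming ChunkedProtocol and ChunkInfo are importable from transformplan.chunking and that ChunkInfo takes the fields shown in from_dict() below:

from transformplan.chunking import ChunkedProtocol, ChunkInfo  # import path assumed

protocol = ChunkedProtocol()
protocol.set_source("events.parquet", partition_key=["user_id"], chunk_size=100_000)

protocol.add_chunk(
    ChunkInfo(
        chunk_index=0,
        input_rows=100_000,
        output_rows=98_512,
        input_hash="aaaa1111bbbb2222",
        output_hash="cccc3333dddd4444",
        elapsed_seconds=0.42,
    )
)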

output_hash

output_hash() -> str

Compute a combined hash of all output chunk hashes.

Returns:

str: A 16-character hex hash of all chunk output hashes combined.

Source code in transformplan/chunking.py
def output_hash(self) -> str:
    """Compute a combined hash of all output chunk hashes.

    Returns:
        A 16-character hex hash of all chunk output hashes combined.
    """
    if not self._chunks:
        return ""
    combined = "|".join(c.output_hash for c in self._chunks)
    return hashlib.sha256(combined.encode()).hexdigest()[:16]
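
Because the combined hash is built from every chunk's output hash, it serves as a quick reproducibility check between runs, as long as both runs use the same chunk_size and partition_key (chunk boundaries influence the combined value). A sketch, assuming plan is defined as in the Overview:

_, protocol_a = plan.process_chunked(
    source="patients.parquet", partition_key="patient_id", chunk_size=50_000
)
_, protocol_b = plan.process_chunked(
    source="patients.parquet", partition_key="patient_id", chunk_size=50_000
)

# Identical inputs and identical plans should yield identical combined hashes
assert protocol_a.output_hash() == protocol_b.output_hash(), "outputs diverged"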

to_dict

to_dict() -> dict[str, Any]

Serialize protocol to a dictionary.

Returns:

dict[str, Any]: Dictionary representation of the protocol.

Source code in transformplan/chunking.py
def to_dict(self) -> dict[str, Any]:
    """Serialize protocol to a dictionary.

    Returns:
        Dictionary representation of the protocol.
    """
    return {
        "version": self.VERSION,
        "created_at": self._created_at,
        "metadata": self._metadata,
        "source": {
            "path": self._source_path,
            "partition_key": self._partition_key,
            "chunk_size": self._chunk_size,
        },
        "operations": self._operations,
        "summary": {
            "num_chunks": self.num_chunks,
            "total_input_rows": self.total_input_rows,
            "total_output_rows": self.total_output_rows,
            "total_elapsed_seconds": round(self.total_elapsed_seconds, 4),
            "output_hash": self.output_hash(),
        },
        "chunks": [
            {
                "chunk_index": c.chunk_index,
                "input_rows": c.input_rows,
                "output_rows": c.output_rows,
                "input_hash": c.input_hash,
                "output_hash": c.output_hash,
                "elapsed_seconds": round(c.elapsed_seconds, 4),
            }
            for c in self._chunks
        ],
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> ChunkedProtocol

Deserialize protocol from a dictionary.

Returns:

ChunkedProtocol: ChunkedProtocol instance.

Source code in transformplan/chunking.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ChunkedProtocol:
    """Deserialize protocol from a dictionary.

    Returns:
        ChunkedProtocol instance.
    """
    protocol = cls()
    protocol._created_at = data.get("created_at", protocol._created_at)
    protocol._metadata = data.get("metadata", {})

    source = data.get("source", {})
    protocol._source_path = source.get("path")
    protocol._partition_key = source.get("partition_key")
    protocol._chunk_size = source.get("chunk_size")

    protocol._operations = data.get("operations", [])

    for chunk_data in data.get("chunks", []):
        protocol._chunks.append(
            ChunkInfo(
                chunk_index=chunk_data["chunk_index"],
                input_rows=chunk_data["input_rows"],
                output_rows=chunk_data["output_rows"],
                input_hash=chunk_data["input_hash"],
                output_hash=chunk_data["output_hash"],
                elapsed_seconds=chunk_data["elapsed_seconds"],
            )
        )

    return protocol

to_json

to_json(path: str | Path | None = None, indent: int = 2) -> str

Serialize protocol to JSON.

Parameters:

path (str | Path | None, default None): Optional file path to write to.
indent (int, default 2): JSON indentation level.

Returns:

str: JSON string.

Source code in transformplan/chunking.py
def to_json(self, path: str | Path | None = None, indent: int = 2) -> str:
    """Serialize protocol to JSON.

    Args:
        path: Optional file path to write to.
        indent: JSON indentation level.

    Returns:
        JSON string.
    """
    json_str = json.dumps(self.to_dict(), indent=indent)

    if path is not None:
        Path(path).write_text(json_str)

    return json_str

from_json classmethod

from_json(source: str | Path) -> ChunkedProtocol

Deserialize protocol from JSON.

Parameters:

source (str | Path, required): Either a JSON string or a path to a JSON file.

Returns:

ChunkedProtocol: ChunkedProtocol instance.

Source code in transformplan/chunking.py
@classmethod
def from_json(cls, source: str | Path) -> ChunkedProtocol:
    """Deserialize protocol from JSON.

    Args:
        source: Either a JSON string or a path to a JSON file.

    Returns:
        ChunkedProtocol instance.
    """
    if isinstance(source, Path) or not source.strip().startswith("{"):
        content = Path(source).read_text()
    else:
        content = source

    return cls.from_dict(json.loads(content))
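
A typical round trip: persist the audit trail next to the processed output, then reload it later to check what ran. A sketch, assuming protocol came from process_chunked(); the ChunkedProtocol import path is assumed:

protocol.to_json("chunked_audit.json")

# Later, possibly in another process
from transformplan.chunking import ChunkedProtocol  # import path assumed

loaded = ChunkedProtocol.from_json("chunked_audit.json")
print(loaded.num_chunks, loaded.total_output_rows)
print(loaded.output_hash())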

summary

summary() -> str

Generate a human-readable summary of the chunked processing.

Returns:

str: Formatted string summary of the protocol.

Source code in transformplan/chunking.py
def summary(self) -> str:
    """Generate a human-readable summary of the chunked processing.

    Returns:
        Formatted string summary of the protocol.
    """
    lines = [
        "=" * 70,
        "CHUNKED PROCESSING PROTOCOL",
        "=" * 70,
    ]

    if self._metadata:
        for key, value in self._metadata.items():
            lines.append(f"{key}: {value}")
        lines.append("-" * 70)

    # Source info
    if self._source_path:
        lines.append(f"Source: {self._source_path}")
    if self._partition_key:
        lines.append(f"Partition key: {self._partition_key}")
    if self._chunk_size:
        lines.append(f"Target chunk size: {self._chunk_size:,}")
    lines.extend(
        [
            "-" * 70,
            f"Chunks processed: {self.num_chunks}",
            f"Total input rows: {self.total_input_rows:,}",
            f"Total output rows: {self.total_output_rows:,}",
        ]
    )
    rows_diff = self.total_output_rows - self.total_input_rows
    if rows_diff != 0:
        lines.append(f"Row change: {rows_diff:+,}")
    lines.append(f"Total time: {self.total_elapsed_seconds:.4f}s")
    if self.num_chunks > 0:
        avg_time = self.total_elapsed_seconds / self.num_chunks
        lines.append(f"Avg time per chunk: {avg_time:.4f}s")
    lines.extend((f"Output hash: {self.output_hash()}", "-" * 70))

    # Per-chunk details
    if self._chunks:
        lines.extend(
            (
                "",
                f"{'#':<6} {'Input':<12} {'Output':<12} {'Change':<10} {'Time':<10} {'Hash':<16}",
                "-" * 70,
            )
        )

        for chunk in self._chunks:
            idx = str(chunk.chunk_index)
            input_rows = f"{chunk.input_rows:,}"
            output_rows = f"{chunk.output_rows:,}"
            change = chunk.output_rows - chunk.input_rows
            change_str = f"{change:+,}" if change != 0 else "-"
            time_str = f"{chunk.elapsed_seconds:.4f}s"
            hash_str = chunk.output_hash

            lines.append(
                f"{idx:<6} {input_rows:<12} {output_rows:<12} {change_str:<10} {time_str:<10} {hash_str:<16}"
            )

    lines.append("=" * 70)
    return "\n".join(lines)

print

print() -> None

Print the protocol summary to stdout.

Source code in transformplan/chunking.py
def print(self) -> None:
    """Print the protocol summary to stdout."""
    print(self.summary())  # noqa: T201

ChunkValidationResult Class

ChunkValidationResult dataclass

ChunkValidationResult(
    is_valid: bool,
    errors: list[str] = list(),
    warnings: list[str] = list(),
    global_operations: list[str] = list(),
    group_dependent_ops: list[tuple[str, list[str] | None]] = list(),
)

Result of validating a pipeline for chunked processing.

Attributes:

is_valid (bool): Whether the pipeline can be processed in chunks.
errors (list[str]): List of error messages explaining incompatibilities.
warnings (list[str]): List of warning messages (non-blocking).
global_operations (list[str]): Names of operations that require the full dataset.
group_dependent_ops (list[tuple[str, list[str] | None]]): List of (operation, columns) pairs for group-dependent operations.

is_valid instance-attribute

is_valid: bool

errors class-attribute instance-attribute

errors: list[str] = field(default_factory=list)

warnings class-attribute instance-attribute

warnings: list[str] = field(default_factory=list)

global_operations class-attribute instance-attribute

global_operations: list[str] = field(default_factory=list)

group_dependent_ops class-attribute instance-attribute

group_dependent_ops: list[tuple[str, list[str] | None]] = field(
    default_factory=list
)
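
A sketch of inspecting a result from validate_chunked(), separating operations that block chunking outright from those that merely need a different partition_key (sample_df and plan are assumed to exist):

validation = plan.validate_chunked(data=sample_df, partition_key="patient_id")

if not validation.is_valid:
    for op in validation.global_operations:
        print(f"blocked: {op} requires the full dataset")
    for op, columns in validation.group_dependent_ops:
        print(f"group-dependent: {op} groups by {columns}")
    for error in validation.errors:
        print(f"error: {error}")

for warning in validation.warnings:
    print(f"warning: {warning}")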

ChunkingError Exception

ChunkingError

ChunkingError(
    message: str, validation_result: ChunkValidationResult | None = None
)

Bases: Exception

Raised when a pipeline is incompatible with chunked processing.

Attributes:

validation_result (ChunkValidationResult | None): The validation result containing error details.

Initialize ChunkingError with message and optional validation result.

Source code in transformplan/chunking.py
def __init__(
    self, message: str, validation_result: ChunkValidationResult | None = None
) -> None:
    """Initialize ChunkingError with message and optional validation result."""
    super().__init__(message)
    self.validation_result = validation_result

Example: Processing Patient Records

import polars as pl
from transformplan import TransformPlan, Col

# Build pipeline with group-dependent operation
plan = (
    TransformPlan()
    .col_rename(column="PatientID", new_name="patient_id")
    .dt_age_years(birth_column="date_of_birth", new_column="age")
    .rows_filter(Col("age") >= 18)
    .rows_unique(columns=["patient_id", "visit_date"])  # Needs partition key
    .col_drop("date_of_birth")
)

# Validate first
validation = plan.validate_chunked(
    schema={
        "PatientID": pl.Utf8,
        "date_of_birth": pl.Date,
        "visit_date": pl.Date,
        "diagnosis": pl.Utf8,
    },
    partition_key="PatientID"
)

if validation.is_valid:
    # Process the large file
    result, protocol = plan.process_chunked(
        source="patients_archive.parquet",
        partition_key="PatientID",
        chunk_size=50_000,
    )

    # View processing summary
    protocol.print()

    # Save audit trail
    protocol.to_json("chunked_audit.json")
else:
    print("Pipeline incompatible with chunking:")
    for error in validation.errors:
        print(f"  - {error}")

Example Output

======================================================================
CHUNKED PROCESSING PROTOCOL
======================================================================
Source: patients_archive.parquet
Partition key: ['PatientID']
Target chunk size: 50,000
----------------------------------------------------------------------
Chunks processed: 24
Total input rows: 1,187,432
Total output rows: 892,156
Row change: -295,276
Total time: 12.4523s
Avg time per chunk: 0.5188s
Output hash: 7a3b2c1d4e5f6789
----------------------------------------------------------------------

#      Input        Output       Change     Time       Hash
----------------------------------------------------------------------
0      49,832       37,291       -12,541    0.4821s    a1b2c3d4e5f67890
1      50,127       38,456       -11,671    0.5123s    b2c3d4e5f6789012
2      49,956       37,892       -12,064    0.4956s    c3d4e5f678901234
...
======================================================================