Backends

TransformPlan uses a pluggable backend system. Each backend implements the Backend ABC, providing all 89 operations plus meta methods for hashing, schema inspection, and type classification.

Overview

The backend determines how data is stored and transformed:

  • PolarsBackend (default): Operates on Polars DataFrames using native Polars expressions
  • DuckDBBackend (optional): Operates on DuckDB relations using SQL generation

A TransformPlan is a pure, backend-agnostic recipe of operations. The backend is chosen at execution time by passing it to process(), validate(), or dry_run(). If no backend is specified, PolarsBackend is used by default. Pipelines serialized with one backend can be loaded and executed with another.

from transformplan import TransformPlan

# Build a plan — no backend needed
plan = TransformPlan().col_drop("temp").math_add("age", 1)

# Execute with default PolarsBackend
result, protocol = plan.process(polars_df)

# Execute with DuckDB backend
import duckdb
from transformplan.backends.duckdb import DuckDBBackend
con = duckdb.connect()
result, protocol = plan.process(duckdb_rel, backend=DuckDBBackend(con))

Backend ABC

The abstract base class that all backends must implement.

Backend

Bases: ABC

Abstract base class defining the operation interface for backends.

Each backend must implement all 89 operations. Methods receive data and operation-specific parameters, and return transformed data.

Subclasses must set the name class variable to a unique identifier (e.g. "polars", "duckdb").
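To illustrate the subclassing pattern (the name class variable plus the meta methods), here is a minimal self-contained mirror of the ABC. This is a sketch, not the real Backend, which also declares the full set of operation methods; MiniBackend and ListBackend are hypothetical names for illustration only.

```python
from abc import ABC, abstractmethod
from typing import Any

class MiniBackend(ABC):
    """Illustrative mirror of the Backend pattern (not the real ABC)."""

    name: str  # subclasses must set a unique identifier

    @abstractmethod
    def get_shape(self, data: Any) -> tuple[int, int]: ...

    @abstractmethod
    def get_columns(self, data: Any) -> list[str]: ...

class ListBackend(MiniBackend):
    """Toy backend treating a list of dicts as a table."""

    name = "list"

    def get_shape(self, data: list[dict]) -> tuple[int, int]:
        return (len(data), len(data[0]) if data else 0)

    def get_columns(self, data: list[dict]) -> list[str]:
        return list(data[0]) if data else []

table = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
backend = ListBackend()
print(backend.name, backend.get_shape(table), backend.get_columns(table))
# list (2, 2) ['name', 'age']
```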

compute_hash abstractmethod

compute_hash(data: Any) -> str

Deterministic, order-invariant hash of the data.

Source code in transformplan/backends/base.py
@abstractmethod
def compute_hash(self, data: Any) -> str:
    """Deterministic, order-invariant hash of the data."""
    ...
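"Order-invariant" here means the digest does not change when the same rows arrive in a different order. A pure-Python sketch of that property (the actual backends use their own implementations, e.g. frame_hash for Polars; order_invariant_hash below is a hypothetical helper):

```python
import hashlib

def order_invariant_hash(rows: list[dict]) -> str:
    # Serialize each row with sorted keys, then sort the row strings,
    # so neither row order nor key order can affect the digest.
    row_strs = sorted(repr(sorted(r.items())) for r in rows)
    return hashlib.md5("\n".join(row_strs).encode()).hexdigest()[:16]

a = [{"id": 1, "x": "a"}, {"id": 2, "x": "b"}]
b = list(reversed(a))  # same rows, different order
assert order_invariant_hash(a) == order_invariant_hash(b)
```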

get_shape abstractmethod

get_shape(data: Any) -> tuple[int, int]

Return (rows, columns) shape.

Source code in transformplan/backends/base.py
@abstractmethod
def get_shape(self, data: Any) -> tuple[int, int]:
    """Return (rows, columns) shape."""
    ...

get_schema abstractmethod

get_schema(data: Any) -> dict[str, Any]

Return column name to type mapping (native types for each backend).

Source code in transformplan/backends/base.py
@abstractmethod
def get_schema(self, data: Any) -> dict[str, Any]:
    """Return column name to type mapping (native types for each backend)."""
    ...

get_columns abstractmethod

get_columns(data: Any) -> list[str]

Return list of column names.

Source code in transformplan/backends/base.py
@abstractmethod
def get_columns(self, data: Any) -> list[str]:
    """Return list of column names."""
    ...

is_numeric_type abstractmethod

is_numeric_type(dtype: Any) -> bool

Check if dtype is numeric.

Source code in transformplan/backends/base.py
@abstractmethod
def is_numeric_type(self, dtype: Any) -> bool:
    """Check if dtype is numeric."""
    ...

is_string_type abstractmethod

is_string_type(dtype: Any) -> bool

Check if dtype is string/text.

Source code in transformplan/backends/base.py
@abstractmethod
def is_string_type(self, dtype: Any) -> bool:
    """Check if dtype is string/text."""
    ...

is_datetime_type abstractmethod

is_datetime_type(dtype: Any) -> bool

Check if dtype is date/datetime/time/duration.

Source code in transformplan/backends/base.py
@abstractmethod
def is_datetime_type(self, dtype: Any) -> bool:
    """Check if dtype is date/datetime/time/duration."""
    ...

is_boolean_type abstractmethod

is_boolean_type(dtype: Any) -> bool

Check if dtype is boolean.

Source code in transformplan/backends/base.py
@abstractmethod
def is_boolean_type(self, dtype: Any) -> bool:
    """Check if dtype is boolean."""
    ...

float_type abstractmethod

float_type() -> Any

Return the float/double type for this backend.

Source code in transformplan/backends/base.py
@abstractmethod
def float_type(self) -> Any:
    """Return the float/double type for this backend."""
    ...

string_type abstractmethod

string_type() -> Any

Return the string/text type for this backend.

Source code in transformplan/backends/base.py
@abstractmethod
def string_type(self) -> Any:
    """Return the string/text type for this backend."""
    ...

PolarsBackend

The default backend, using Polars DataFrames.

PolarsBackend

Bases: Backend

Backend implementation using Polars for all operations.

compute_hash

compute_hash(data: DataFrame) -> str
Source code in transformplan/backends/polars.py
def compute_hash(self, data: pl.DataFrame) -> str:
    return frame_hash(data)

get_shape

get_shape(data: DataFrame) -> tuple[int, int]
Source code in transformplan/backends/polars.py
def get_shape(self, data: pl.DataFrame) -> tuple[int, int]:
    return data.shape

get_schema

get_schema(data: DataFrame) -> dict[str, Any]
Source code in transformplan/backends/polars.py
def get_schema(self, data: pl.DataFrame) -> dict[str, Any]:
    return dict(data.schema)

get_columns

get_columns(data: DataFrame) -> list[str]
Source code in transformplan/backends/polars.py
def get_columns(self, data: pl.DataFrame) -> list[str]:
    return data.columns

import polars as pl
from transformplan import TransformPlan, Col

df = pl.DataFrame({"name": ["Alice", "Bob"], "age": [25, 30]})
plan = TransformPlan().rows_filter(Col("age") >= 18)
result, protocol = plan.process(df)

DuckDBBackend

Optional backend using DuckDB relations and SQL generation. Requires duckdb to be installed.

Optional Dependency

Install DuckDB separately: pip install duckdb or uv add duckdb

DuckDBBackend

DuckDBBackend(con: DuckDBPyConnection | None = None)

Bases: Backend

Backend implementation using DuckDB for all operations.

Initialize DuckDBBackend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| con | DuckDBPyConnection \| None | DuckDB connection. If None, creates an in-memory connection. | None |
Source code in transformplan/backends/duckdb.py
def __init__(self, con: duckdb.DuckDBPyConnection | None = None) -> None:
    """Initialize DuckDBBackend.

    Args:
        con: DuckDB connection. If None, creates an in-memory connection.
    """
    self._con = con or duckdb.connect()

compute_hash

compute_hash(data: DuckDBPyRelation) -> str
Source code in transformplan/backends/duckdb.py
def compute_hash(self, data: duckdb.DuckDBPyRelation) -> str:
    # Deterministic hash: sort columns, sort rows, hash all content
    cols = sorted(data.columns)
    col_list = ", ".join(_q(c) for c in cols)
    concat_expr = " || '|' || ".join(
        f"COALESCE({_q(c)}::VARCHAR, '')" for c in cols
    )
    sql = (
        f"SELECT md5(string_agg(row_str, '\\n' ORDER BY row_str)) AS h "
        f"FROM (SELECT {concat_expr} AS row_str "
        f"FROM (SELECT {col_list} FROM {_sub(data)}) AS _s) AS _r"
    )
    result = self._con.sql(sql).fetchone()
    if result is None or result[0] is None:
        return hashlib.md5(b"empty").hexdigest()[:16]
    return str(result[0])[:16]
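The determinism comes from sorting twice: columns are sorted in Python before building the query, and rows are sorted in SQL via ORDER BY row_str inside string_agg. Rendering the query string for a small column list makes its shape concrete. This sketch reuses only the string-building logic above; quote is a simplified stand-in for the backend's _q identifier quoting, and hash_sql and source are hypothetical names:

```python
def quote(c: str) -> str:
    # Simplified stand-in for the backend's _q identifier quoting
    return f'"{c}"'

def hash_sql(columns: list[str], source: str) -> str:
    cols = sorted(columns)  # column order cannot affect the hash
    col_list = ", ".join(quote(c) for c in cols)
    concat_expr = " || '|' || ".join(
        f"COALESCE({quote(c)}::VARCHAR, '')" for c in cols
    )
    return (
        f"SELECT md5(string_agg(row_str, '\\n' ORDER BY row_str)) AS h "
        f"FROM (SELECT {concat_expr} AS row_str "
        f"FROM (SELECT {col_list} FROM {source}) AS _s) AS _r"
    )

print(hash_sql(["name", "age"], "my_table"))
```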

get_shape

get_shape(data: DuckDBPyRelation) -> tuple[int, int]
Source code in transformplan/backends/duckdb.py
def get_shape(self, data: duckdb.DuckDBPyRelation) -> tuple[int, int]:
    count_result = self._con.sql(f"SELECT COUNT(*) FROM {_sub(data)}").fetchone()
    rows = count_result[0] if count_result else 0
    return (rows, len(data.columns))

get_schema

get_schema(data: DuckDBPyRelation) -> dict[str, Any]
Source code in transformplan/backends/duckdb.py
def get_schema(self, data: duckdb.DuckDBPyRelation) -> dict[str, Any]:
    return dict(zip(data.columns, [str(t) for t in data.types], strict=False))

get_columns

get_columns(data: DuckDBPyRelation) -> list[str]
Source code in transformplan/backends/duckdb.py
def get_columns(self, data: duckdb.DuckDBPyRelation) -> list[str]:
    return list(data.columns)

import duckdb
from transformplan import TransformPlan, Col
from transformplan.backends.duckdb import DuckDBBackend

con = duckdb.connect()
rel = con.sql("SELECT * FROM 'data.parquet'")

plan = (
    TransformPlan()
    .col_rename(column="ID", new_name="id")
    .rows_filter(Col("age") >= 18)
    .math_standardize(column="score", new_column="z_score")
)

result, protocol = plan.process(rel, backend=DuckDBBackend(con))

Cross-Backend Serialization

Pipelines are inherently backend-agnostic. The same serialized plan can be executed with any backend:

import duckdb
from transformplan import TransformPlan, Col
from transformplan.backends.duckdb import DuckDBBackend

# Build and serialize
plan = (
    TransformPlan()
    .col_rename(column="ID", new_name="id")
    .rows_filter(Col("age") >= 18)
)
plan.to_json("pipeline.json")

# Load and execute with Polars (default)
restored = TransformPlan.from_json("pipeline.json")
result, protocol = restored.process(polars_df)

# Or execute with DuckDB
con = duckdb.connect()
rel = con.sql("SELECT * FROM 'data.parquet'")
result, protocol = restored.process(rel, backend=DuckDBBackend(con))

Type System

Each backend classifies its native types into categories used by the validation system:

| Method | PolarsBackend | DuckDBBackend |
| --- | --- | --- |
| is_numeric_type() | Polars Int/Float/Decimal dtypes | INTEGER, BIGINT, DOUBLE, FLOAT, etc. |
| is_string_type() | pl.Utf8, pl.String | VARCHAR, TEXT, etc. |
| is_datetime_type() | pl.Date, pl.Datetime, pl.Time | DATE, TIMESTAMP, TIME, etc. |
| is_boolean_type() | pl.Boolean | BOOLEAN |
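For a string-typed backend like DuckDB, classification amounts to matching type names. A self-contained sketch of what such a classifier might look like (the real backend's exact matching rules may differ, and the set contents here are illustrative, not exhaustive):

```python
NUMERIC = {"TINYINT", "SMALLINT", "INTEGER", "BIGINT", "HUGEINT",
           "FLOAT", "DOUBLE", "DECIMAL"}
STRING = {"VARCHAR", "TEXT", "CHAR"}
DATETIME = {"DATE", "TIME", "TIMESTAMP", "INTERVAL"}

def _base(dtype: str) -> str:
    # Strip parameters, e.g. "DECIMAL(10,2)" -> "DECIMAL"
    return dtype.upper().split("(")[0].strip()

def is_numeric_type(dtype: str) -> bool:
    return _base(dtype) in NUMERIC

def is_string_type(dtype: str) -> bool:
    return _base(dtype) in STRING

def is_datetime_type(dtype: str) -> bool:
    return _base(dtype) in DATETIME

def is_boolean_type(dtype: str) -> bool:
    return _base(dtype) == "BOOLEAN"

print(is_numeric_type("DECIMAL(10,2)"), is_string_type("VARCHAR"))
# True True
```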

Type factory methods (float_type(), int_type(), string_type(), bool_type()) return the appropriate native type for each backend, used by operations that create new columns.