Protocol

The Protocol class captures transformation history for auditability and reproducibility.

Overview

When you process data with a TransformPlan, you receive both the transformed DataFrame and a Protocol object. The protocol contains:

  • Input/output DataFrame hashes for verification
  • Step-by-step operation details
  • Shape changes and timing information
  • Optional metadata

from transformplan import TransformPlan

# df is assumed to be an existing Polars DataFrame with "temp" and "price" columns
plan = TransformPlan().col_drop("temp").math_multiply("price", 1.1)
df_result, protocol = plan.process(df)

# View the protocol
protocol.print()

# Save for audit
protocol.to_json("audit_trail.json")

Protocol Class

Protocol

Protocol()

Captures the transformation history for auditability.

Initialize an empty Protocol.

Source code in transformplan/protocol.py
def __init__(self) -> None:
    """Initialize an empty Protocol."""
    self._steps: list[dict[str, Any]] = []
    self._input_hash: str | None = None
    self._input_shape: tuple[int, int] | None = None
    self._created_at: str = (
        datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    )
    self._metadata: dict[str, Any] = {}
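
The creation timestamp is stored as an ISO 8601 string in UTC with a trailing "Z". The following standalone sketch reproduces that expression and shows one way to parse it back; the variable names are illustrative and not part of the library.

```python
from datetime import datetime, timezone

# Same expression the constructor uses to build the created_at value
created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

# Swap "Z" back to an explicit offset so datetime.fromisoformat accepts it
# on older Python versions as well
parsed = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
```

The parsed value is timezone-aware, so it can be compared safely with other UTC timestamps.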

input_hash property

input_hash: str | None

Hash of the input DataFrame.

Returns:

  • str | None: Hash string or None if not set.

output_hash property

output_hash: str | None

Hash of the final output DataFrame.

Returns:

  • str | None: Hash string or None if no steps.

metadata property

metadata: dict[str, Any]

Protocol metadata.

Returns:

  • dict[str, Any]: Dictionary of metadata.

set_input

set_input(hash_value: str, shape: tuple[int, int]) -> None

Set the hash and shape of the input DataFrame.

Source code in transformplan/protocol.py
def set_input(self, hash_value: str, shape: tuple[int, int]) -> None:
    """Set the hash and shape of the input DataFrame."""
    self._input_hash = hash_value
    self._input_shape = shape

set_metadata

set_metadata(**kwargs: Any) -> None

Set arbitrary metadata on the protocol.

Example

protocol.set_metadata(author="alice", project="analysis-v2")

Source code in transformplan/protocol.py
def set_metadata(self, **kwargs: Any) -> None:  # noqa: ANN401
    """Set arbitrary metadata on the protocol.

    Example:
        protocol.set_metadata(author="alice", project="analysis-v2")
    """
    self._metadata.update(kwargs)
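
Because the method delegates to dict.update, repeated calls merge into the existing metadata and later values overwrite earlier ones for the same key. The sketch below illustrates this with a hypothetical stand-in class, MiniProtocol, which mirrors only the metadata handling shown above and is not the library's Protocol.

```python
from typing import Any


class MiniProtocol:
    """Hypothetical stand-in that mirrors only the metadata handling."""

    def __init__(self) -> None:
        self._metadata: dict[str, Any] = {}

    def set_metadata(self, **kwargs: Any) -> None:
        # dict.update adds new keys and overwrites existing ones
        self._metadata.update(kwargs)


proto = MiniProtocol()
proto.set_metadata(author="alice", project="analysis-v2")
proto.set_metadata(project="analysis-v3", reviewed=True)
merged = proto._metadata
```

After the second call, merged holds author, the updated project value, and the new reviewed key.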

add_step

add_step(
    operation: str,
    params: dict[str, Any],
    old_shape: tuple[int, int],
    new_shape: tuple[int, int],
    elapsed: float,
    output_hash: str,
) -> None

Record a transformation step in the protocol.

Parameters:

  • operation (str, required): Name of the operation.
  • params (dict[str, Any], required): Operation parameters.
  • old_shape (tuple[int, int], required): Shape before operation (rows, cols).
  • new_shape (tuple[int, int], required): Shape after operation (rows, cols).
  • elapsed (float, required): Time taken in seconds.
  • output_hash (str, required): Hash of the output DataFrame.

Source code in transformplan/protocol.py
def add_step(
    self,
    operation: str,
    params: dict[str, Any],
    old_shape: tuple[int, int],
    new_shape: tuple[int, int],
    elapsed: float,
    output_hash: str,
) -> None:
    """Record a transformation step in the protocol.

    Args:
        operation: Name of the operation.
        params: Operation parameters.
        old_shape: Shape before operation (rows, cols).
        new_shape: Shape after operation (rows, cols).
        elapsed: Time taken in seconds.
        output_hash: Hash of the output DataFrame.
    """
    self._steps.append(
        {
            "step": len(self._steps) + 1,
            "operation": operation,
            "params": params,
            "old_shape": old_shape,
            "new_shape": new_shape,
            "rows_changed": old_shape[0] - new_shape[0],
            "cols_changed": old_shape[1] - new_shape[1],
            "elapsed_seconds": round(elapsed, 4),
            "output_hash": output_hash,
        }
    )
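
Note the sign convention: rows_changed and cols_changed are computed as old minus new, so a positive value means rows or columns were removed and a negative value means they were added. A minimal standalone sketch of that arithmetic follows; shape_delta is a hypothetical helper, not a library function.

```python
def shape_delta(old_shape: tuple[int, int], new_shape: tuple[int, int]) -> dict[str, int]:
    """Mirror the rows_changed / cols_changed arithmetic used by add_step."""
    return {
        "rows_changed": old_shape[0] - new_shape[0],
        "cols_changed": old_shape[1] - new_shape[1],
    }


# 150 rows and 1 column removed: both values are positive
removed = shape_delta((1000, 5), (850, 4))

# 2 columns added: cols_changed is negative
added = shape_delta((1000, 4), (1000, 6))
```

The summary method flips the sign again for display, which is why a dropped column appears there as (-1).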

to_dataframe

to_dataframe() -> DataFrame

Convert protocol to a Polars DataFrame.

Returns:

  • DataFrame: DataFrame with step information.

Source code in transformplan/protocol.py
def to_dataframe(self) -> pl.DataFrame:
    """Convert protocol to a Polars DataFrame.

    Returns:
        DataFrame with step information.
    """
    rows = []

    # Step 0: input state
    if self._input_hash is not None:
        rows.append(
            {
                "step": 0,
                "operation": "input",
                "params": None,
                "old_shape": None,
                "new_shape": self._input_shape,
                "rows_changed": 0,
                "cols_changed": 0,
                "elapsed_seconds": 0.0,
                "output_hash": self._input_hash,
            }
        )

    rows.extend(self._steps)
    return pl.DataFrame(rows)

to_csv

to_csv(path: str | Path) -> None

Write protocol to CSV file.

Params are serialized as JSON strings to avoid nested data issues.

Parameters:

  • path (str | Path, required): File path to write to.

Source code in transformplan/protocol.py
def to_csv(self, path: str | Path) -> None:
    """Write protocol to CSV file.

    Params are serialized as JSON strings to avoid nested data issues.

    Args:
        path: File path to write to.
    """
    rows = []

    # Step 0: input state
    if self._input_hash is not None:
        rows.append(
            {
                "step": 0,
                "operation": "input",
                "params": None,
                "old_shape": None,
                "new_shape": str(list(self._input_shape))
                if self._input_shape
                else None,
                "rows_changed": 0,
                "cols_changed": 0,
                "elapsed_seconds": 0.0,
                "output_hash": self._input_hash,
            }
        )

    rows.extend(
        {
            "step": step["step"],
            "operation": step["operation"],
            "params": json.dumps(step["params"]) if step["params"] else None,
            "old_shape": str(list(step["old_shape"])),
            "new_shape": str(list(step["new_shape"])),
            "rows_changed": step["rows_changed"],
            "cols_changed": step["cols_changed"],
            "elapsed_seconds": step["elapsed_seconds"],
            "output_hash": step["output_hash"],
        }
        for step in self._steps
    )

    pl.DataFrame(rows).write_csv(path)
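
The idea of storing params as JSON strings can be illustrated without Polars. The sketch below uses only the standard library csv and json modules; the step records and column subset are made up for the example and do not reflect the exact file that to_csv writes.

```python
import csv
import io
import json

# Illustrative step records with nested params (not real library output)
steps = [
    {"step": 1, "operation": "col_drop", "params": {"column": "temp"}},
    {"step": 2, "operation": "math_multiply", "params": {"column": "price", "value": 1.1}},
]

# Write: nested params become a single JSON string per cell
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["step", "operation", "params"])
writer.writeheader()
for step in steps:
    writer.writerow({**step, "params": json.dumps(step["params"])})

# Read back: decode the JSON cell to recover the nested structure
buffer.seek(0)
restored = [
    {**row, "step": int(row["step"]), "params": json.loads(row["params"])}
    for row in csv.DictReader(buffer)
]
```

Anyone consuming the CSV needs the matching json.loads step to get the parameters back as a dictionary.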

to_dict

to_dict() -> dict[str, Any]

Serialize protocol to a dictionary.

Returns:

  • dict[str, Any]: Dictionary representation of the protocol.

Source code in transformplan/protocol.py
def to_dict(self) -> dict[str, Any]:
    """Serialize protocol to a dictionary.

    Returns:
        Dictionary representation of the protocol.
    """
    return {
        "version": self.VERSION,
        "created_at": self._created_at,
        "metadata": self._metadata,
        "input": {
            "hash": self._input_hash,
            "shape": list(self._input_shape) if self._input_shape else None,
        },
        "steps": [
            {
                "step": s["step"],
                "operation": s["operation"],
                "params": s["params"],
                "old_shape": list(s["old_shape"]),
                "new_shape": list(s["new_shape"]),
                "rows_changed": s["rows_changed"],
                "cols_changed": s["cols_changed"],
                "elapsed_seconds": s["elapsed_seconds"],
                "output_hash": s["output_hash"],
            }
            for s in self._steps
        ],
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> Protocol

Deserialize protocol from a dictionary.

Returns:

  • Protocol: Protocol instance.

Source code in transformplan/protocol.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Protocol:
    """Deserialize protocol from a dictionary.

    Returns:
        Protocol instance.
    """
    protocol = cls()
    protocol._created_at = data.get("created_at", protocol._created_at)
    protocol._metadata = data.get("metadata", {})

    input_data = data.get("input", {})
    protocol._input_hash = input_data.get("hash")
    shape = input_data.get("shape")
    protocol._input_shape = (int(shape[0]), int(shape[1])) if shape else None

    for step in data.get("steps", []):
        protocol._steps.append(
            {
                "step": step["step"],
                "operation": step["operation"],
                "params": step["params"],
                "old_shape": tuple(step["old_shape"]),
                "new_shape": tuple(step["new_shape"]),
                "rows_changed": step["rows_changed"],
                "cols_changed": step["cols_changed"],
                "elapsed_seconds": step["elapsed_seconds"],
                "output_hash": step["output_hash"],
            }
        )

    return protocol
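
to_dict converts shape tuples to lists because JSON has no tuple type, and from_dict converts them back. The following standalone sketch shows that round trip on a single illustrative step record using only the json module; it does not call the library.

```python
import json

# Illustrative step record with tuple shapes, as held in memory
step = {"step": 1, "operation": "col_drop", "old_shape": (1000, 5), "new_shape": (1000, 4)}

# Serialize: tuples become lists, as in to_dict
encoded = json.dumps(
    {**step, "old_shape": list(step["old_shape"]), "new_shape": list(step["new_shape"])}
)

# Deserialize: lists become tuples again, as in from_dict
raw = json.loads(encoded)
decoded = {**raw, "old_shape": tuple(raw["old_shape"]), "new_shape": tuple(raw["new_shape"])}
```

Without the tuple conversion on load, the restored shapes would be lists and would not compare equal to the originals.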

to_json

to_json(path: str | Path | None = None, indent: int = 2) -> str

Serialize protocol to JSON.

Parameters:

  • path (str | Path | None, default None): Optional file path to write to.
  • indent (int, default 2): JSON indentation level.

Returns:

  • str: JSON string.

Source code in transformplan/protocol.py
def to_json(self, path: str | Path | None = None, indent: int = 2) -> str:
    """Serialize protocol to JSON.

    Args:
        path: Optional file path to write to.
        indent: JSON indentation level.

    Returns:
        JSON string.
    """
    json_str = json.dumps(self.to_dict(), indent=indent)

    if path is not None:
        Path(path).write_text(json_str)

    return json_str

from_json classmethod

from_json(source: str | Path) -> Protocol

Deserialize protocol from JSON.

Parameters:

  • source (str | Path, required): Either a JSON string or a path to a JSON file.

Returns:

  • Protocol: Protocol instance.

Source code in transformplan/protocol.py
@classmethod
def from_json(cls, source: str | Path) -> Protocol:
    """Deserialize protocol from JSON.

    Args:
        source: Either a JSON string or a path to a JSON file.

    Returns:
        Protocol instance.
    """
    if isinstance(source, Path) or not source.strip().startswith("{"):
        # Treat as file path
        content = Path(source).read_text()
    else:
        # Treat as JSON string
        content = source

    return cls.from_dict(json.loads(content))
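
The source argument is classified by a simple heuristic: a Path object, or any string that does not start with "{" after stripping whitespace, is treated as a file path. The sketch below isolates that condition in a hypothetical helper, looks_like_path, to show how different inputs are routed.

```python
from pathlib import Path
from typing import Union


def looks_like_path(source: Union[str, Path]) -> bool:
    """Mirror the branch condition used by from_json."""
    return isinstance(source, Path) or not source.strip().startswith("{")


checks = {
    "json_object": looks_like_path('{"steps": []}'),
    "padded_json": looks_like_path('  {"steps": []}'),
    "file_name": looks_like_path("audit_trail.json"),
    "path_object": looks_like_path(Path("audit_trail.json")),
}
```

One consequence is that any string not beginning with "{" is read from disk, so malformed JSON text passed as a string would surface as a file error rather than a JSON decoding error.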

summary

summary(*, show_params: bool = True) -> str

Generate a clean, human-readable summary of the protocol.

Parameters:

  • show_params (bool, default True): Whether to include operation parameters.

Returns:

  • str: Formatted string summary.

Source code in transformplan/protocol.py
def summary(self, *, show_params: bool = True) -> str:  # noqa: C901
    """Generate a clean, human-readable summary of the protocol.

    Args:
        show_params: Whether to include operation parameters.

    Returns:
        Formatted string summary.
    """
    lines = []

    # Header
    lines.extend(("=" * 70, "TRANSFORM PROTOCOL", "=" * 70))

    # Metadata
    if self._metadata:
        for key, value in self._metadata.items():
            lines.append(f"{key}: {value}")
        lines.append("-" * 70)

    # Input info
    if self._input_hash:
        shape_str = (
            f"{self._input_shape[0]} rows x {self._input_shape[1]} cols"
            if self._input_shape
            else "unknown"
        )
        lines.append(f"Input:  {shape_str}  [{self._input_hash}]")

    # Output info
    if self._steps:
        final = self._steps[-1]
        shape_str = f"{final['new_shape'][0]} rows x {final['new_shape'][1]} cols"
        lines.append(f"Output: {shape_str}  [{final['output_hash']}]")

    # Total time
    total_time = sum(s["elapsed_seconds"] for s in self._steps)
    lines.extend(
        [
            f"Total time: {total_time:.4f}s",
            "-" * 70,
            "",
            f"{'#':<4} {'Operation':<20} {'Rows':<12} {'Cols':<12} {'Time':<10} {'Hash':<16}",
            "-" * 70,
        ]
    )

    # Input row
    if self._input_hash:
        shape = self._input_shape or (0, 0)
        lines.append(
            f"{'0':<4} {'input':<20} {shape[0]:<12} {shape[1]:<12} {'-':<10} {self._input_hash:<16}"
        )

    # Operation rows
    no_effect_steps = []
    for step in self._steps:
        step_num = str(step["step"])
        op = step["operation"]
        rows = step["new_shape"][0]
        cols = step["new_shape"][1]

        # Row/col change indicators (negative means removed)
        row_change = -step[
            "rows_changed"
        ]  # flip: positive = added, negative = removed
        col_change = -step["cols_changed"]
        row_str = str(rows)
        col_str = str(cols)
        if row_change != 0:
            row_str += f" ({row_change:+d})"
        if col_change != 0:
            col_str += f" ({col_change:+d})"

        time_str = f"{step['elapsed_seconds']:.4f}s"
        hash_str = step["output_hash"]

        # Check if step had no effect (same hash as previous)
        prev_hash = (
            self._input_hash
            if step["step"] == 1
            else self._steps[step["step"] - 2]["output_hash"]
        )
        no_effect = hash_str == prev_hash
        if no_effect:
            no_effect_steps.append(step["step"])

        # Add marker for no-effect steps
        marker = " ○" if no_effect else ""

        lines.append(
            f"{step_num:<4} {op:<20} {row_str:<12} {col_str:<12} {time_str:<10} {hash_str:<16}{marker}"
        )

        # Params
        if show_params and step["params"]:
            params_str = self._format_params(step["params"])
            lines.append(f"     └─ {params_str}")

    lines.append("=" * 70)

    # Add note about no-effect steps
    if no_effect_steps:
        lines.append(
            f"○ = no effect (steps {', '.join(map(str, no_effect_steps))} did not change data)"
        )

    return "\n".join(lines)
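
The no-effect marker relies on comparing each step's output hash with the hash that preceded it (the input hash for step 1). A minimal standalone sketch of that comparison is shown here; no_effect_steps is a hypothetical helper and the hash strings are placeholders.

```python
from typing import Optional


def no_effect_steps(input_hash: Optional[str], output_hashes: list[str]) -> list[int]:
    """Return 1-based step numbers whose hash equals the preceding hash."""
    flagged = []
    previous = input_hash
    for number, current in enumerate(output_hashes, start=1):
        if current == previous:
            flagged.append(number)
        previous = current
    return flagged


# Step 2 reproduces the hash of step 1, so it is flagged
flagged = no_effect_steps("aaaa", ["bbbb", "bbbb", "cccc"])

# A first step that leaves the input untouched is flagged too
first_flagged = no_effect_steps("aaaa", ["aaaa"])
```

Since the frame hash ignores row and column order, a step that only reorders data would also be reported as having no effect.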

print

print(*, show_params: bool = True) -> None

Print the protocol summary to stdout.

Parameters:

  • show_params (bool, default True): Whether to include operation parameters.

Source code in transformplan/protocol.py
def print(self, *, show_params: bool = True) -> None:
    """Print the protocol summary to stdout.

    Args:
        show_params: Whether to include operation parameters.
    """
    print(self.summary(show_params=show_params))  # noqa: T201

frame_hash Function

frame_hash

frame_hash(df: DataFrame) -> str

Compute a deterministic hash of a DataFrame.

The hash is:

  • Row-order invariant (sorted row hashes)
  • Column-order invariant (columns sorted before hashing)
  • Content-sensitive (any value change = different hash)

Parameters:

  • df (DataFrame, required): The DataFrame to hash.

Returns:

  • str: A 16-character hex string.

Source code in transformplan/protocol.py
def frame_hash(df: pl.DataFrame) -> str:
    """Compute a deterministic hash of a DataFrame.

    The hash is:
    - Row-order invariant (sorted row hashes)
    - Column-order invariant (columns sorted before hashing)
    - Content-sensitive (any value change = different hash)

    Args:
        df: The DataFrame to hash.

    Returns:
        A 16-character hex string.
    """
    # Sort columns for column-order invariance
    sorted_cols = sorted(df.columns)
    df_sorted = df.select(sorted_cols)

    # Schema hash (sorted columns + dtypes)
    schema_str = str([(col, str(df_sorted.schema[col])) for col in sorted_cols])

    # Row hashes (sorted for row-order invariance)
    row_hashes = df_sorted.hash_rows().sort().to_list()

    # Combine
    content = f"{schema_str}|{row_hashes}"
    return hashlib.sha256(content.encode()).hexdigest()[:16]

Example Output

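The print() method writes the formatted summary produced by summary() to stdout. For a three-step plan, the output looks like this (hash values are illustrative placeholders):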

======================================================================
TRANSFORM PROTOCOL
======================================================================
Input:  1000 rows x 5 cols  [a1b2c3d4e5f6g7h8]
Output: 850 rows x 4 cols   [h8g7f6e5d4c3b2a1]
Total time: 0.0234s
----------------------------------------------------------------------

#    Operation            Rows         Cols         Time       Hash
----------------------------------------------------------------------
0    input                1000         5            -          a1b2c3d4e5f6g7h8
1    col_drop             1000         4 (-1)       0.0012s    b2c3d4e5f6g7h8a1
     └─ column='temp'
2    math_multiply        1000         4            0.0008s    c3d4e5f6g7h8a1b2
     └─ column='price', value=1.1
3    rows_filter          850 (-150)   4            0.0214s    h8g7f6e5d4c3b2a1
     └─ filter=(age >= 18)
======================================================================

Reproducibility

The frame_hash function computes a deterministic hash that is:

  • Row-order invariant: Same rows in different order produce the same hash
  • Column-order invariant: Same columns in different order produce the same hash
  • Content-sensitive: Any change to a value, column name, or dtype produces a different hash (barring hash collisions)

This enables verification that the same pipeline on the same input produces identical results.
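
The three properties can be demonstrated with a simplified, standard-library-only analogue of the hashing scheme. The sketch below operates on a list of row dictionaries instead of a Polars DataFrame and omits dtypes, so toy_frame_hash is an illustration of the idea and will not produce the same values as frame_hash.

```python
import hashlib


def toy_frame_hash(rows: list[dict[str, object]]) -> str:
    """Order-invariant hash of a list of row dicts (illustration only)."""
    # Sort column names so column order does not matter
    columns = sorted({name for row in rows for name in row})
    # Hash each row with its columns in sorted order, then sort the digests
    # so row order does not matter either
    row_digests = sorted(
        hashlib.sha256(repr([(col, row.get(col)) for col in columns]).encode()).hexdigest()
        for row in rows
    )
    content = f"{columns}|{row_digests}"
    return hashlib.sha256(content.encode()).hexdigest()[:16]


base = [{"id": 1, "price": 10.0}, {"id": 2, "price": 12.5}]
reordered_rows = [{"id": 2, "price": 12.5}, {"id": 1, "price": 10.0}]
reordered_cols = [{"price": 10.0, "id": 1}, {"price": 12.5, "id": 2}]
changed_value = [{"id": 1, "price": 10.0}, {"id": 2, "price": 12.6}]
```

Hashing base, reordered_rows, and reordered_cols yields the same 16-character string, while changed_value yields a different one.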