Skip to content

Row Operations

Operations for filtering, sorting, and transforming rows.

Overview

Row operations modify which rows are included in the DataFrame and how they are ordered. Use the Col class to build filter expressions.

from transformplan import TransformPlan, Col

plan = (
    TransformPlan()
    .rows_filter(Col("status") == "active")
    .rows_sort("created_at", descending=True)
    .rows_unique(columns=["email"])
)

Class Reference

RowOps

Mixin providing row-level operations.

rows_filter

rows_filter(filter: Filter | dict[str, Any]) -> Self

Filter rows using a serializable Filter expression.

Returns:

Type Description
Self

Self for method chaining.

Example

from transformplan.filters import Col

.rows_filter(Col("age") > 18)
.rows_filter((Col("status") == "active") & (Col("score") >= 50))

Source code in transformplan/ops/rows.py
def rows_filter(self, filter: Filter | dict[str, Any]) -> Self:
    """Keep only the rows that match a serializable Filter expression.

    Args:
        filter: Either a Filter expression or its dictionary form. A dict
            is registered unchanged; any other value is converted by
            calling its ``to_dict()`` method.

    Returns:
        Self for method chaining.

    Example:
        from transformplan.filters import Col

        .rows_filter(Col("age") > 18)
        .rows_filter((Col("status") == "active") & (Col("score") >= 50))
    """
    if isinstance(filter, dict):
        serialized = filter
    else:
        serialized = filter.to_dict()
    params = {"filter": serialized}
    return self._register(self._rows_filter, params)

rows_drop

rows_drop(filter: Filter | dict[str, Any]) -> Self

Drop rows matching a filter (inverse of rows_filter).

Returns:

Type Description
Self

Self for method chaining.

Example

.rows_drop(Col("status") == "deleted")

Source code in transformplan/ops/rows.py
def rows_drop(self, filter: Filter | dict[str, Any]) -> Self:
    """Remove the rows that match a filter (the inverse of rows_filter).

    Args:
        filter: Either a Filter expression or its dictionary form. A dict
            is registered unchanged; any other value is converted by
            calling its ``to_dict()`` method.

    Returns:
        Self for method chaining.

    Example:
        .rows_drop(Col("status") == "deleted")
    """
    already_serialized = isinstance(filter, dict)
    payload = filter if already_serialized else filter.to_dict()
    return self._register(self._rows_drop, {"filter": payload})

rows_drop_nulls

rows_drop_nulls(columns: str | Sequence[str] | None = None) -> Self

Drop rows with null values in specified columns (or any column if None).

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_drop_nulls(self, columns: str | Sequence[str] | None = None) -> Self:
    """Drop rows with null values in specified columns (or any column if None).

    Args:
        columns: A single column name, a sequence of column names, or None
            to consider every column. A single name is wrapped in a list
            and any other sequence is copied into a list before being
            registered.

    Returns:
        Self for method chaining.
    """
    if isinstance(columns, str):
        columns = [columns]
    elif columns is not None:
        # Store a plain list, as rows_sort and rows_deduplicate do, so the
        # registered parameters do not depend on the caller's sequence type.
        columns = list(columns)
    return self._register(self._rows_drop_nulls, {"columns": columns})

rows_unique

rows_unique(
    columns: str | Sequence[str] | None = None,
    keep: Literal["first", "last", "any", "none"] = "first",
) -> Self

Keep unique rows based on specified columns.

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_unique(
    self,
    columns: str | Sequence[str] | None = None,
    keep: Literal["first", "last", "any", "none"] = "first",
) -> Self:
    """Keep unique rows based on specified columns.

    Args:
        columns: A single column name, a sequence of column names, or None.
            A single name is wrapped in a list and any other sequence is
            copied into a list before being registered; None is registered
            unchanged.
        keep: One of 'first', 'last', 'any' or 'none'; registered unchanged
            alongside the columns.

    Returns:
        Self for method chaining.
    """
    if isinstance(columns, str):
        columns = [columns]
    elif columns is not None:
        # Store a plain list, as rows_sort and rows_deduplicate do, so the
        # registered parameters do not depend on the caller's sequence type.
        columns = list(columns)
    return self._register(self._rows_unique, {"columns": columns, "keep": keep})

rows_deduplicate

rows_deduplicate(
    columns: str | Sequence[str],
    sort_by: str,
    keep: Literal["first", "last"] = "first",
    *,
    descending: bool = False,
) -> Self

Deduplicate rows by keeping first/last based on sort order.

Parameters:

Name Type Description Default
columns str | Sequence[str]

Columns that define duplicates.

required
sort_by str

Column to sort by before deduplication.

required
keep Literal['first', 'last']

Keep 'first' or 'last' after sorting.

'first'
descending bool

Sort in descending order.

False

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_deduplicate(
    self,
    columns: str | Sequence[str],
    sort_by: str,
    keep: Literal["first", "last"] = "first",
    *,
    descending: bool = False,
) -> Self:
    """Remove duplicate rows, keeping the first or last one in sort order.

    Args:
        columns: Column name, or sequence of names, that define duplicates.
        sort_by: Column to sort by before deduplication.
        keep: Keep 'first' or 'last' after sorting.
        descending: Sort in descending order.

    Returns:
        Self for method chaining.
    """
    # A lone column name becomes a one-element list; anything else is copied.
    key_columns = [columns] if isinstance(columns, str) else list(columns)
    params = {
        "columns": key_columns,
        "sort_by": sort_by,
        "keep": keep,
        "descending": descending,
    }
    return self._register(self._rows_deduplicate, params)

rows_sort

rows_sort(
    by: str | Sequence[str], *, descending: bool | Sequence[bool] = False
) -> Self

Sort rows by one or more columns.

Parameters:

Name Type Description Default
by str | Sequence[str]

Column(s) to sort by.

required
descending bool | Sequence[bool]

Sort direction (single bool or list matching columns).

False

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_sort(
    self,
    by: str | Sequence[str],
    *,
    descending: bool | Sequence[bool] = False,
) -> Self:
    """Sort rows by one or more columns.

    Args:
        by: Column(s) to sort by. A single name is wrapped in a list.
        descending: Sort direction (single bool or list matching columns).
            A single bool is registered unchanged; any other iterable is
            copied into a list, mirroring how ``by`` is handled.

    Returns:
        Self for method chaining.
    """
    from collections.abc import Iterable

    if isinstance(by, str):
        by = [by]
    # Normalize per-column directions to a plain list so that both registered
    # parameters have the same shape regardless of the caller's sequence type.
    # Non-iterable values (e.g. a plain bool) are passed through untouched.
    if not isinstance(descending, bool) and isinstance(descending, Iterable):
        descending = list(descending)
    return self._register(
        self._rows_sort, {"by": list(by), "descending": descending}
    )

rows_flag

rows_flag(
    filter: Filter | dict[str, Any],
    new_column: str,
    *,
    true_value: Any = True,
    false_value: Any = False,
) -> Self

Add a flag column based on a filter condition (without dropping rows).

Parameters:

Name Type Description Default
filter Filter | dict[str, Any]

Filter condition.

required
new_column str

Name for the flag column.

required
true_value Any

Value when condition is True.

True
false_value Any

Value when condition is False.

False

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_flag(
    self,
    filter: Filter | dict[str, Any],
    new_column: str,
    *,
    true_value: Any = True,  # noqa: ANN401
    false_value: Any = False,  # noqa: ANN401
) -> Self:
    """Add a column that flags rows matching a filter, without dropping rows.

    Args:
        filter: Filter condition, as a Filter expression or its dictionary
            form. A dict is registered unchanged; any other value is
            converted by calling its ``to_dict()`` method.
        new_column: Name for the flag column.
        true_value: Value when condition is True.
        false_value: Value when condition is False.

    Returns:
        Self for method chaining.
    """
    if isinstance(filter, dict):
        condition = filter
    else:
        condition = filter.to_dict()
    params = {
        "filter": condition,
        "new_column": new_column,
        "true_value": true_value,
        "false_value": false_value,
    }
    return self._register(self._rows_flag, params)

rows_head

rows_head(n: int = 5) -> Self

Keep only the first n rows.

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_head(self, n: int = 5) -> Self:
    """Keep only the first n rows.

    Args:
        n: Number of leading rows to keep.

    Returns:
        Self for method chaining.
    """
    params = {"n": n}
    return self._register(self._rows_head, params)

rows_tail

rows_tail(n: int = 5) -> Self

Keep only the last n rows.

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_tail(self, n: int = 5) -> Self:
    """Keep only the last n rows.

    Args:
        n: Number of trailing rows to keep.

    Returns:
        Self for method chaining.
    """
    params = {"n": n}
    return self._register(self._rows_tail, params)

rows_sample

rows_sample(
    n: int | None = None, fraction: float | None = None, seed: int | None = None
) -> Self

Sample rows from the DataFrame.

Parameters:

Name Type Description Default
n int | None

Number of rows to sample.

None
fraction float | None

Fraction of rows to sample (0.0 to 1.0).

None
seed int | None

Random seed for reproducibility.

None

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_sample(
    self,
    n: int | None = None,
    fraction: float | None = None,
    seed: int | None = None,
) -> Self:
    """Take a sample of rows from the DataFrame.

    All three arguments are registered exactly as given, including None.

    Args:
        n: Number of rows to sample.
        fraction: Fraction of rows to sample (0.0 to 1.0).
        seed: Random seed for reproducibility.

    Returns:
        Self for method chaining.
    """
    params = {"n": n, "fraction": fraction, "seed": seed}
    return self._register(self._rows_sample, params)

rows_explode

rows_explode(column: str) -> Self

Explode a list column into multiple rows.

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_explode(self, column: str) -> Self:
    """Explode a list column into multiple rows.

    Args:
        column: Name of the list column to explode.

    Returns:
        Self for method chaining.
    """
    params = {"column": column}
    return self._register(self._rows_explode, params)

rows_melt

rows_melt(
    id_columns: Sequence[str],
    value_columns: Sequence[str],
    variable_name: str = "variable",
    value_name: str = "value",
) -> Self

Unpivot a DataFrame from wide to long format.

Parameters:

Name Type Description Default
id_columns Sequence[str]

Columns to keep as identifiers.

required
value_columns Sequence[str]

Columns to unpivot.

required
variable_name str

Name for the variable column.

'variable'
value_name str

Name for the value column.

'value'

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_melt(
    self,
    id_columns: Sequence[str],
    value_columns: Sequence[str],
    variable_name: str = "variable",
    value_name: str = "value",
) -> Self:
    """Unpivot a DataFrame from wide to long format.

    Both column sequences are copied into new lists before being registered.

    Args:
        id_columns: Columns to keep as identifiers.
        value_columns: Columns to unpivot.
        variable_name: Name for the variable column.
        value_name: Name for the value column.

    Returns:
        Self for method chaining.
    """
    identifiers = list(id_columns)
    unpivoted = list(value_columns)
    params = {
        "id_columns": identifiers,
        "value_columns": unpivoted,
        "variable_name": variable_name,
        "value_name": value_name,
    }
    return self._register(self._rows_melt, params)

rows_pivot

rows_pivot(
    index: str | Sequence[str],
    columns: str,
    values: str,
    aggregate_function: PivotAgg = "first",
) -> Self

Pivot from long to wide format.

Parameters:

Name Type Description Default
index str | Sequence[str]

Column(s) to use as row identifiers.

required
columns str

Column whose unique values become new columns.

required
values str

Column containing values to fill.

required
aggregate_function PivotAgg

How to aggregate ('first', 'sum', 'mean', 'count', etc.).

'first'

Returns:

Type Description
Self

Self for method chaining.

Source code in transformplan/ops/rows.py
def rows_pivot(
    self,
    index: str | Sequence[str],
    columns: str,
    values: str,
    aggregate_function: PivotAgg = "first",
) -> Self:
    """Pivot from long to wide format.

    Args:
        index: Column(s) to use as row identifiers. A single name is
            wrapped in a list; any other sequence is copied into a list.
        columns: Column whose unique values become new columns.
        values: Column containing values to fill.
        aggregate_function: How to aggregate ('first', 'sum', 'mean', 'count',
            etc.).

    Returns:
        Self for method chaining.
    """
    index_columns = [index] if isinstance(index, str) else list(index)
    params = {
        "index": index_columns,
        "columns": columns,
        "values": values,
        "aggregate_function": aggregate_function,
    }
    return self._register(self._rows_pivot, params)

Examples

Filtering Rows

from transformplan import TransformPlan, Col

# Keep rows matching condition
plan = TransformPlan().rows_filter(Col("age") >= 18)

# Drop rows matching condition
plan = TransformPlan().rows_drop(Col("status") == "deleted")

# Complex filters
plan = TransformPlan().rows_filter(
    (Col("score") >= 50) & (Col("active") == True)
)

Flagging Rows

Add a boolean column based on a condition without removing rows:

plan = TransformPlan().rows_flag(
    filter=Col("score") >= 90,
    new_column="is_excellent",
    true_value=True,
    false_value=False
)

Sorting

# Sort by single column
plan = TransformPlan().rows_sort("name")

# Sort descending
plan = TransformPlan().rows_sort("score", descending=True)

# Sort by multiple columns
plan = TransformPlan().rows_sort(
    by=["category", "price"],
    descending=[False, True]
)

Removing Duplicates

# Keep first occurrence of each unique value
plan = TransformPlan().rows_unique(columns=["email"])

# Keep last occurrence
plan = TransformPlan().rows_unique(columns=["user_id"], keep="last")

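# Deduplicate with specific sort order: sort by updated_at descending, then
# keep the last row per user_id (i.e. the row with the earliest updated_at)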
plan = TransformPlan().rows_deduplicate(
    columns=["user_id"],
    sort_by="updated_at",
    keep="last",
    descending=True
)

Handling Nulls

# Drop rows with nulls in any column
plan = TransformPlan().rows_drop_nulls()

# Drop rows with nulls in specific columns
plan = TransformPlan().rows_drop_nulls(columns=["required_field"])

Limiting Rows

# Keep first n rows
plan = TransformPlan().rows_head(10)

# Keep last n rows
plan = TransformPlan().rows_tail(10)

# Random sample
plan = TransformPlan().rows_sample(n=100, seed=42)
plan = TransformPlan().rows_sample(fraction=0.1, seed=42)

Reshaping

# Explode list column into multiple rows
plan = TransformPlan().rows_explode("tags")

# Unpivot from wide to long format
plan = TransformPlan().rows_melt(
    id_columns=["id", "name"],
    value_columns=["q1", "q2", "q3", "q4"],
    variable_name="quarter",
    value_name="sales"
)

# Pivot from long to wide format
plan = TransformPlan().rows_pivot(
    index=["id"],
    columns="quarter",
    values="sales",
    aggregate_function="sum"
)