Skip to content

Trades API

Design of Experiments, batch evaluation, and Pareto analysis.

Overview

from phased_array_systems.trades import (
    # Design Space
    DesignSpace,
    DesignVariable,

    # DOE Generation
    generate_doe,

    # Batch Evaluation
    BatchRunner,

    # Pareto Analysis
    filter_feasible,
    extract_pareto,
    rank_pareto,
)

# Additional functions via submodules
from phased_array_systems.trades.doe import generate_doe_from_dict, augment_doe
from phased_array_systems.trades.pareto import compute_hypervolume

Design Space

DesignSpace

Bases: BaseModel

Collection of design variables defining a design space.

Provides methods for sampling the design space using various DOE methods (grid, random, LHS).

ATTRIBUTE DESCRIPTION
variables

List of design variables

TYPE: list[DesignVariable]

name

Optional name for the design space

TYPE: str | None

n_dims property

n_dims: int

Number of dimensions (variables) in the design space.

variable_names property

variable_names: list[str]

List of variable names.

add_variable

add_variable(name: str, type: Literal['int', 'float', 'categorical'] = 'float', low: float | None = None, high: float | None = None, values: list[Any] | None = None) -> DesignSpace

Add a variable to the design space (fluent interface).

PARAMETER DESCRIPTION
name

Variable name

TYPE: str

type

Variable type

TYPE: Literal['int', 'float', 'categorical'] DEFAULT: 'float'

low

Lower bound

TYPE: float | None DEFAULT: None

high

Upper bound

TYPE: float | None DEFAULT: None

values

Categorical values

TYPE: list[Any] | None DEFAULT: None

RETURNS DESCRIPTION
DesignSpace

Self for chaining

Source code in src/phased_array_systems/trades/design_space.py
def add_variable(
    self,
    name: str,
    type: Literal["int", "float", "categorical"] = "float",
    low: float | None = None,
    high: float | None = None,
    values: list[Any] | None = None,
) -> "DesignSpace":
    """Append a new design variable and hand back this design space.

    A ``DesignVariable`` is built from the given arguments and appended to
    ``self.variables``. Returning ``self`` lets calls be chained.

    Args:
        name: Variable name
        type: Variable type ("int", "float" or "categorical")
        low: Lower bound
        high: Upper bound
        values: Categorical values

    Returns:
        This design space, so further calls can be chained
    """
    spec = {"name": name, "type": type, "low": low, "high": high, "values": values}
    self.variables.append(DesignVariable(**spec))
    return self

sample

sample(method: Literal['grid', 'random', 'lhs'] = 'lhs', n_samples: int = 100, seed: int | None = None, grid_levels: int | list[int] | None = None) -> DataFrame

Sample the design space.

PARAMETER DESCRIPTION
method

Sampling method ("grid", "random", "lhs")

TYPE: Literal['grid', 'random', 'lhs'] DEFAULT: 'lhs'

n_samples

Number of samples (ignored for grid method)

TYPE: int DEFAULT: 100

seed

Random seed for reproducibility

TYPE: int | None DEFAULT: None

grid_levels

Number of levels per variable for grid method

TYPE: int | list[int] | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with columns for each variable plus 'case_id'

Source code in src/phased_array_systems/trades/design_space.py
def sample(
    self,
    method: Literal["grid", "random", "lhs"] = "lhs",
    n_samples: int = 100,
    seed: int | None = None,
    grid_levels: int | list[int] | None = None,
) -> pd.DataFrame:
    """Draw samples from the design space with the requested method.

    Dispatches to the private sampler matching ``method``.

    Args:
        method: Sampling method ("grid", "random", "lhs")
        n_samples: Number of samples; not forwarded to the grid sampler
        seed: Random seed, forwarded to the random and LHS samplers
        grid_levels: Number of levels per variable, forwarded to the grid sampler

    Returns:
        Whatever the selected private sampler returns (a DataFrame of cases)

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    if method == "grid":
        return self._sample_grid(grid_levels)
    if method == "random":
        return self._sample_random(n_samples, seed)
    if method == "lhs":
        return self._sample_lhs(n_samples, seed)
    raise ValueError(f"Unknown sampling method: {method}")

get_variable

get_variable(name: str) -> DesignVariable | None

Get a variable by name.

Source code in src/phased_array_systems/trades/design_space.py
def get_variable(self, name: str) -> DesignVariable | None:
    """Return the first variable whose ``name`` matches, or None if absent."""
    matches = (candidate for candidate in self.variables if candidate.name == name)
    return next(matches, None)

DesignVariable

Bases: BaseModel

Definition of a single design variable.

Supports continuous (float), discrete (int), and categorical variables.

ATTRIBUTE DESCRIPTION
name

Variable name, typically a dot-path like "array.nx"

TYPE: str

type

Variable type ("int", "float", or "categorical")

TYPE: Literal['int', 'float', 'categorical']

low

Lower bound for continuous/discrete variables

TYPE: float | None

high

Upper bound for continuous/discrete variables

TYPE: float | None

values

List of allowed values for categorical variables

TYPE: list[Any] | None

validate_bounds_or_values

validate_bounds_or_values() -> DesignVariable

Ensure proper bounds/values are set based on type.

Source code in src/phased_array_systems/trades/design_space.py
@model_validator(mode="after")
def validate_bounds_or_values(self) -> "DesignVariable":
    """Check that the fields required by ``self.type`` are present.

    Numeric variables ("int"/"float") need both ``low`` and ``high`` with
    ``low <= high``; categorical variables need a non-empty ``values``.

    Returns:
        The validated instance itself

    Raises:
        ValueError: If a required field is missing or the bounds are inverted
    """
    kind = self.type
    if kind == "categorical":
        if not self.values:
            raise ValueError(f"Variable '{self.name}': values required for categorical")
        return self

    if kind in ("int", "float"):
        bounds_missing = self.low is None or self.high is None
        if bounds_missing:
            raise ValueError(f"Variable '{self.name}': low and high required for {self.type}")
        if self.low > self.high:
            raise ValueError(f"Variable '{self.name}': low must be <= high")
    return self

sample_uniform

sample_uniform(n: int, rng: Generator) -> ndarray

Generate uniform random samples.

PARAMETER DESCRIPTION
n

Number of samples

TYPE: int

rng

NumPy random generator

TYPE: Generator

RETURNS DESCRIPTION
ndarray

Array of sampled values

Source code in src/phased_array_systems/trades/design_space.py
def sample_uniform(self, n: int, rng: np.random.Generator) -> np.ndarray:
    """Draw ``n`` independent uniform samples of this variable.

    Args:
        n: Number of samples
        rng: NumPy random generator used for every draw

    Returns:
        Array of sampled values: floats in [low, high) for "float",
        integers in [low, high] for "int", otherwise entries of ``values``
    """
    kind = self.type
    if kind == "float":
        return rng.uniform(self.low, self.high, n)
    if kind == "int":
        # Generator.integers excludes the upper end, hence the +1.
        lower, upper_exclusive = int(self.low), int(self.high) + 1
        return rng.integers(lower, upper_exclusive, n)

    # Categorical: pick positions uniformly, then map them to the values.
    picks = rng.integers(0, len(self.values), n)
    return np.array([self.values[k] for k in picks])

scale_from_unit

scale_from_unit(unit_values: ndarray) -> ndarray

Scale values from [0, 1] to actual variable range.

PARAMETER DESCRIPTION
unit_values

Values in [0, 1]

TYPE: ndarray

RETURNS DESCRIPTION
ndarray

Scaled values in variable's actual range

Source code in src/phased_array_systems/trades/design_space.py
def scale_from_unit(self, unit_values: np.ndarray) -> np.ndarray:
    """Map unit-interval values onto this variable's own range.

    Args:
        unit_values: Values in [0, 1]

    Returns:
        Scaled values: linear interpolation for "float", equal-width integer
        bins for "int", and equal-width bins over ``values`` otherwise
    """
    if self.type == "float":
        span = self.high - self.low
        return self.low + unit_values * span

    if self.type == "int":
        # Each integer gets an equal-width bin; clipping folds the u == 1.0
        # edge (which would land one past ``high``) back into range.
        bin_count = self.high - self.low + 1
        floored = np.floor(self.low + unit_values * bin_count).astype(int)
        return floored.clip(int(self.low), int(self.high))

    # Categorical: one equal-width bin per allowed value.
    n_choices = len(self.values)
    slots = np.floor(unit_values * n_choices).astype(int).clip(0, n_choices - 1)
    return np.array([self.values[s] for s in slots])

get_grid_values

get_grid_values(n_levels: int) -> list[Any]

Get grid values for this variable.

PARAMETER DESCRIPTION
n_levels

Number of levels for grid

TYPE: int

RETURNS DESCRIPTION
list[Any]

List of values at each level

Source code in src/phased_array_systems/trades/design_space.py
def get_grid_values(self, n_levels: int) -> list[Any]:
    """Return the grid levels to use for this variable.

    Args:
        n_levels: Number of levels for grid

    Returns:
        For "float": ``n_levels`` evenly spaced values from low to high.
        For "int": every integer in [low, high] when there are at most
        ``n_levels`` of them, otherwise ``n_levels`` evenly spaced picks.
        Otherwise: a copy of ``values`` (``n_levels`` is not used).
    """
    if self.type == "float":
        return list(np.linspace(self.low, self.high, n_levels))

    if self.type == "int":
        candidates = range(int(self.low), int(self.high) + 1)
        if len(candidates) <= n_levels:
            return list(candidates)
        # More integers than levels: pick evenly spaced positions.
        positions = np.linspace(0, len(candidates) - 1, n_levels).astype(int)
        return [candidates[pos] for pos in positions]

    # Categorical variables always contribute all of their values.
    return list(self.values)

DOE Generation

generate_doe

generate_doe(design_space: DesignSpace, method: Literal['grid', 'random', 'lhs'] = 'lhs', n_samples: int = 100, seed: int | None = None, grid_levels: int | list[int] | None = None) -> DataFrame

Generate a Design of Experiments from a design space.

This is a convenience function that wraps DesignSpace.sample().

PARAMETER DESCRIPTION
design_space

DesignSpace defining the variables and bounds

TYPE: DesignSpace

method

Sampling method - "grid": Full factorial grid (n_samples ignored) - "random": Uniform random sampling - "lhs": Latin Hypercube Sampling (space-filling)

TYPE: Literal['grid', 'random', 'lhs'] DEFAULT: 'lhs'

n_samples

Number of samples (for random/lhs methods)

TYPE: int DEFAULT: 100

seed

Random seed for reproducibility

TYPE: int | None DEFAULT: None

grid_levels

Number of levels per variable for grid method

TYPE: int | list[int] | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with columns: - case_id: Unique identifier for each case - One column per design variable

Examples:

>>> space = DesignSpace()
>>> space.add_variable("array.nx", "int", low=4, high=16)
>>> space.add_variable("array.ny", "int", low=4, high=16)
>>> space.add_variable("rf.tx_power_w_per_elem", "float", low=0.5, high=2.0)
>>> doe = generate_doe(space, method="lhs", n_samples=50, seed=42)
Source code in src/phased_array_systems/trades/doe.py
def generate_doe(
    design_space: DesignSpace,
    method: Literal["grid", "random", "lhs"] = "lhs",
    n_samples: int = 100,
    seed: int | None = None,
    grid_levels: int | list[int] | None = None,
) -> pd.DataFrame:
    """Build a Design of Experiments table for a design space.

    Thin convenience wrapper: every argument is forwarded by keyword to
    ``design_space.sample()`` and its result is returned unchanged.

    Args:
        design_space: DesignSpace defining the variables and bounds
        method: Sampling method
            - "grid": Full factorial grid (n_samples ignored)
            - "random": Uniform random sampling
            - "lhs": Latin Hypercube Sampling (space-filling)
        n_samples: Number of samples (for random/lhs methods)
        seed: Random seed for reproducibility
        grid_levels: Number of levels per variable for grid method

    Returns:
        DataFrame with columns:
            - case_id: Unique identifier for each case
            - One column per design variable

    Examples:
        >>> space = DesignSpace()
        >>> space.add_variable("array.nx", "int", low=4, high=16)
        >>> space.add_variable("array.ny", "int", low=4, high=16)
        >>> space.add_variable("rf.tx_power_w_per_elem", "float", low=0.5, high=2.0)
        >>> doe = generate_doe(space, method="lhs", n_samples=50, seed=42)
    """
    sampling_options = {
        "method": method,
        "n_samples": n_samples,
        "seed": seed,
        "grid_levels": grid_levels,
    }
    return design_space.sample(**sampling_options)

generate_doe_from_dict

generate_doe_from_dict(variables: dict, method: Literal['grid', 'random', 'lhs'] = 'lhs', n_samples: int = 100, seed: int | None = None) -> DataFrame

Generate DOE from a simplified dictionary specification.

Convenience function for quick DOE generation without creating explicit DesignVariable objects.

PARAMETER DESCRIPTION
variables

Dictionary mapping variable names to specs: - For continuous: {"name": (low, high)} or {"name": (low, high, "float")} - For discrete: {"name": (low, high, "int")} - For categorical: {"name": ["value1", "value2", ...]}

TYPE: dict

method

Sampling method

TYPE: Literal['grid', 'random', 'lhs'] DEFAULT: 'lhs'

n_samples

Number of samples

TYPE: int DEFAULT: 100

seed

Random seed

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with DOE cases

Examples:

>>> doe = generate_doe_from_dict({
...     "array.nx": (4, 16, "int"),
...     "array.ny": (4, 16, "int"),
...     "rf.tx_power_w_per_elem": (0.5, 2.0),
...     "array.geometry": ["rectangular", "triangular"],
... }, n_samples=50)
Source code in src/phased_array_systems/trades/doe.py
def generate_doe_from_dict(
    variables: dict,
    method: Literal["grid", "random", "lhs"] = "lhs",
    n_samples: int = 100,
    seed: int | None = None,
) -> pd.DataFrame:
    """Build a DOE straight from a compact ``{name: spec}`` mapping.

    A temporary DesignSpace is assembled from the mapping and then sampled
    through ``generate_doe``; no DesignVariable objects need to be created
    by the caller.

    Args:
        variables: Dictionary mapping variable names to specs:
            - For continuous: {"name": (low, high)} or {"name": (low, high, "float")}
            - For discrete: {"name": (low, high, "int")}
            - For categorical: {"name": ["value1", "value2", ...]}
            A list is always read as categorical; bounds must be a tuple.
        method: Sampling method
        n_samples: Number of samples
        seed: Random seed

    Returns:
        DataFrame with DOE cases

    Raises:
        ValueError: If a spec is neither a list nor a tuple, or a tuple
            does not have 2 or 3 entries

    Examples:
        >>> doe = generate_doe_from_dict({
        ...     "array.nx": (4, 16, "int"),
        ...     "array.ny": (4, 16, "int"),
        ...     "rf.tx_power_w_per_elem": (0.5, 2.0),
        ...     "array.geometry": ["rectangular", "triangular"],
        ... }, n_samples=50)
    """
    space = DesignSpace()

    for name, spec in variables.items():
        if isinstance(spec, list):
            # A list is the categorical shorthand.
            space.add_variable(name, type="categorical", values=spec)
            continue

        if not isinstance(spec, tuple):
            raise ValueError(f"Invalid spec type for '{name}': {type(spec)}")
        if len(spec) not in (2, 3):
            raise ValueError(f"Invalid spec for '{name}': {spec}")

        # (low, high) defaults to float; a third entry names the type.
        low, high, *explicit_kind = spec
        kind = explicit_kind[0] if explicit_kind else "float"
        space.add_variable(name, type=kind, low=low, high=high)

    return generate_doe(space, method=method, n_samples=n_samples, seed=seed)

augment_doe

augment_doe(existing_doe: DataFrame, design_space: DesignSpace, n_additional: int, method: Literal['random', 'lhs'] = 'lhs', seed: int | None = None) -> DataFrame

Add additional samples to an existing DOE.

Useful for adaptive sampling or expanding a study.

PARAMETER DESCRIPTION
existing_doe

Existing DOE DataFrame

TYPE: DataFrame

design_space

DesignSpace defining the variables

TYPE: DesignSpace

n_additional

Number of additional samples to add

TYPE: int

method

Sampling method for new samples

TYPE: Literal['random', 'lhs'] DEFAULT: 'lhs'

seed

Random seed

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Combined DataFrame with original + new cases

Source code in src/phased_array_systems/trades/doe.py
def augment_doe(
    existing_doe: pd.DataFrame,
    design_space: DesignSpace,
    n_additional: int,
    method: Literal["random", "lhs"] = "lhs",
    seed: int | None = None,
) -> pd.DataFrame:
    """Add additional samples to an existing DOE.

    Useful for adaptive sampling or expanding a study. New cases are drawn
    with ``generate_doe`` and given fresh ``case_NNNNN`` identifiers that
    continue after the highest such number found in ``existing_doe``.

    Args:
        existing_doe: Existing DOE DataFrame; must have a "case_id" column
        design_space: DesignSpace defining the variables
        n_additional: Number of additional samples to add
        method: Sampling method for new samples
        seed: Random seed

    Returns:
        Combined DataFrame with original + new cases (index renumbered)
    """
    # Generate new samples
    new_doe = generate_doe(
        design_space,
        method=method,
        n_samples=n_additional,
        seed=seed,
    )

    # Find the highest existing "case_<number>" id. Ids that are not strings
    # or do not follow that pattern are skipped rather than crashing: they
    # cannot collide with the ids generated below.
    prefix = "case_"
    max_existing_id = 0
    for case_id in existing_doe["case_id"]:
        if not isinstance(case_id, str) or not case_id.startswith(prefix):
            continue
        try:
            # Strip only the leading prefix, never later occurrences.
            number = int(case_id[len(prefix):])
        except ValueError:
            continue
        max_existing_id = max(max_existing_id, number)

    # Size the id list from the rows actually generated so the column
    # assignment always matches the frame length.
    first_new_id = max_existing_id + 1
    new_doe["case_id"] = [
        f"case_{i:05d}" for i in range(first_new_id, first_new_id + len(new_doe))
    ]

    # Combine
    return pd.concat([existing_doe, new_doe], ignore_index=True)

Batch Evaluation

BatchRunner

BatchRunner(scenario: Scenario, requirements: RequirementSet | None = None, architecture_builder: Callable[[dict], Architecture] | None = None)

Parallel batch evaluation of DOE cases.

Evaluates multiple architecture/scenario combinations with case-level error handling, progress reporting, and resume capability.

ATTRIBUTE DESCRIPTION
scenario

Scenario to evaluate against

requirements

Optional requirements for verification

architecture_builder

Function to build Architecture from case dict

Initialize the batch runner.

PARAMETER DESCRIPTION
scenario

Scenario to evaluate

TYPE: Scenario

requirements

Optional requirements for verification

TYPE: RequirementSet | None DEFAULT: None

architecture_builder

Function to convert case dict to Architecture. If None, uses default_architecture_builder.

TYPE: Callable[[dict], Architecture] | None DEFAULT: None

Source code in src/phased_array_systems/trades/runner.py
def __init__(
    self,
    scenario: Scenario,
    requirements: RequirementSet | None = None,
    architecture_builder: Callable[[dict], Architecture] | None = None,
):
    """Store the evaluation context used by every batch run.

    Args:
        scenario: Scenario to evaluate
        requirements: Optional requirements for verification
        architecture_builder: Function to convert case dict to Architecture.
            Falls back to default_architecture_builder when not supplied.
    """
    builder = architecture_builder if architecture_builder else default_architecture_builder
    self.scenario, self.requirements = scenario, requirements
    self.architecture_builder = builder

run

run(cases: DataFrame, n_workers: int = 1, cache_path: Path | None = None, progress_callback: Callable[[int, int], None] | None = None) -> DataFrame

Run batch evaluation.

PARAMETER DESCRIPTION
cases

DataFrame with design variable columns + case_id

TYPE: DataFrame

n_workers

Number of parallel workers (1 = sequential)

TYPE: int DEFAULT: 1

cache_path

Optional path to save/load partial results

TYPE: Path | None DEFAULT: None

progress_callback

Optional callback(completed, total) for progress

TYPE: Callable[[int, int], None] | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with all input columns + all metric columns

Source code in src/phased_array_systems/trades/runner.py
def run(
    self,
    cases: pd.DataFrame,
    n_workers: int = 1,
    cache_path: Path | None = None,
    progress_callback: Callable[[int, int], None] | None = None,
) -> pd.DataFrame:
    """Run batch evaluation.

    Cases whose ``case_id`` already appears in the cache file are skipped and
    their cached rows are reused. Remaining cases are evaluated sequentially
    (``n_workers == 1``) or in a process pool, and results are written back
    to the cache every 10 results and once more at the end.

    Args:
        cases: DataFrame with design variable columns + case_id
        n_workers: Number of parallel workers (1 = sequential)
        cache_path: Optional path to save/load partial results (read with
            ``pd.read_parquet``). An unreadable cache is reported and
            ignored; the run then starts from scratch.
        progress_callback: Optional callback(completed, total) for progress

    Returns:
        DataFrame with all input columns + all metric columns
    """
    # Load cached results if available
    completed_ids = set()
    cached_results = []

    if cache_path is not None and cache_path.exists():
        try:
            cached_df = pd.read_parquet(cache_path)
            loaded_ids = set(cached_df["case_id"])
            loaded_results = cached_df.to_dict("records")
        except Exception as exc:
            # Cache use is best-effort, but say so instead of failing silently.
            # Nothing is committed on failure, so no case is marked complete
            # without its result row.
            print(f"Warning: ignoring unreadable cache {cache_path}: {exc}")
        else:
            completed_ids = loaded_ids
            cached_results = loaded_results
            print(f"Resuming: {len(completed_ids)} cases already completed")

    # Filter to uncompleted cases
    cases_to_run = cases[~cases["case_id"].isin(completed_ids)]
    total_cases = len(cases)
    remaining = len(cases_to_run)

    if remaining == 0:
        print("All cases already completed")
        return pd.DataFrame(cached_results)

    print(f"Running {remaining} cases ({len(completed_ids)} cached)")

    # Convert to list of dicts for processing
    case_dicts = cases_to_run.to_dict("records")

    results = list(cached_results)
    start_time = time.perf_counter()

    if n_workers == 1:
        # Sequential execution
        for i, case_row in enumerate(case_dicts):
            result = _evaluate_single_case(
                case_row,
                self.scenario,
                self.requirements,
                self.architecture_builder,
            )
            results.append(result)

            if progress_callback:
                progress_callback(len(results), total_cases)

            # Save intermediate results
            if cache_path is not None and (i + 1) % 10 == 0:
                self._save_cache(results, cache_path)

    else:
        # Parallel execution; results arrive in completion order.
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            futures = [
                executor.submit(
                    _evaluate_single_case,
                    case_row,
                    self.scenario,
                    self.requirements,
                    self.architecture_builder,
                )
                for case_row in case_dicts
            ]

            for future in as_completed(futures):
                result = future.result()
                results.append(result)

                if progress_callback:
                    progress_callback(len(results), total_cases)

                # Save intermediate results periodically
                if cache_path is not None and len(results) % 10 == 0:
                    self._save_cache(results, cache_path)

    # remaining > 0 is guaranteed by the early return above.
    elapsed = time.perf_counter() - start_time
    print(f"Completed {remaining} cases in {elapsed:.1f}s ({elapsed/remaining:.3f}s/case)")

    # Final save
    if cache_path is not None:
        self._save_cache(results, cache_path)

    # Build result DataFrame
    result_df = pd.DataFrame(results)

    # Ensure consistent column order: input columns first, then metrics
    cols = list(cases.columns) + [
        c for c in result_df.columns if c not in cases.columns
    ]
    result_df = result_df[[c for c in cols if c in result_df.columns]]

    return result_df

Pareto Analysis

filter_feasible

filter_feasible(results: DataFrame, requirements: RequirementSet | None = None, verification_column: str = 'verification.passes') -> DataFrame

Filter results to only feasible (requirement-passing) designs.

PARAMETER DESCRIPTION
results

DataFrame with evaluation results

TYPE: DataFrame

requirements

Optional RequirementSet to verify against

TYPE: RequirementSet | None DEFAULT: None

verification_column

Column name for verification status

TYPE: str DEFAULT: 'verification.passes'

RETURNS DESCRIPTION
DataFrame

DataFrame containing only feasible designs

Source code in src/phased_array_systems/trades/pareto.py
def filter_feasible(
    results: pd.DataFrame,
    requirements: RequirementSet | None = None,
    verification_column: str = "verification.passes",
) -> pd.DataFrame:
    """Filter results to only feasible (requirement-passing) designs.

    Three modes, tried in order:
      1. A non-empty ``requirements`` is given: every row is re-verified with
         ``requirements.verify(row_dict)`` and kept when ``.passes`` is truthy.
      2. ``verification_column`` exists: rows where it equals 1.0 are kept.
      3. Otherwise no filtering is applied.

    Args:
        results: DataFrame with evaluation results
        requirements: Optional RequirementSet to verify against
        verification_column: Column name for verification status

    Returns:
        A copy containing only the feasible designs, with all columns of
        ``results`` preserved (also when ``results`` is empty)
    """
    if requirements is not None and len(requirements) > 0:
        # Re-verify against requirements
        passes = [
            bool(requirements.verify(row.to_dict()).passes)
            for _, row in results.iterrows()
        ]
        # Use a boolean ndarray: a plain empty list would be read by pandas
        # as "select no columns" and drop every column of an empty frame.
        mask = np.array(passes, dtype=bool)
        return results[mask].copy()

    if verification_column in results.columns:
        # Use pre-computed verification
        return results[results[verification_column] == 1.0].copy()

    # No filtering - return all
    return results.copy()

extract_pareto

extract_pareto(results: DataFrame, objectives: list[tuple[str, OptimizeDirection]], include_dominated: bool = False) -> DataFrame

Extract Pareto-optimal designs from results.

A design is Pareto-optimal if no other design is at least as good in every objective and strictly better in at least one.

PARAMETER DESCRIPTION
results

DataFrame with evaluation results

TYPE: DataFrame

objectives

List of (column_name, direction) tuples where direction is "minimize" or "maximize"

TYPE: list[tuple[str, OptimizeDirection]]

include_dominated

If True, include a 'pareto_optimal' column marking Pareto-optimal rows

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
DataFrame

DataFrame containing only Pareto-optimal designs (or all designs with a 'pareto_optimal' column if include_dominated=True)

Examples:

>>> pareto = extract_pareto(results, [
...     ("cost_usd", "minimize"),
...     ("eirp_dbw", "maximize"),
... ])
Source code in src/phased_array_systems/trades/pareto.py
def extract_pareto(
    results: pd.DataFrame,
    objectives: list[tuple[str, OptimizeDirection]],
    include_dominated: bool = False,
) -> pd.DataFrame:
    """Select the non-dominated (Pareto-optimal) rows of ``results``.

    A row is dominated when some other row is at least as good in every
    objective and strictly better in at least one; rows that are not
    dominated are Pareto-optimal.

    Args:
        results: DataFrame with evaluation results
        objectives: List of (column_name, direction) tuples where direction
            is "minimize" or "maximize"
        include_dominated: If True, keep every row and add a boolean
            'pareto_optimal' column instead of filtering

    Returns:
        A copy holding only the Pareto-optimal rows, or all rows plus the
        'pareto_optimal' column when include_dominated=True. An empty input
        is returned as a plain copy.

    Examples:
        >>> pareto = extract_pareto(results, [
        ...     ("cost_usd", "minimize"),
        ...     ("eirp_dbw", "maximize"),
        ... ])
    """
    n_rows = len(results)
    if n_rows == 0:
        return results.copy()

    # Express every objective as a cost so that "smaller is better" holds
    # for all columns (maximization objectives are negated).
    costs = np.zeros((n_rows, len(objectives)))
    for col, (name, direction) in enumerate(objectives):
        column_values = results[name].values
        costs[:, col] = -column_values if direction == "maximize" else column_values

    # Row r is dominated when some row is <= everywhere and < somewhere.
    # A row never dominates itself because it is nowhere strictly smaller.
    is_pareto = np.ones(n_rows, dtype=bool)
    for row in range(n_rows):
        no_worse = np.all(costs <= costs[row], axis=1)
        strictly_better = np.any(costs < costs[row], axis=1)
        is_pareto[row] = not np.any(no_worse & strictly_better)

    if not include_dominated:
        return results[is_pareto].copy()

    flagged = results.copy()
    flagged["pareto_optimal"] = is_pareto
    return flagged

rank_pareto

rank_pareto(pareto: DataFrame, objectives: list[tuple[str, OptimizeDirection]], weights: list[float] | None = None, method: Literal['weighted_sum', 'topsis'] = 'weighted_sum') -> DataFrame

Rank Pareto-optimal designs using weighted objectives.

PARAMETER DESCRIPTION
pareto

DataFrame with Pareto-optimal designs

TYPE: DataFrame

objectives

List of (column_name, direction) tuples

TYPE: list[tuple[str, OptimizeDirection]]

weights

Weights for each objective (default: equal weights)

TYPE: list[float] | None DEFAULT: None

method

Ranking method ("weighted_sum" or "topsis")

TYPE: Literal['weighted_sum', 'topsis'] DEFAULT: 'weighted_sum'

RETURNS DESCRIPTION
DataFrame

DataFrame with added 'pareto_rank' and 'pareto_score' columns, sorted by 'pareto_rank' (lower score = better rank)

Source code in src/phased_array_systems/trades/pareto.py
def rank_pareto(
    pareto: pd.DataFrame,
    objectives: list[tuple[str, OptimizeDirection]],
    weights: list[float] | None = None,
    method: Literal["weighted_sum", "topsis"] = "weighted_sum",
) -> pd.DataFrame:
    """Rank Pareto-optimal designs using weighted objectives.

    Each objective column is min-max normalized to [0, 1] and oriented so
    that 0 is the best observed value and 1 the worst. The resulting score
    is "lower is better" for both methods.

    Args:
        pareto: DataFrame with Pareto-optimal designs
        objectives: List of (column_name, direction) tuples
        weights: Weights for each objective, one per objective; they are
            rescaled to sum to 1 (default: equal weights)
        method: Ranking method ("weighted_sum" or "topsis")

    Returns:
        Copy of ``pareto`` with added 'pareto_score' and 'pareto_rank'
        columns, sorted by 'pareto_rank' (rank 1 = best; ties share the
        lowest rank). An empty input is returned as a plain copy.

    Raises:
        ValueError: If ``weights`` does not have one entry per objective,
            if the weights sum to zero, or if ``method`` is unknown
    """
    if len(pareto) == 0:
        return pareto.copy()

    n_obj = len(objectives)
    if weights is None:
        weights = [1.0 / n_obj] * n_obj
    else:
        # A length mismatch must be rejected explicitly: a single weight
        # would otherwise broadcast silently across all objectives.
        if len(weights) != n_obj:
            raise ValueError(
                f"Expected {n_obj} weights (one per objective), got {len(weights)}"
            )
        # Normalize weights
        total = sum(weights)
        if total == 0:
            raise ValueError("Weights must not sum to zero")
        weights = [w / total for w in weights]

    # Extract and normalize objective values
    obj_matrix = np.zeros((len(pareto), n_obj))
    for i, (name, direction) in enumerate(objectives):
        values = pareto[name].values.astype(float)
        # Normalize to [0, 1]
        min_val, max_val = values.min(), values.max()
        if max_val > min_val:
            normalized = (values - min_val) / (max_val - min_val)
        else:
            # Constant column: carries no ranking information.
            normalized = np.zeros_like(values)

        # Flip for maximization (higher is better -> lower normalized score)
        if direction == "maximize":
            normalized = 1 - normalized

        obj_matrix[:, i] = normalized

    if method == "weighted_sum":
        # Weighted sum (lower is better)
        scores = np.sum(obj_matrix * weights, axis=1)

    elif method == "topsis":
        # After normalization the ideal point is all zeros and the
        # anti-ideal point is all ones.
        ideal = np.zeros(n_obj)
        anti_ideal = np.ones(n_obj)

        # Weighted distance to ideal and anti-ideal
        d_ideal = np.sqrt(np.sum(weights * (obj_matrix - ideal) ** 2, axis=1))
        d_anti = np.sqrt(np.sum(weights * (obj_matrix - anti_ideal) ** 2, axis=1))

        # Relative distance to the ideal point: 0 = at the ideal, 1 = at the
        # anti-ideal, so lower is better, like the weighted sum above.
        # A 0/0 result is mapped to the worst score.
        with np.errstate(divide="ignore", invalid="ignore"):
            scores = d_ideal / (d_ideal + d_anti)
            scores = np.nan_to_num(scores, nan=1.0)

    else:
        raise ValueError(f"Unknown ranking method: {method}")

    # Add scores and rank
    result_df = pareto.copy()
    result_df["pareto_score"] = scores
    result_df["pareto_rank"] = result_df["pareto_score"].rank(method="min").astype(int)

    return result_df.sort_values("pareto_rank")

compute_hypervolume

compute_hypervolume(pareto: DataFrame, objectives: list[tuple[str, OptimizeDirection]], reference_point: list[float] | None = None) -> float

Compute hypervolume indicator for a Pareto front.

The hypervolume is the volume of objective space dominated by the Pareto front, bounded by a reference point. Higher is better.

PARAMETER DESCRIPTION
pareto

DataFrame with Pareto-optimal designs

TYPE: DataFrame

objectives

List of (column_name, direction) tuples

TYPE: list[tuple[str, OptimizeDirection]]

reference_point

Reference point in objective space (default: worst point + 10%)

TYPE: list[float] | None DEFAULT: None

RETURNS DESCRIPTION
float

Hypervolume value

Note

For >3 objectives, this uses a simple approximation.

Source code in src/phased_array_systems/trades/pareto.py
def compute_hypervolume(
    pareto: pd.DataFrame,
    objectives: list[tuple[str, OptimizeDirection]],
    reference_point: list[float] | None = None,
) -> float:
    """Compute hypervolume indicator for a Pareto front.

    The hypervolume is the volume of objective space dominated by the
    Pareto front, bounded by a reference point. Higher is better.

    Args:
        pareto: DataFrame with Pareto-optimal designs
        objectives: List of (column_name, direction) tuples
        reference_point: Reference point in objective space (default: worst point + 10%)

    Returns:
        Hypervolume value

    Note:
        For >3 objectives, this uses a simple approximation.
    """
    if len(pareto) == 0:
        return 0.0

    n_obj = len(objectives)

    # Extract objective values (convert to minimization)
    obj_matrix = np.zeros((len(pareto), n_obj))
    for i, (name, direction) in enumerate(objectives):
        values = pareto[name].values.astype(float)
        if direction == "maximize":
            obj_matrix[:, i] = -values
        else:
            obj_matrix[:, i] = values

    # Set reference point if not provided
    if reference_point is None:
        worst = obj_matrix.max(axis=0)
        reference_point = worst * 1.1 + 0.1  # 10% beyond worst

    ref = np.array(reference_point)

    # For 2D, compute exact hypervolume
    if n_obj == 2:
        # Sort by first objective
        sorted_idx = np.argsort(obj_matrix[:, 0])
        sorted_obj = obj_matrix[sorted_idx]

        hv = 0.0
        prev_y = ref[1]
        for i in range(len(sorted_obj)):
            x, y = sorted_obj[i]
            if x < ref[0] and y < ref[1]:
                hv += (ref[0] - x) * (prev_y - y)
                prev_y = y

        return hv

    else:
        # Monte Carlo approximation for higher dimensions
        n_samples = 10000
        rng = np.random.default_rng(42)

        # Sample random points in hyperbox
        samples = np.zeros((n_samples, n_obj))
        for i in range(n_obj):
            min_val = obj_matrix[:, i].min()
            samples[:, i] = rng.uniform(min_val, ref[i], n_samples)

        # Count points dominated by at least one Pareto point
        dominated = np.zeros(n_samples, dtype=bool)
        for pareto_point in obj_matrix:
            is_dominated = np.all(samples >= pareto_point, axis=1)
            dominated |= is_dominated

        # Estimate hypervolume
        box_volume = np.prod(ref - obj_matrix.min(axis=0))
        hv = box_volume * dominated.sum() / n_samples

        return hv

Usage Examples

Complete Trade Study

# End-to-end example: define a design space, sample it, evaluate every case,
# keep the feasible designs, extract the Pareto front and rank it.
from phased_array_systems.trades import (
    DesignSpace, generate_doe, BatchRunner,
    filter_feasible, extract_pareto, rank_pareto
)
from phased_array_systems.requirements import Requirement, RequirementSet

# Define design space
space = (
    DesignSpace(name="Array Trade")
    .add_variable("array.nx", type="categorical", values=[4, 8, 16])
    .add_variable("array.ny", type="categorical", values=[4, 8, 16])
    .add_variable("rf.tx_power_w_per_elem", type="float", low=0.5, high=3.0)
    # Fixed parameters
    .add_variable("array.geometry", type="categorical", values=["rectangular"])
    .add_variable("array.dx_lambda", type="float", low=0.5, high=0.5)
    .add_variable("array.dy_lambda", type="float", low=0.5, high=0.5)
    .add_variable("array.enforce_subarray_constraint", type="categorical", values=[True])
    .add_variable("rf.pa_efficiency", type="float", low=0.3, high=0.3)
)

# Define requirements
requirements = RequirementSet(requirements=[
    Requirement("REQ-001", "Min EIRP", "eirp_dbw", ">=", 35.0),
    Requirement("REQ-002", "Max Cost", "cost_usd", "<=", 100000.0),
])

# Generate DOE
doe = generate_doe(space, method="lhs", n_samples=100, seed=42)

# Run batch evaluation
# NOTE: `scenario` is not defined in this snippet; it must be created
# before this point.
runner = BatchRunner(scenario, requirements)
results = runner.run(doe, n_workers=1)

# Filter and extract Pareto
feasible = filter_feasible(results, requirements)
# Each objective is a (column_name, direction) tuple
objectives = [("cost_usd", "minimize"), ("eirp_dbw", "maximize")]
pareto = extract_pareto(feasible, objectives)

# Rank Pareto designs
ranked = rank_pareto(pareto, objectives, weights=[0.5, 0.5])
print(f"Top design: {ranked.iloc[0]['case_id']}")

Quick DOE

# generate_doe_from_dict is not re-exported by phased_array_systems.trades;
# import it from the doe submodule.
from phased_array_systems.trades.doe import generate_doe_from_dict

# Build a DOE directly from a {variable_name: specification} mapping,
# without constructing a DesignSpace first.
doe = generate_doe_from_dict(
    {
        "array.nx": (4, 16, "int"),
        "array.ny": (4, 16, "int"),
        "rf.tx_power_w_per_elem": (0.5, 3.0),
        "array.geometry": ["rectangular", "triangular"],
    },
    n_samples=50,
    seed=42,
)

Progress Callback

def show_progress(completed, total):
    """Print a one-line progress report for a batch run.

    Args:
        completed: Number of cases finished so far
        total: Total number of cases in the run
    """
    percent = completed / total * 100
    print(f"Progress: {completed}/{total} ({percent:.0f}%)")

results = runner.run(doe, progress_callback=show_progress)

Hypervolume

# compute_hypervolume lives in the pareto submodule (it is not among the
# names listed for the phased_array_systems.trades package root).
from phased_array_systems.trades.pareto import compute_hypervolume

# `pareto` and `objectives` are assumed to exist already, e.g. from the
# complete trade study example.
hv = compute_hypervolume(pareto, objectives)
print(f"Hypervolume: {hv:.2e}")

Type Aliases

from phased_array_systems.types import OptimizeDirection

# OptimizeDirection = Literal["minimize", "maximize"]

# Objectives are passed as a list of (column_name, direction) tuples, where
# direction is one of the OptimizeDirection literals.
objectives = [
    ("cost_usd", "minimize"),   # Lower is better
    ("eirp_dbw", "maximize"),   # Higher is better
]

See Also