"""Evaluation metrics for inventory prediction models."""
import numpy as np
import pandas as pd
def compute_mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Compute Mean Absolute Error.

    Args:
        y_true: Array of actual values.
        y_pred: Array of predicted values, same shape as ``y_true``.

    Returns:
        Mean absolute error as a plain Python float.
    """
    # Cast: np.mean returns np.float64, but the signature promises float.
    return float(np.mean(np.abs(y_true - y_pred)))
def compute_rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Compute Root Mean Squared Error.

    Args:
        y_true: Array of actual values.
        y_pred: Array of predicted values, same shape as ``y_true``.

    Returns:
        Root mean squared error as a plain Python float.
    """
    # Cast: np.sqrt returns np.float64, but the signature promises float.
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))
def compute_mape(y_true: np.ndarray, y_pred: np.ndarray, epsilon: float = 1.0) -> float:
    """Compute Mean Absolute Percentage Error.

    Args:
        y_true: Array of actual values.
        y_pred: Array of predicted values, same shape as ``y_true``.
        epsilon: Smoothing constant added to ``|y_true|`` in the denominator
            so zero actuals do not divide by zero. The default of 1.0 also
            damps the percentage error for small counts — appropriate for
            integer bike inventories.

    Returns:
        Mean absolute percentage error (as a fraction, not scaled by 100)
        as a plain Python float.
    """
    # Cast: np.mean returns np.float64, but the signature promises float.
    return float(np.mean(np.abs(y_true - y_pred) / (np.abs(y_true) + epsilon)))
def inventory_to_states(
    inventory: pd.DataFrame,
    capacities: dict[str, float],
    thresholds: dict[str, float],
) -> pd.DataFrame:
    """Convert inventory counts to states (empty/normal/full).

    Args:
        inventory: DataFrame with bike counts (index=stations, columns=times)
        capacities: Dict mapping station -> capacity; stations missing from
            the dict fall back to a default capacity of 30.
        thresholds: Dict with "empty" and "full" thresholds (as fraction of
            capacity); defaults are 0.1 and 0.9 when the keys are absent.

    Returns:
        DataFrame of the same shape with states ("empty", "normal", "full").
    """
    empty_frac = thresholds.get("empty", 0.1)
    full_frac = thresholds.get("full", 0.9)
    states = pd.DataFrame(index=inventory.index, columns=inventory.columns, dtype=object)
    for station in inventory.index:
        capacity = capacities.get(station, 30)
        counts = inventory.loc[station].to_numpy()
        # Vectorized replacement of the original per-cell loop (which did an
        # O(stations x times) series of scalar .loc lookups/assignments).
        # np.select picks the FIRST matching condition, matching the original
        # if/elif ordering: "empty" wins when the thresholds overlap.
        states.loc[station] = np.select(
            [counts <= capacity * empty_frac, counts >= capacity * full_frac],
            ["empty", "full"],
            default="normal",
        )
    return states
def compute_state_metrics(
    true_states: pd.DataFrame,
    pred_states: pd.DataFrame,
    state: str,
) -> dict[str, float]:
    """Compute precision, recall, F1 for a specific state.

    Args:
        true_states: DataFrame with actual states
        pred_states: DataFrame with predicted states
        state: Which state to evaluate ("empty" or "full")

    Returns:
        Dictionary with precision, recall, f1, count
    """
    # Treat the chosen state as the positive class over all flattened cells.
    # Assumes both frames share the same station/time layout.
    actual = true_states.values.ravel() == state
    predicted = pred_states.values.ravel() == state

    tp = int(np.sum(actual & predicted))
    fp = int(np.sum(~actual & predicted))
    fn = int(np.sum(actual & ~predicted))

    # Guard each ratio against a zero denominator (return 0.0 instead).
    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    pr_sum = precision + recall
    f1 = (2 * precision * recall / pr_sum) if pr_sum > 0 else 0.0

    return {
        f"{state}_precision": precision,
        f"{state}_recall": recall,
        f"{state}_f1": f1,
        f"{state}_count": int(actual.sum()),
        f"{state}_predicted_count": int(predicted.sum()),
    }
def compute_inventory_metrics(
    true_inventory: pd.DataFrame,
    pred_inventory: pd.DataFrame,
    capacities: dict[str, float],
    thresholds: dict[str, float],
) -> dict[str, float]:
    """Compute all evaluation metrics for inventory prediction.

    Args:
        true_inventory: DataFrame with actual bike counts
        pred_inventory: DataFrame with predicted bike counts
        capacities: Dict mapping station -> capacity
        thresholds: Dict with "empty" and "full" thresholds

    Returns:
        Dictionary with all metrics, or ``{"error": ...}`` when the two
        frames share no stations or no time periods.
    """
    # Restrict both frames to their common stations and time periods.
    stations = true_inventory.index.intersection(pred_inventory.index)
    times = true_inventory.columns.intersection(pred_inventory.columns)
    if len(stations) == 0 or len(times) == 0:
        return {"error": "No overlap between true and predicted data"}

    true_inv = true_inventory.loc[stations, times]
    pred_inv = pred_inventory.loc[stations, times]
    y_true = true_inv.values.flatten()
    y_pred = pred_inv.values.flatten()

    metrics = {
        "inventory_mae": compute_mae(y_true, y_pred),
        "inventory_rmse": compute_rmse(y_true, y_pred),
        "inventory_mape": compute_mape(y_true, y_pred),
        "n_predictions": len(y_true),
        "n_stations": len(stations),
        "n_time_periods": len(times),
    }

    # Pearson correlation is undefined for constant or single-point series;
    # report 0.0 in those cases (and for any NaN result).
    metrics["correlation"] = 0.0
    if len(y_true) > 1 and np.std(y_true) > 0 and np.std(y_pred) > 0:
        corr = np.corrcoef(y_true, y_pred)[0, 1]
        if not np.isnan(corr):
            metrics["correlation"] = corr

    # Discretize counts and score the operationally critical states.
    true_states = inventory_to_states(true_inv, capacities, thresholds)
    pred_states = inventory_to_states(pred_inv, capacities, thresholds)
    for state in ("empty", "full"):
        metrics.update(compute_state_metrics(true_states, pred_states, state))

    # Fraction of cells whose predicted state matches the actual state.
    n_cells = true_states.size
    n_correct = np.sum(true_states.values == pred_states.values)
    metrics["state_accuracy"] = n_correct / n_cells if n_cells > 0 else 0.0

    # Spread of per-station error, to expose stations the model handles
    # poorly even when the global MAE looks fine.
    per_station_mae = [
        compute_mae(true_inv.loc[s].values, pred_inv.loc[s].values) for s in stations
    ]
    metrics["station_mae_mean"] = np.mean(per_station_mae)
    metrics["station_mae_std"] = np.std(per_station_mae)
    return metrics
def summarize_fold_results(fold_results: list) -> dict[str, tuple[float, float]]:
    """Summarize results across cross-validation folds.

    Args:
        fold_results: List of metric dictionaries from each fold

    Returns:
        Dictionary mapping metric -> (mean, std), computed over the folds in
        which the metric appears. Bookkeeping keys and non-numeric values
        are skipped.
    """
    if not fold_results:
        return {}

    # Bookkeeping keys that must never be averaged.
    skip_metrics = {"fold_id", "train_start", "test_start", "train_end", "test_end", "error"}

    # Union of keys across ALL folds, preserving first-seen order. The old
    # code scanned only fold_results[0].keys(), silently dropping any metric
    # that first appeared in a later fold (e.g. when fold 0 errored out).
    metric_names: dict = {}
    for result in fold_results:
        for key in result:
            metric_names.setdefault(key)

    summary = {}
    for metric in metric_names:
        if metric in skip_metrics:
            continue
        values = [r[metric] for r in fold_results if metric in r]
        if not values:
            continue
        try:
            numeric_values = [float(v) for v in values]
        except (ValueError, TypeError):
            # Non-numeric metric (e.g. a label string) — skip it entirely.
            continue
        summary[metric] = (np.mean(numeric_values), np.std(numeric_values))
    return summary