"""Markov Chain model for bike inventory prediction using Monte Carlo simulation."""
from typing import Any
import numpy as np
import pandas as pd
from scipy import sparse
from .base import BaseModel
[docs]
class MarkovModel(BaseModel):
"""Markov Chain model for predicting bike station inventory.
This model:
1. Builds time-dependent transition matrices P[i→j | hour, is_weekend]
2. Learns departure rates per station per time context
3. Simulates multiple random walks and averages predictions
Key features:
- State-dependent: Departures depend on current inventory
- Capacity-aware: Arrivals capped at station capacity
- Monte Carlo: Average over N simulations for robust predictions
"""
def __init__(self, config: dict):
super().__init__(config)
# Model parameters
model_config = config.get("model", {}).get("markov", {})
self.smoothing_alpha = model_config.get("smoothing_alpha", 0.0)
self.min_transitions = model_config.get("min_transitions", 1)
self.n_simulations = model_config.get("n_simulations", 50)
self.random_seed = model_config.get("random_seed", None) # None = use system entropy
# Learned parameters
self.stations = []
self.station_to_idx = {}
self.idx_to_station = {}
# Transition matrices: (hour, is_weekend) -> sparse matrix
self.transition_matrices = {}
# Departure rates: (hour, is_weekend) -> array of rates per station
self.departure_rate_arrays = {}
# Fallback for missing contexts
self.global_transition_matrix = None
self.global_departure_rates = None
[docs]
def fit(
self,
trips: pd.DataFrame,
station_stats: pd.DataFrame,
) -> "BaseModel":
"""Build time-dependent transition matrices from trip data.
Uses vectorized pandas operations for speed.
"""
print(f"Fitting {self.get_name()} on {len(trips):,} trips...")
# Get list of stations and create index mappings
self.stations = station_stats.index.tolist()
self.station_capacities = station_stats["capacity"].to_dict()
self.station_to_idx = {s: i for i, s in enumerate(self.stations)}
self.idx_to_station = dict(enumerate(self.stations))
n_stations = len(self.stations)
# Build capacity array for vectorized operations
self.capacity_array = np.array(
[self.station_capacities.get(s, 30) for s in self.stations], dtype=float
)
print(f" Building transition matrices for {n_stations} stations...")
# Prepare data
trips = trips.copy()
if "hour" not in trips.columns:
trips["hour"] = trips["started_at"].dt.hour
if "is_weekend" not in trips.columns:
trips["is_weekend"] = trips["started_at"].dt.dayofweek.isin([5, 6])
# Filter to known stations
trips = trips[
trips["start_station_name"].isin(self.station_to_idx)
& trips["end_station_name"].isin(self.station_to_idx)
].copy()
# Map station names to indices (vectorized)
trips["from_idx"] = trips["start_station_name"].map(self.station_to_idx)
trips["to_idx"] = trips["end_station_name"].map(self.station_to_idx)
# Count days per context for rate normalization
trips["date"] = trips["started_at"].dt.date
days_per_context = trips.groupby(["hour", "is_weekend"])["date"].nunique()
# Build all transition counts at once using groupby (FAST!)
print(" Counting transitions (vectorized)...")
transition_counts = (
trips.groupby(["hour", "is_weekend", "from_idx", "to_idx"])
.size()
.reset_index(name="count")
)
departure_counts = (
trips.groupby(["hour", "is_weekend", "from_idx"]).size().reset_index(name="departures")
)
# Build transition matrices for each context
print(" Building sparse matrices...")
contexts = [(h, w) for h in range(24) for w in [False, True]]
for hour, is_weekend in contexts:
# Filter to this context
ctx_trans = transition_counts[
(transition_counts["hour"] == hour)
& (transition_counts["is_weekend"] == is_weekend)
]
ctx_deps = departure_counts[
(departure_counts["hour"] == hour) & (departure_counts["is_weekend"] == is_weekend)
]
if len(ctx_trans) < self.min_transitions:
continue
# Get total departures per origin
dep_totals = ctx_deps.set_index("from_idx")["departures"]
# Compute probabilities
ctx_trans = ctx_trans.copy()
dep_totals_local = dep_totals # Capture for lambda
ctx_trans["prob"] = ctx_trans.apply(
lambda row, dt=dep_totals_local: row["count"] / dt.get(row["from_idx"], 1),
axis=1,
)
# Create sparse matrix
P = sparse.csr_matrix(
(
ctx_trans["prob"].values,
(
ctx_trans["from_idx"].values.astype(int),
ctx_trans["to_idx"].values.astype(int),
),
),
shape=(n_stations, n_stations),
)
self.transition_matrices[(hour, is_weekend)] = P
# Compute departure rates as array (vectorized for simulation)
n_days = days_per_context.get((hour, is_weekend), 1)
dep_rate_array = np.zeros(n_stations)
for _, row in ctx_deps.iterrows():
idx = int(row["from_idx"])
dep_rate_array[idx] = row["departures"] / n_days
self.departure_rate_arrays[(hour, is_weekend)] = dep_rate_array
# Build global fallback
self._build_global_fallback(trips, n_stations)
self.is_fitted = True
print(f" Built {len(self.transition_matrices)} transition matrices")
print(f" Sparsity: {self._compute_avg_sparsity():.1%}")
print(f" Simulations per prediction: {self.n_simulations}")
return self
def _build_global_fallback(self, trips: pd.DataFrame, n_stations: int):
"""Build a global transition matrix as fallback."""
# Count all transitions (vectorized)
trans_counts = trips.groupby(["from_idx", "to_idx"]).size().reset_index(name="count")
dep_counts = trips.groupby("from_idx").size()
# Compute probabilities
trans_counts["prob"] = trans_counts.apply(
lambda row: row["count"] / dep_counts.get(row["from_idx"], 1), axis=1
)
# Build sparse matrix
self.global_transition_matrix = sparse.csr_matrix(
(
trans_counts["prob"].values,
(
trans_counts["from_idx"].values.astype(int),
trans_counts["to_idx"].values.astype(int),
),
),
shape=(n_stations, n_stations),
)
# Global departure rates as array
n_hours = trips["started_at"].dt.floor("h").nunique()
self.global_departure_rates = np.zeros(n_stations)
for idx, count in dep_counts.items():
self.global_departure_rates[int(idx)] = count / max(n_hours, 1)
def _compute_avg_sparsity(self) -> float:
"""Compute average sparsity of transition matrices."""
if not self.transition_matrices:
return 1.0
n_stations = len(self.stations)
total_possible = n_stations * n_stations
sparsities = []
for P in self.transition_matrices.values():
n_nonzero = P.nnz
sparsity = 1 - (n_nonzero / total_possible)
sparsities.append(sparsity)
return np.mean(sparsities)
def _simulate_one_walk(
self,
initial_inventory: np.ndarray,
times: pd.DatetimeIndex,
rng: np.random.Generator,
deterministic: bool = False,
) -> np.ndarray:
"""Simulate a single random walk.
Args:
initial_inventory: Starting bike counts (array)
times: Timestamps to simulate
rng: Random number generator
deterministic: If True, use expected values instead of sampling
Returns:
Array of shape (n_stations, n_times) with inventory trajectory
"""
n_stations = len(self.stations)
n_times = len(times)
# Initialize trajectory
trajectory = np.zeros((n_stations, n_times))
trajectory[:, 0] = initial_inventory
current = initial_inventory.copy()
for t_idx in range(1, n_times):
t = times[t_idx]
hour = t.hour
is_weekend = t.dayofweek in [5, 6]
context = (hour, is_weekend)
# Get transition matrix and departure rates
P = self.transition_matrices.get(context, self.global_transition_matrix)
dep_rates = self.departure_rate_arrays.get(context, self.global_departure_rates)
if P is None:
trajectory[:, t_idx] = current
continue
if deterministic:
# --- DETERMINISTIC: Use expected values ---
# Departures = rate, capped at available
departures = np.minimum(dep_rates, current)
# Arrivals = P.T @ departures (expected arrivals)
arrivals = P.T @ departures
else:
# --- STOCHASTIC: Sample from distributions ---
# Expected departures scaled by availability
availability_factor = current / np.maximum(self.capacity_array, 1)
expected_departures = dep_rates * availability_factor
# Sample actual departures (Poisson)
departures = np.minimum(
rng.poisson(expected_departures), current.astype(int)
).astype(float)
# Route departures through transition matrix
arrivals = np.zeros(n_stations)
for i in range(n_stations):
n_depart = int(departures[i])
if n_depart > 0:
probs = P.getrow(i).toarray().flatten()
prob_sum = probs.sum()
if prob_sum > 0:
probs = probs / prob_sum
destinations = rng.choice(n_stations, size=n_depart, p=probs)
for dest in destinations:
arrivals[dest] += 1
# --- UPDATE INVENTORY ---
new_inventory = current - departures + arrivals
# Clip to [0, capacity]
new_inventory = np.clip(new_inventory, 0, self.capacity_array)
current = new_inventory
trajectory[:, t_idx] = current
return trajectory
[docs]
def predict_inventory(
self,
initial_inventory: pd.Series,
start_time: pd.Timestamp,
end_time: pd.Timestamp,
freq: str = "1h",
) -> pd.DataFrame:
"""Predict inventory using Monte Carlo random walks.
Runs N simulations and averages the results.
Args:
initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency
Returns:
DataFrame with predicted inventory (mean over simulations)
"""
if not self.is_fitted:
raise ValueError("Model must be fitted before prediction")
# Generate time periods
times = pd.date_range(start=start_time, end=end_time, freq=freq, inclusive="left")
n_times = len(times)
n_stations = len(self.stations)
# Convert initial inventory to array (aligned with self.stations)
init_array = np.array([initial_inventory.get(s, 0) for s in self.stations], dtype=float)
# Run simulations
all_trajectories = np.zeros((self.n_simulations, n_stations, n_times))
rng = np.random.default_rng(seed=self.random_seed)
# Use deterministic mode for single simulation
deterministic = self.n_simulations == 1
for sim in range(self.n_simulations):
all_trajectories[sim] = self._simulate_one_walk(init_array, times, rng, deterministic)
# Average over simulations
mean_trajectory = all_trajectories.mean(axis=0)
# Convert to DataFrame
predictions = pd.DataFrame(
mean_trajectory,
index=self.stations,
columns=times,
)
return predictions
[docs]
def predict_with_uncertainty(
self,
initial_inventory: pd.Series,
start_time: pd.Timestamp,
end_time: pd.Timestamp,
freq: str = "1h",
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Predict inventory with uncertainty bounds.
Returns mean, lower (5th percentile), and upper (95th percentile).
"""
if not self.is_fitted:
raise ValueError("Model must be fitted before prediction")
times = pd.date_range(start=start_time, end=end_time, freq=freq, inclusive="left")
n_times = len(times)
n_stations = len(self.stations)
init_array = np.array([initial_inventory.get(s, 0) for s in self.stations], dtype=float)
all_trajectories = np.zeros((self.n_simulations, n_stations, n_times))
rng = np.random.default_rng(seed=self.random_seed)
for sim in range(self.n_simulations):
all_trajectories[sim] = self._simulate_one_walk(init_array, times, rng)
mean_traj = all_trajectories.mean(axis=0)
lower_traj = np.percentile(all_trajectories, 5, axis=0)
upper_traj = np.percentile(all_trajectories, 95, axis=0)
mean_df = pd.DataFrame(mean_traj, index=self.stations, columns=times)
lower_df = pd.DataFrame(lower_traj, index=self.stations, columns=times)
upper_df = pd.DataFrame(upper_traj, index=self.stations, columns=times)
return mean_df, lower_df, upper_df
[docs]
def predict_flow(
self,
stations: list,
start_time: pd.Timestamp,
end_time: pd.Timestamp,
freq: str = "1h",
) -> pd.DataFrame:
"""Predict net flow (legacy method, kept for compatibility)."""
if not self.is_fitted:
raise ValueError("Model must be fitted before prediction")
times = pd.date_range(start=start_time, end=end_time, freq=freq, inclusive="left")
known_stations = [s for s in stations if s in self.station_to_idx]
predictions = pd.DataFrame(0.0, index=stations, columns=times)
for t in times:
hour = t.hour
is_weekend = t.dayofweek in [5, 6]
context = (hour, is_weekend)
P = self.transition_matrices.get(context, self.global_transition_matrix)
dep_rates = self.departure_rate_arrays.get(context, self.global_departure_rates)
if P is None:
continue
arrivals_vector = P.T @ dep_rates
for station in known_stations:
idx = self.station_to_idx[station]
departures = dep_rates[idx]
arrivals = arrivals_vector[idx]
predictions.loc[station, t] = arrivals - departures
return predictions
[docs]
def get_transition_matrix(self, hour: int, is_weekend: bool) -> tuple[sparse.csr_matrix, list]:
"""Get transition matrix for a specific context."""
context = (hour, is_weekend)
P = self.transition_matrices.get(context, self.global_transition_matrix)
return P, self.stations
[docs]
def get_top_destinations(
self,
station: str,
hour: int,
is_weekend: bool,
top_k: int = 5,
) -> pd.DataFrame:
"""Get top destinations from a station for a given context."""
if station not in self.station_to_idx:
return pd.DataFrame()
context = (hour, is_weekend)
P = self.transition_matrices.get(context, self.global_transition_matrix)
if P is None:
return pd.DataFrame()
station_idx = self.station_to_idx[station]
row = P.getrow(station_idx).toarray().flatten()
top_indices = np.argsort(row)[-top_k:][::-1]
results = []
for idx in top_indices:
if row[idx] > 0:
results.append(
{
"destination": self.idx_to_station[idx],
"probability": row[idx],
}
)
return pd.DataFrame(results)
[docs]
def get_params(self) -> dict[str, Any]:
"""Return model parameters."""
return {
"name": self.get_name(),
"n_stations": len(self.stations),
"n_transition_matrices": len(self.transition_matrices),
"avg_sparsity": self._compute_avg_sparsity() if self.transition_matrices else None,
"smoothing_alpha": self.smoothing_alpha,
"n_simulations": self.n_simulations,
"random_seed": self.random_seed,
}