Source code for citibike.models.baseline

"""Temporal Flow model using historical average flow conditioned on time."""

from typing import Any

import numpy as np
import pandas as pd

from .base import BaseModel


class TemporalFlowModel(BaseModel):
    """Temporal flow model using time-conditioned historical average net flow.

    This model:

    1. Computes the average hourly net flow (arrivals - departures) per
       station, conditioned on hour of day and weekend/weekday.
    2. Predicts future inventory by repeatedly applying the matching
       average flow::

           inventory[t+1] = inventory[t] + avg_net_flow[station, hour, is_weekend]

    This captures both station-specific and temporal patterns in bike flow.
    """

    # Capacity assumed for stations that are absent from the fitted
    # capacity table when clamping predictions.
    _DEFAULT_CAPACITY = 30

    def __init__(self, config: dict):
        """Create an unfitted model.

        Args:
            config: Model configuration, forwarded unchanged to the base class.
        """
        super().__init__(config)
        self.hourly_net_flow = {}  # (station, hour, is_weekend) -> avg net flow
        self.global_avg_flow = 0.0  # fallback for unseen (station, hour, weekend)
        self.stations = []
        self.station_capacities = {}  # station -> capacity, populated by fit()

    @staticmethod
    def _count_by_slot(
        trips: pd.DataFrame, station_column: str, count_name: str
    ) -> pd.DataFrame:
        """Count trips per (station, hour, is_weekend) using ``station_column``."""
        counts = (
            trips.groupby([station_column, "hour", "is_weekend"])
            .size()
            .reset_index(name=count_name)
        )
        counts.columns = ["station", "hour", "is_weekend", count_name]
        return counts

    def fit(
        self,
        trips: pd.DataFrame,
        station_stats: pd.DataFrame,
    ) -> "BaseModel":
        """Compute historical average hourly net flow per station.

        Args:
            trips: Trip data. Must provide ``start_station_name``,
                ``end_station_name`` and ``started_at``; ``hour`` and
                ``is_weekend`` are derived from ``started_at`` when absent.
            station_stats: Station info indexed by station, with a
                ``capacity`` column.

        Returns:
            The fitted model (``self``).
        """
        print(f"Fitting {self.get_name()} on {len(trips):,} trips...")

        # Store stations and capacities
        self.stations = station_stats.index.tolist()
        self.station_capacities = station_stats["capacity"].to_dict()

        # Ensure time features exist (work on a copy so the caller's frame
        # is never mutated).
        trips = trips.copy()
        if "hour" not in trips.columns:
            trips["hour"] = trips["started_at"].dt.hour
        if "is_weekend" not in trips.columns:
            trips["is_weekend"] = trips["started_at"].dt.dayofweek.isin([5, 6])
        # A caller-supplied flag may be 0/1 integers; coerce to bool so the
        # day counting below is a logical (not bitwise) operation.
        trips["is_weekend"] = trips["is_weekend"].astype(bool)

        # NOTE(review): arrivals are bucketed by the same "hour" feature as
        # departures, which is derived from ``started_at`` when computed
        # here - i.e. by trip start time rather than arrival time. Confirm
        # this is intended before changing it, since it alters model output.
        departures = self._count_by_slot(trips, "start_station_name", "departures")
        arrivals = self._count_by_slot(trips, "end_station_name", "arrivals")

        # Merge to get net flow; a slot seen on only one side counts as 0
        # on the other.
        flow_df = departures.merge(
            arrivals, on=["station", "hour", "is_weekend"], how="outer"
        ).fillna(0)
        flow_df["net_flow"] = flow_df["arrivals"] - flow_df["departures"]

        # Count the number of distinct weekday / weekend dates observed so
        # totals can be turned into per-day averages.
        weekend_flag_by_date = (
            trips.groupby(trips["started_at"].dt.date)["is_weekend"]
            .first()
            .astype(bool)
        )
        n_weekend_days = int(weekend_flag_by_date.sum())
        n_weekdays = int(len(weekend_flag_by_date)) - n_weekend_days

        # Normalize by number of days of the matching type (at least 1 to
        # avoid division by zero).
        days_per_row = np.where(
            flow_df["is_weekend"].astype(bool), n_weekend_days, n_weekdays
        )
        flow_df["avg_net_flow"] = flow_df["net_flow"] / np.maximum(days_per_row, 1)

        # Store as lookup dictionary
        self.hourly_net_flow = {
            (station, int(hour), bool(weekend)): float(flow)
            for station, hour, weekend, flow in zip(
                flow_df["station"],
                flow_df["hour"],
                flow_df["is_weekend"],
                flow_df["avg_net_flow"],
            )
        }

        # Global average as fallback; with no observed slots the mean is
        # NaN, which would poison every prediction, so fall back to 0.
        mean_flow = flow_df["avg_net_flow"].mean()
        self.global_avg_flow = 0.0 if pd.isna(mean_flow) else float(mean_flow)

        self.is_fitted = True
        print(
            f"Fitted model with {len(self.hourly_net_flow)} (station, hour, weekend) combinations"
        )

        return self

    def predict_inventory(
        self,
        initial_inventory: pd.Series,
        start_time: pd.Timestamp,
        end_time: pd.Timestamp,
        freq: str = "1h",
    ) -> pd.DataFrame:
        """Predict inventory by applying average hourly net flow.

        Args:
            initial_inventory: Starting bike count per station (indexed by
                station).
            start_time: Start time for prediction (inclusive).
            end_time: End time for prediction (exclusive).
            freq: Time frequency of the prediction steps.

        Returns:
            DataFrame indexed by station with one float column per time
            step. The first column is ``initial_inventory``; later columns
            are clamped to ``[0, capacity]``. Empty (no columns) when the
            requested range contains no time steps.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        # Generate time periods
        times = pd.date_range(start=start_time, end=end_time, freq=freq, inclusive="left")

        # Get stations from initial inventory
        stations = initial_inventory.index.tolist()

        if len(times) == 0:
            # Nothing to simulate; return the empty frame instead of
            # failing on times[0].
            return pd.DataFrame(index=stations, columns=times, dtype=float)

        capacities = np.array(
            [self.station_capacities.get(s, self._DEFAULT_CAPACITY) for s in stations],
            dtype=float,
        )

        # Hold state as floats from the start: average flows are
        # fractional, so an integer inventory would otherwise need a
        # mid-simulation dtype upcast.
        current = initial_inventory.to_numpy(dtype=float, copy=True)
        values = np.empty((len(stations), len(times)), dtype=float)
        values[:, 0] = current  # initial state is stored as given (unclamped)

        lookup = self.hourly_net_flow.get
        fallback = self.global_avg_flow

        # Simulate forward.
        # NOTE(review): one "hourly" average flow is applied per step
        # regardless of ``freq``, and it is looked up by the hour of the
        # step's destination timestamp - confirm both are intended.
        for column, t in enumerate(times[1:], start=1):
            hour = t.hour
            is_weekend = t.dayofweek >= 5
            flows = np.fromiter(
                (lookup((s, hour, is_weekend), fallback) for s in stations),
                dtype=float,
                count=len(stations),
            )
            # Clamp to valid range [0, capacity]
            current = np.clip(current + flows, 0, capacities)
            values[:, column] = current

        return pd.DataFrame(values, index=stations, columns=times)

    def get_params(self) -> dict[str, Any]:
        """Return model parameters."""
        return {
            "name": self.get_name(),
            "n_flow_combinations": len(self.hourly_net_flow) if self.hourly_net_flow else 0,
            "global_avg_flow": self.global_avg_flow,
        }