API Reference
Models
Models for CitiBike inventory prediction.
- class citibike.models.BaseModel(config: dict)
Bases: ABC
Abstract base class for all bike inventory prediction models.
All models must implement:
- fit(): Train the model on historical data
- predict_inventory(): Predict future bike counts per station
- __init__(config: dict)
Initialize model with configuration.
- Args:
config: Configuration dictionary
- abstract fit(trips: DataFrame, station_stats: DataFrame) -> BaseModel
Train the model on historical trip data.
- Args:
trips: DataFrame with columns [started_at, ended_at, start_station_name, end_station_name, …]
station_stats: DataFrame indexed by station_name with capacity
- Returns:
self (for method chaining)
- abstract predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict bike inventory at each station over time.
- Args:
initial_inventory: Series indexed by station_name with starting bike counts
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency (e.g., “1h” for hourly)
- Returns:
- DataFrame with predictions:
index: station_name
columns: timestamps
values: predicted bike counts
- predict_states(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict station states (empty/normal/full) over time.
- Args:
initial_inventory: Starting bike counts per station
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency
- Returns:
DataFrame with state predictions (“empty”, “normal”, “full”)
- citibike.models.BaselineModel
alias of TemporalFlowModel
- class citibike.models.MarkovModel(config: dict)
Bases: BaseModel
Markov Chain model for predicting bike station inventory.
This model:
1. Builds time-dependent transition matrices P[i→j | hour, is_weekend]
2. Learns departure rates per station per time context
3. Simulates multiple random walks and averages predictions
Key features:
- State-dependent: Departures depend on current inventory
- Capacity-aware: Arrivals capped at station capacity
- Monte Carlo: Average over N simulations for robust predictions
- fit(trips: DataFrame, station_stats: DataFrame) -> BaseModel
Build time-dependent transition matrices from trip data.
Uses vectorized pandas operations for speed.
- get_top_destinations(station: str, hour: int, is_weekend: bool, top_k: int = 5) -> DataFrame
Get top destinations from a station for a given context.
- get_transition_matrix(hour: int, is_weekend: bool) -> tuple[csr_matrix, list]
Get transition matrix for a specific context.
- predict_flow(stations: list, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict net flow (legacy method, kept for compatibility).
- predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict inventory using Monte Carlo random walks.
Runs N simulations and averages the results.
- Args:
initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency
- Returns:
DataFrame with predicted inventory (mean over simulations)
- class citibike.models.PersistenceModel(config: dict)
Bases: BaseModel
Persistence (Naive) baseline - predicts inventory stays constant.
inventory[t+1] = inventory[t] = initial_inventory
This is the simplest possible baseline. Any useful model should beat this.
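The persistence rule above can be sketched in a few lines of plain Python. This is an illustration only, not the package's implementation: the helper name `persistence_forecast`, the inclusive end time, and the dict-of-dicts return shape are assumptions (the documented method returns a pandas DataFrame).

```python
from datetime import datetime, timedelta


def persistence_forecast(initial_inventory, start_time, end_time, step=timedelta(hours=1)):
    """Repeat each station's starting bike count at every time step.

    Hypothetical helper: returns {station: {timestamp: count}}.
    The end time is treated as inclusive (an assumption).
    """
    timestamps = []
    t = start_time
    while t <= end_time:
        timestamps.append(t)
        t += step
    return {
        station: {ts: count for ts in timestamps}
        for station, count in initial_inventory.items()
    }


forecast = persistence_forecast(
    {"A": 5, "B": 12},
    datetime(2025, 9, 1, 8),
    datetime(2025, 9, 1, 10),
)
```

Every column of the forecast equals the initial inventory, which is what makes this a useful floor for comparing the other models.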
- class citibike.models.StationAverageModel(config: dict)
Bases: BaseModel
Station-only average baseline - ignores temporal patterns.
Learns average net flow per station (across all hours/days), then applies it uniformly.
inventory[t+1] = inventory[t] + station_avg_flow
This shows the value of temporal conditioning (hour, weekend).
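A minimal sketch of the station-average update, assuming trips are given as plain (start_station, end_station) pairs observed over a known number of hours. The function names and input shapes are hypothetical, and the sketch does not clip predictions to [0, capacity] because the formula above does not mention clipping.

```python
from collections import defaultdict


def fit_station_avg_flow(trips, n_hours):
    """Average hourly net flow (arrivals - departures) per station.

    `trips` is a list of (start_station, end_station) pairs observed over
    `n_hours` hours; both names are illustrative assumptions.
    """
    net = defaultdict(int)
    for start, end in trips:
        net[start] -= 1  # a departure removes a bike
        net[end] += 1    # an arrival adds a bike
    return {station: total / n_hours for station, total in net.items()}


def predict_station_avg(initial_inventory, avg_flow, n_steps):
    """Apply inventory[t+1] = inventory[t] + station_avg_flow for n_steps."""
    history = {s: [float(v)] for s, v in initial_inventory.items()}
    for _ in range(n_steps):
        for s, series in history.items():
            series.append(series[-1] + avg_flow.get(s, 0.0))
    return history


trips = [("A", "B"), ("A", "B"), ("B", "A"), ("A", "C")]
avg_flow = fit_station_avg_flow(trips, n_hours=2)
pred = predict_station_avg({"A": 10, "B": 5, "C": 0}, avg_flow, n_steps=2)
```

Because the same flow is added at every step regardless of the hour, the prediction is a straight line per station.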
- class citibike.models.TemporalFlowModel(config: dict)
Bases: BaseModel
Temporal flow model using time-conditioned historical average net flow.
This model:
1. Computes average hourly net flow (arrivals - departures) per station, conditioned on hour of day and weekend/weekday
2. Predicts future inventory by applying the appropriate average flow: inventory[t+1] = inventory[t] + avg_net_flow[station, hour, is_weekend]
This captures both station-specific and temporal patterns in bike flow.
- fit(trips: DataFrame, station_stats: DataFrame) -> BaseModel
Compute historical average hourly net flow per station.
- Args:
trips: Trip data with start/end stations and timestamps
station_stats: Station info with capacity
- predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict inventory by applying average hourly net flow.
- Args:
initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency
- Returns:
DataFrame with predicted inventory at each hour
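The time-conditioned update can be sketched as follows. This is a plain-Python approximation under stated assumptions, not the package's code: trips are tuples of (started_at, ended_at, start_station, end_station), the average is taken over the distinct calendar hours in which any trip was observed for a given (hour, is_weekend) context, and no clipping to capacity is applied. All function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def context(ts):
    """(hour, is_weekend) key used to condition the averages."""
    return ts.hour, ts.weekday() >= 5


def fit_avg_net_flow(trips):
    """Return {(station, hour, is_weekend): average hourly net flow}."""
    totals = defaultdict(float)     # summed net flow per (station, hour, is_weekend)
    hours_seen = defaultdict(set)   # distinct calendar hours observed per context
    for started_at, ended_at, start_station, end_station in trips:
        totals[(start_station, *context(started_at))] -= 1  # departure
        totals[(end_station, *context(ended_at))] += 1      # arrival
        for ts in (started_at, ended_at):
            hours_seen[context(ts)].add(ts.replace(minute=0, second=0, microsecond=0))
    return {key: total / len(hours_seen[key[1:]]) for key, total in totals.items()}


def predict_temporal(initial_inventory, avg_net_flow, start_time, n_steps):
    """Apply inventory[t+1] = inventory[t] + avg_net_flow[station, hour, is_weekend]."""
    inventory = {s: float(v) for s, v in initial_inventory.items()}
    out = {start_time: dict(inventory)}
    t = start_time
    for _ in range(n_steps):
        hour, is_weekend = context(t)
        for s in inventory:
            inventory[s] += avg_net_flow.get((s, hour, is_weekend), 0.0)
        t += timedelta(hours=1)
        out[t] = dict(inventory)
    return out


trips = [
    (datetime(2025, 9, 1, 8, 5), datetime(2025, 9, 1, 8, 20), "A", "B"),   # Monday
    (datetime(2025, 9, 2, 8, 10), datetime(2025, 9, 2, 8, 30), "A", "B"),  # Tuesday
    (datetime(2025, 9, 2, 8, 40), datetime(2025, 9, 2, 8, 55), "A", "B"),  # Tuesday
]
avg_net_flow = fit_avg_net_flow(trips)
pred = predict_temporal({"A": 10, "B": 4}, avg_net_flow, datetime(2025, 9, 3, 8), n_steps=1)
```

Here station A loses three bikes across two observed weekday 08:00 hours, so its weekday 08:00 flow is -1.5 bikes per hour.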
Base Model
Base model class defining the interface for inventory prediction.
- class citibike.models.base.BaseModel(config: dict)
Bases: ABC
Abstract base class for all bike inventory prediction models.
All models must implement:
- fit(): Train the model on historical data
- predict_inventory(): Predict future bike counts per station
- __init__(config: dict)
Initialize model with configuration.
- Args:
config: Configuration dictionary
- abstract fit(trips: DataFrame, station_stats: DataFrame) -> BaseModel
Train the model on historical trip data.
- Args:
trips: DataFrame with columns [started_at, ended_at, start_station_name, end_station_name, …]
station_stats: DataFrame indexed by station_name with capacity
- Returns:
self (for method chaining)
- abstract predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict bike inventory at each station over time.
- Args:
initial_inventory: Series indexed by station_name with starting bike counts
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency (e.g., “1h” for hourly)
- Returns:
- DataFrame with predictions:
index: station_name
columns: timestamps
values: predicted bike counts
- predict_states(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict station states (empty/normal/full) over time.
- Args:
initial_inventory: Starting bike counts per station
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency
- Returns:
DataFrame with state predictions (“empty”, “normal”, “full”)
Markov Model
Markov Chain model for bike inventory prediction using Monte Carlo simulation.
- class citibike.models.markov.MarkovModel(config: dict)
Bases: BaseModel
Markov Chain model for predicting bike station inventory.
This model:
1. Builds time-dependent transition matrices P[i→j | hour, is_weekend]
2. Learns departure rates per station per time context
3. Simulates multiple random walks and averages predictions
Key features:
- State-dependent: Departures depend on current inventory
- Capacity-aware: Arrivals capped at station capacity
- Monte Carlo: Average over N simulations for robust predictions
- fit(trips: DataFrame, station_stats: DataFrame) -> BaseModel
Build time-dependent transition matrices from trip data.
Uses vectorized pandas operations for speed.
- get_top_destinations(station: str, hour: int, is_weekend: bool, top_k: int = 5) -> DataFrame
Get top destinations from a station for a given context.
- get_transition_matrix(hour: int, is_weekend: bool) -> tuple[csr_matrix, list]
Get transition matrix for a specific context.
- predict_flow(stations: list, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict net flow (legacy method, kept for compatibility).
- predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Predict inventory using Monte Carlo random walks.
Runs N simulations and averages the results.
- Args:
initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency
- Returns:
DataFrame with predicted inventory (mean over simulations)
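The Monte Carlo idea described for this class can be illustrated with a small stdlib-only sketch. Several details here are assumptions rather than documented behaviour: each bike present at the start of a step departs independently with probability `depart_prob[station]`, a bike rejected by a full destination stays at its origin, and a single transition table is used instead of one per (hour, is_weekend) context. All names are hypothetical.

```python
import random


def simulate_once(inventory, capacity, depart_prob, transitions, n_steps, rng):
    """One random walk over n_steps; returns the final inventory per station."""
    inv = dict(inventory)
    for _ in range(n_steps):
        # State-dependent departures: each bike present at the start of the
        # step leaves with probability depart_prob[station].
        departures = {
            s: sum(rng.random() < depart_prob[s] for _ in range(n))
            for s, n in inv.items()
        }
        for origin, n_leaving in departures.items():
            destinations, weights = zip(*transitions[origin].items())
            for dest in rng.choices(destinations, weights=weights, k=n_leaving):
                # Capacity-aware arrivals: a full destination rejects the bike,
                # which (by assumption) stays at its origin.
                if inv[dest] < capacity[dest]:
                    inv[origin] -= 1
                    inv[dest] += 1
    return inv


def predict_mean_inventory(inventory, capacity, depart_prob, transitions,
                           n_steps, n_simulations=200, seed=0):
    """Average the final inventory over n_simulations independent walks."""
    rng = random.Random(seed)
    totals = {s: 0 for s in inventory}
    for _ in range(n_simulations):
        final = simulate_once(inventory, capacity, depart_prob, transitions, n_steps, rng)
        for s, n in final.items():
            totals[s] += n
    return {s: total / n_simulations for s, total in totals.items()}


inventory = {"A": 8, "B": 2}
capacity = {"A": 10, "B": 4}
depart_prob = {"A": 0.5, "B": 0.1}
transitions = {"A": {"B": 1.0}, "B": {"A": 1.0}}  # P[i -> j] for one context
mean_inv = predict_mean_inventory(inventory, capacity, depart_prob, transitions, n_steps=3)
```

In this sketch bikes are conserved and no station exceeds its capacity, so the averaged inventory always stays within physically valid bounds.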
Evaluation
Evaluation framework for CitiBike inventory prediction models.
- class citibike.evaluation.RollingWindowCV(train_weeks: int = 3, test_weeks: int = 1, increment_days: int | None = None)
Bases: object
Rolling window cross-validation splitter.
- citibike.evaluation.compute_initial_inventory_for_fold(trips: DataFrame, stations: list, fold_start: Timestamp) -> Series
Compute initial inventory at start of fold using backward tracking.
Uses trips before fold_start to infer the bike distribution.
- Args:
trips: All trip data
stations: List of station names
fold_start: Start time of the fold
- Returns:
Series with estimated bike count per station
- citibike.evaluation.compute_inventory_metrics(true_inventory: DataFrame, pred_inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) -> dict[str, float]
Compute all evaluation metrics for inventory prediction.
- Args:
true_inventory: DataFrame with actual bike counts
pred_inventory: DataFrame with predicted bike counts
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds
- Returns:
Dictionary with all metrics
- citibike.evaluation.compute_mae(y_true: ndarray, y_pred: ndarray) -> float
Compute Mean Absolute Error.
- citibike.evaluation.compute_mape(y_true: ndarray, y_pred: ndarray, epsilon: float = 1.0) -> float
Compute Mean Absolute Percentage Error.
- citibike.evaluation.compute_rmse(y_true: ndarray, y_pred: ndarray) -> float
Compute Root Mean Squared Error.
- citibike.evaluation.compute_state_metrics(true_states: DataFrame, pred_states: DataFrame, state: str) -> dict[str, float]
Compute precision, recall, F1 for a specific state.
- Args:
true_states: DataFrame with actual states
pred_states: DataFrame with predicted states
state: Which state to evaluate (“empty” or “full”)
- Returns:
Dictionary with precision, recall, f1, count
- citibike.evaluation.inventory_to_states(inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) -> DataFrame
Convert inventory counts to states (empty/normal/full).
- Args:
inventory: DataFrame with bike counts (index=stations, columns=times)
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds (as fraction of capacity)
- Returns:
DataFrame with states (“empty”, “normal”, “full”)
- citibike.evaluation.run_cross_validation(model, trips: DataFrame, station_stats: DataFrame, config: dict, verbose: bool = True) -> tuple[list[dict[str, float]], dict[str, tuple[float, float]]]
Run rolling window cross-validation for inventory prediction.
- Args:
model: Model instance (must have fit/predict_inventory methods)
trips: Trip data
station_stats: Station information with capacity
config: Configuration dictionary
verbose: Whether to print progress
- Returns:
Tuple of (fold_results, summary)
- citibike.evaluation.track_inventory(trips: DataFrame, initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Track actual inventory by applying trips to initial state.
This gives us GROUND TRUTH inventory - what actually happened.
- Args:
trips: Trip data for the period
initial_inventory: Starting bike count per station
start_time: Start of tracking period
end_time: End of tracking period
freq: Time frequency
- Returns:
DataFrame with actual inventory (index=stations, columns=times)
Metrics
Evaluation metrics for inventory prediction models.
- citibike.evaluation.metrics.compute_inventory_metrics(true_inventory: DataFrame, pred_inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) -> dict[str, float]
Compute all evaluation metrics for inventory prediction.
- Args:
true_inventory: DataFrame with actual bike counts
pred_inventory: DataFrame with predicted bike counts
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds
- Returns:
Dictionary with all metrics
- citibike.evaluation.metrics.compute_mae(y_true: ndarray, y_pred: ndarray) -> float
Compute Mean Absolute Error.
- citibike.evaluation.metrics.compute_mape(y_true: ndarray, y_pred: ndarray, epsilon: float = 1.0) -> float
Compute Mean Absolute Percentage Error.
- citibike.evaluation.metrics.compute_rmse(y_true: ndarray, y_pred: ndarray) -> float
Compute Root Mean Squared Error.
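The three error metrics above follow standard definitions, sketched here in plain Python rather than NumPy. One detail is a guess: the reference does not say how `epsilon` enters MAPE, so this sketch assumes it floors the denominator to avoid dividing by zero at empty stations, and that the result is expressed in percent. The short function names are illustrative, not the package's.

```python
import math


def mae(y_true, y_pred):
    """Mean absolute error."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse(y_true, y_pred):
    """Root mean squared error."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def mape(y_true, y_pred, epsilon=1.0):
    """Mean absolute percentage error, in percent.

    Assumption: epsilon floors the denominator so that y_true == 0
    (an empty station) does not cause a division by zero.
    """
    ratios = [abs(t - p) / max(abs(t), epsilon) for t, p in zip(y_true, y_pred)]
    return 100.0 * sum(ratios) / len(ratios)


y_true = [0, 4, 10]
y_pred = [1, 2, 10]
scores = {
    "mae": mae(y_true, y_pred),
    "rmse": rmse(y_true, y_pred),
    "mape": mape(y_true, y_pred),
}
```

With these inputs the absolute errors are 1, 2 and 0, so MAE is 1.0, and the first sample contributes 1/1 to MAPE only because of the epsilon floor.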
- citibike.evaluation.metrics.compute_state_metrics(true_states: DataFrame, pred_states: DataFrame, state: str) -> dict[str, float]
Compute precision, recall, F1 for a specific state.
- Args:
true_states: DataFrame with actual states
pred_states: DataFrame with predicted states
state: Which state to evaluate (“empty” or “full”)
- Returns:
Dictionary with precision, recall, f1, count
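A one-vs-rest computation of these metrics might look like the sketch below. It works on flat label sequences as a stand-in for the flattened station-by-time DataFrames, and it assumes that `count` means the number of true occurrences of the state; the reference does not define it. The function name is hypothetical.

```python
def state_metrics(true_states, pred_states, state):
    """Precision, recall, F1 and support for one state label."""
    pairs = list(zip(true_states, pred_states))
    tp = sum(t == state and p == state for t, p in pairs)  # correctly flagged
    fp = sum(t != state and p == state for t, p in pairs)  # false alarms
    fn = sum(t == state and p != state for t, p in pairs)  # missed events
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    # Assumption: "count" is the number of true occurrences of the state.
    return {"precision": precision, "recall": recall, "f1": f1, "count": tp + fn}


truth = ["empty", "normal", "empty", "full", "normal"]
predicted = ["empty", "empty", "normal", "full", "normal"]
empty_scores = state_metrics(truth, predicted, "empty")
```

Evaluating "empty" and "full" separately matters because both are rare relative to "normal", so overall accuracy would hide poor detection of either.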
- citibike.evaluation.metrics.inventory_to_states(inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) -> DataFrame
Convert inventory counts to states (empty/normal/full).
- Args:
inventory: DataFrame with bike counts (index=stations, columns=times)
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds (as fraction of capacity)
- Returns:
DataFrame with states (“empty”, “normal”, “full”)
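Since the thresholds are fractions of capacity, the conversion amounts to comparing each count against a per-station cutoff. The sketch below uses dicts of lists instead of a DataFrame, and the inclusive comparisons (`<=` for empty, `>=` for full) and the threshold values 0.1 and 0.9 are assumptions for illustration.

```python
def to_states(inventory, capacities, thresholds):
    """Label each count as "empty", "normal" or "full".

    inventory: {station: [counts over time]} (stand-in for the DataFrame).
    thresholds: fractions of capacity, e.g. {"empty": 0.1, "full": 0.9}.
    """
    states = {}
    for station, counts in inventory.items():
        cap = capacities[station]
        empty_below = thresholds["empty"] * cap
        full_above = thresholds["full"] * cap
        states[station] = [
            "empty" if n <= empty_below else "full" if n >= full_above else "normal"
            for n in counts
        ]
    return states


states = to_states(
    {"A": [1, 10, 19]},
    capacities={"A": 20},
    thresholds={"empty": 0.1, "full": 0.9},
)
```

For a 20-dock station these thresholds place the cutoffs at 2 and 18 bikes.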
Cross Validation
Rolling window cross-validation for inventory prediction.
- class citibike.evaluation.cross_validation.CVFold(fold_id: int, train_start: Timestamp, train_end: Timestamp, test_start: Timestamp, test_end: Timestamp)
Bases: object
A single cross-validation fold.
- class citibike.evaluation.cross_validation.RollingWindowCV(train_weeks: int = 3, test_weeks: int = 1, increment_days: int | None = None)
Bases: object
Rolling window cross-validation splitter.
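A rolling-window splitter with these parameters could generate folds as sketched below. The tuple fields mirror the `CVFold` attributes, but the logic is an assumption: in particular, the reference does not say what `increment_days=None` means, so this sketch advances the window by one test period in that case. The function name `rolling_folds` is hypothetical.

```python
from datetime import datetime, timedelta


def rolling_folds(data_start, data_end, train_weeks=3, test_weeks=1, increment_days=None):
    """Yield (fold_id, train_start, train_end, test_start, test_end) tuples."""
    train = timedelta(weeks=train_weeks)
    test = timedelta(weeks=test_weeks)
    # Assumption: with increment_days=None the window advances by one test period.
    step = test if increment_days is None else timedelta(days=increment_days)
    fold_id = 0
    train_start = data_start
    # Keep sliding while a full train + test window still fits in the data.
    while train_start + train + test <= data_end:
        train_end = train_start + train
        yield fold_id, train_start, train_end, train_end, train_end + test
        fold_id += 1
        train_start += step


folds = list(rolling_folds(datetime(2025, 9, 1), datetime(2025, 10, 13)))
```

Six weeks of data with a three-week training window and a one-week test window yield three folds, each test week immediately following its training window so that no future data leaks into training.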
- citibike.evaluation.cross_validation.compute_initial_inventory_for_fold(trips: DataFrame, stations: list, fold_start: Timestamp) -> Series
Compute initial inventory at start of fold using backward tracking.
Uses trips before fold_start to infer the bike distribution.
- Args:
trips: All trip data
stations: List of station names
fold_start: Start time of the fold
- Returns:
Series with estimated bike count per station
- citibike.evaluation.cross_validation.run_cross_validation(model, trips: DataFrame, station_stats: DataFrame, config: dict, verbose: bool = True) -> tuple[list[dict[str, float]], dict[str, tuple[float, float]]]
Run rolling window cross-validation for inventory prediction.
- Args:
model: Model instance (must have fit/predict_inventory methods)
trips: Trip data
station_stats: Station information with capacity
config: Configuration dictionary
verbose: Whether to print progress
- Returns:
Tuple of (fold_results, summary)
- citibike.evaluation.cross_validation.track_inventory(trips: DataFrame, initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') -> DataFrame
Track actual inventory by applying trips to initial state.
This gives us GROUND TRUTH inventory - what actually happened.
- Args:
trips: Trip data for the period
initial_inventory: Starting bike count per station
start_time: Start of tracking period
end_time: End of tracking period
freq: Time frequency
- Returns:
DataFrame with actual inventory (index=stations, columns=times)
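Applying trips to an initial state can be pictured as replaying departure and arrival events in time order and taking a snapshot at each step. The sketch below is a plain-Python illustration under assumptions: trips are tuples of (started_at, ended_at, start_station, end_station), snapshots are taken at inclusive hourly boundaries, and counts are not clipped to [0, capacity]. The function name `track` is hypothetical.

```python
from datetime import datetime, timedelta


def track(trips, initial_inventory, start_time, end_time, step=timedelta(hours=1)):
    """Replay trips in time order and snapshot inventory at each time step.

    Returns {timestamp: {station: count}}.
    """
    events = []
    for started_at, ended_at, start_station, end_station in trips:
        events.append((started_at, start_station, -1))  # bike leaves
        events.append((ended_at, end_station, +1))      # bike arrives
    events.sort(key=lambda e: e[0])

    inventory = dict(initial_inventory)
    snapshots = {}
    i = 0
    t = start_time
    while t <= end_time:
        # Apply every event that happened up to and including time t.
        while i < len(events) and events[i][0] <= t:
            _, station, delta = events[i]
            inventory[station] = inventory.get(station, 0) + delta
            i += 1
        snapshots[t] = dict(inventory)
        t += step
    return snapshots


trips = [
    (datetime(2025, 9, 1, 8, 10), datetime(2025, 9, 1, 8, 40), "A", "B"),
    (datetime(2025, 9, 1, 8, 50), datetime(2025, 9, 1, 9, 20), "B", "A"),
]
snapshots = track(
    trips,
    {"A": 3, "B": 1},
    datetime(2025, 9, 1, 8),
    datetime(2025, 9, 1, 10),
)
```

At 09:00 the second bike is still in transit, so it is counted at neither station until it arrives at 09:20.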
Utilities
Utility functions for the CitiBike Markov Model.
- class citibike.utils.DuckDBConnection(database: str | None = None)
Bases: object
Context manager for DuckDB connections.
- citibike.utils.aggregate_trips(parquet_path: Path, group_by: list[str], aggregations: dict[str, str], filters: dict[str, Any] | None = None) -> DataFrame
Aggregate trip data using DuckDB.
- Args:
parquet_path: Path to parquet files
group_by: Columns to group by
aggregations: Dict of {output_col: aggregation_expr}. Example: {"trip_count": "COUNT(*)", "avg_duration": "AVG(duration)"}
filters: Optional filters to apply before aggregation
- Returns:
DataFrame with aggregated results
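To show how an `aggregations` dict of this shape can map onto SQL, the sketch below assembles a GROUP BY query string for DuckDB's `read_parquet` table function. It only builds the string; it does not execute it and is not the package's implementation. The helper name `build_aggregate_query` and the path `data/trips/*.parquet` are hypothetical, and filters are left out.

```python
def build_aggregate_query(parquet_glob, group_by, aggregations):
    """Assemble a GROUP BY query over Parquet files (hypothetical helper).

    aggregations maps output column name -> SQL aggregation expression.
    """
    select_parts = list(group_by) + [
        f"{expr} AS {name}" for name, expr in aggregations.items()
    ]
    return (
        f"SELECT {', '.join(select_parts)} "
        f"FROM read_parquet('{parquet_glob}') "
        f"GROUP BY {', '.join(group_by)}"
    )


query = build_aggregate_query(
    "data/trips/*.parquet",  # hypothetical location of the trip files
    group_by=["start_station_name"],
    aggregations={"trip_count": "COUNT(*)", "avg_duration": "AVG(duration)"},
)
```

Interpolating expressions straight into SQL like this is only appropriate for trusted, developer-supplied inputs such as a project config.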
- citibike.utils.count_trips_by_station(parquet_path: Path, start_date: str | None = None, end_date: str | None = None) -> DataFrame
Count trips by station (both starts and ends).
- Args:
parquet_path: Path to parquet files
start_date: Optional start date filter
end_date: Optional end date filter
- Returns:
DataFrame with station trip counts
- citibike.utils.create_summary_table(parquet_path: Path, output_path: Path, start_date: str | None = None, end_date: str | None = None) -> None
Create a summary table for faster querying.
Aggregates trips by hour and station for modeling.
- Args:
parquet_path: Path to raw parquet files
output_path: Path for summary parquet output
start_date: Optional start date filter
end_date: Optional end date filter
- citibike.utils.export_to_parquet(df: DataFrame, output_path: Path, partition_cols: list[str] | None = None, compression: str = 'zstd') -> None
Export DataFrame to Parquet using DuckDB.
- Args:
df: DataFrame to export
output_path: Output path for parquet file
partition_cols: Columns to partition by (creates subdirectories)
compression: Compression algorithm (zstd, snappy, gzip, etc.)
- citibike.utils.get_trip_stats(parquet_path: Path, start_date: str | None = None, end_date: str | None = None) -> dict[str, Any]
Get summary statistics for trip data.
- Args:
parquet_path: Path to parquet files
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)
- Returns:
Dictionary with statistics
- citibike.utils.load_config(config_path: str = 'config.yaml') -> dict
Load configuration from YAML file.
- Args:
config_path: Path to config file
- Returns:
Configuration dictionary
- citibike.utils.load_station_info(station_path: str = 'data/stations/station_info.csv', use_parquet: bool = True) -> DataFrame
Load station information including capacity.
- Args:
station_path: Path to station info file (CSV or Parquet)
use_parquet: If True, try Parquet first
- Returns:
DataFrame with station information
- citibike.utils.load_trip_data(data_dir: str = 'data', start_date: str | None = None, end_date: str | None = None, use_parquet: bool = True) -> DataFrame
Load trip data from Parquet or CSV files using DuckDB.
- Args:
data_dir: Directory containing trip data folders
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)
use_parquet: If True, use Parquet files; otherwise fall back to CSV
- Returns:
DataFrame with all trip data
- citibike.utils.prepare_data(trips: DataFrame, stations: DataFrame, config: dict) -> tuple[DataFrame, DataFrame]
Prepare data for modeling:
- Filters to valid stations
- Adds time features
- Merges with station capacity
- Args:
trips: Raw trip data
stations: Station information
config: Configuration dictionary
- Returns:
Tuple of (processed_trips, station_stats)
- citibike.utils.query_parquet(parquet_path: Path, columns: list[str] | None = None, filters: dict[str, Any] | None = None, limit: int | None = None) -> DataFrame
Query Parquet files with DuckDB.
- Args:
parquet_path: Path to parquet directory or file
columns: List of columns to select (None = all)
filters: Dictionary of column filters (e.g., {"year": 2025, "month": 9})
limit: Maximum number of rows to return
- Returns:
DataFrame with query results