API Reference

Models

Models for CitiBike inventory prediction.

class citibike.models.BaseModel(config: dict)

Bases: ABC

Abstract base class for all bike inventory prediction models.

All models must implement:
  • fit(): Train the model on historical data
  • predict_inventory(): Predict future bike counts per station

__init__(config: dict)

Initialize model with configuration.

Args:

config: Configuration dictionary

abstract fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Train the model on historical trip data.

Args:

trips: DataFrame with columns [started_at, ended_at, start_station_name, end_station_name, …]
station_stats: DataFrame indexed by station_name with capacity

Returns:

self (for method chaining)

get_name() → str

Return the model name.

get_params() → dict[str, Any]

Return model parameters for logging.

abstract predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict bike inventory at each station over time.

Args:

initial_inventory: Series indexed by station_name with starting bike counts
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency (e.g., “1h” for hourly)

Returns:
DataFrame with predictions:
  • index: station_name

  • columns: timestamps

  • values: predicted bike counts

predict_states(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict station states (empty/normal/full) over time.

Args:

initial_inventory: Starting bike counts per station
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency

Returns:

DataFrame with state predictions (“empty”, “normal”, “full”)
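The interface above can be illustrated with a minimal sketch. SketchBaseModel and ConstantModel are hypothetical stand-ins written for this example, not the real citibike classes, and the assumption that end_time is included in the output columns is ours.

```python
from abc import ABC, abstractmethod

import pandas as pd


class SketchBaseModel(ABC):
    """Hypothetical stand-in for the documented base class (not the real one)."""

    def __init__(self, config: dict):
        self.config = config

    @abstractmethod
    def fit(self, trips: pd.DataFrame, station_stats: pd.DataFrame) -> "SketchBaseModel":
        ...

    @abstractmethod
    def predict_inventory(self, initial_inventory: pd.Series, start_time: pd.Timestamp,
                          end_time: pd.Timestamp, freq: str = "1h") -> pd.DataFrame:
        ...


class ConstantModel(SketchBaseModel):
    """Toy model: every station keeps its starting bike count."""

    def fit(self, trips, station_stats):
        # Nothing to learn; returning self allows method chaining.
        return self

    def predict_inventory(self, initial_inventory, start_time, end_time, freq="1h"):
        # Assumption: both start_time and end_time appear as columns.
        times = pd.date_range(start_time, end_time, freq=freq)
        # index: station_name, columns: timestamps, values: bike counts
        return pd.DataFrame(
            {t: initial_inventory for t in times}, index=initial_inventory.index
        )


initial = pd.Series({"A": 5, "B": 12})
pred = ConstantModel({}).fit(pd.DataFrame(), pd.DataFrame()).predict_inventory(
    initial, pd.Timestamp("2025-01-01 00:00"), pd.Timestamp("2025-01-01 03:00")
)
```

Because fit() returns the model itself, training and prediction can be chained in one expression, as in the last statement.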

citibike.models.BaselineModel

alias of TemporalFlowModel

class citibike.models.MarkovModel(config: dict)

Bases: BaseModel

Markov Chain model for predicting bike station inventory.

This model:
  1. Builds time-dependent transition matrices P[i→j | hour, is_weekend]
  2. Learns departure rates per station per time context
  3. Simulates multiple random walks and averages predictions

Key features:
  • State-dependent: Departures depend on current inventory
  • Capacity-aware: Arrivals capped at station capacity
  • Monte Carlo: Average over N simulations for robust predictions

fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Build time-dependent transition matrices from trip data.

Uses vectorized pandas operations for speed.

get_params() → dict[str, Any]

Return model parameters.

get_top_destinations(station: str, hour: int, is_weekend: bool, top_k: int = 5) → DataFrame

Get top destinations from a station for a given context.

get_transition_matrix(hour: int, is_weekend: bool) → tuple[csr_matrix, list]

Get transition matrix for a specific context.

predict_flow(stations: list, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict net flow (legacy method, kept for compatibility).

predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict inventory using Monte Carlo random walks.

Runs N simulations and averages the results.

Args:

initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency

Returns:

DataFrame with predicted inventory (mean over simulations)

predict_with_uncertainty(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → tuple[DataFrame, DataFrame, DataFrame]

Predict inventory with uncertainty bounds.

Returns mean, lower (5th percentile), and upper (95th percentile).
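As a rough illustration of how a mean and 5th/95th percentile band can be derived from repeated simulations, the sketch below uses a toy random walk in place of the model's transition-matrix sampling. The step distribution, array shapes, and variable names are assumptions made for this example only.

```python
import numpy as np

rng = np.random.default_rng(0)

n_sims, n_stations, n_steps = 200, 3, 4
capacity = np.array([10, 20, 15])   # hypothetical station capacities
start = np.array([5, 10, 7])        # hypothetical starting bike counts

# Toy stand-in for one random walk: a random net flow per step, clipped to
# [0, capacity]. The documented model samples trips from its fitted
# transition matrices instead.
sims = np.empty((n_sims, n_stations, n_steps))
for s in range(n_sims):
    inv = start.astype(float)
    for t in range(n_steps):
        inv = np.clip(inv + rng.integers(-2, 3, size=n_stations), 0, capacity)
        sims[s, :, t] = inv

# Aggregate across the simulation axis.
mean = sims.mean(axis=0)
lower = np.percentile(sims, 5, axis=0)
upper = np.percentile(sims, 95, axis=0)
```

Each of the three arrays has one row per station and one column per time step, mirroring the three DataFrames in the documented return value.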

class citibike.models.PersistenceModel(config: dict)

Bases: BaseModel

Persistence (Naive) baseline - predicts inventory stays constant.

inventory[t+1] = inventory[t] = initial_inventory

This is the simplest possible baseline. Any useful model should beat this.

fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

No training needed - just store station info.

get_params() → dict[str, Any]

Return model parameters for logging.

predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict inventory stays constant at initial state.

class citibike.models.StationAverageModel(config: dict)

Bases: BaseModel

Station-only average baseline - ignores temporal patterns.

Learns average net flow per station (across all hours/days), then applies it uniformly.

inventory[t+1] = inventory[t] + station_avg_flow

This shows the value of temporal conditioning (hour, weekend).

fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Compute average net flow per station (ignoring time).

get_params() → dict[str, Any]

Return model parameters for logging.

predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict inventory using station average flow (no time patterns).

class citibike.models.TemporalFlowModel(config: dict)

Bases: BaseModel

Temporal flow model using time-conditioned historical average net flow.

This model:
  1. Computes average hourly net flow (arrivals - departures) per station, conditioned on hour of day and weekend/weekday
  2. Predicts future inventory by applying the appropriate average flow: inventory[t+1] = inventory[t] + avg_net_flow[station, hour, is_weekend]

This captures both station-specific and temporal patterns in bike flow.

fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Compute historical average hourly net flow per station.

Args:

trips: Trip data with start/end stations and timestamps
station_stats: Station info with capacity

get_params() → dict[str, Any]

Return model parameters.

predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict inventory by applying average hourly net flow.

Args:

initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency

Returns:

DataFrame with predicted inventory at each hour
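The update rule inventory[t+1] = inventory[t] + avg_net_flow[station, hour, is_weekend] can be sketched with pandas as follows. This is an illustrative approximation, not the library's code: the helper names are hypothetical, hours with no recorded trips are assumed to have zero flow, and no capacity clipping is applied.

```python
import pandas as pd

trips = pd.DataFrame({
    "started_at": pd.to_datetime(["2025-01-06 08:10", "2025-01-06 08:40", "2025-01-13 08:05"]),
    "ended_at": pd.to_datetime(["2025-01-06 08:30", "2025-01-06 08:55", "2025-01-13 08:20"]),
    "start_station_name": ["A", "A", "A"],
    "end_station_name": ["B", "B", "B"],
})


def hourly_counts(df, time_col, station_col, name):
    """Count events per station, calendar day, hour, and weekend flag."""
    keys = pd.DataFrame({
        "station": df[station_col],
        "day": df[time_col].dt.normalize(),
        "hour": df[time_col].dt.hour,
        "is_weekend": df[time_col].dt.dayofweek >= 5,
    })
    return keys.groupby(["station", "day", "hour", "is_weekend"]).size().rename(name)


arrivals = hourly_counts(trips, "ended_at", "end_station_name", "arrivals")
departures = hourly_counts(trips, "started_at", "start_station_name", "departures")
flows = pd.concat([arrivals, departures], axis=1).fillna(0)
flows["net"] = flows["arrivals"] - flows["departures"]

# Average over days, conditioned on station, hour, and weekend flag.
avg = flows["net"].groupby(level=["station", "hour", "is_weekend"]).mean().to_dict()


def predict(initial, start_time, end_time, freq="1h"):
    inv = initial.astype(float)
    cols = {}
    for t in pd.date_range(start_time, end_time, freq=freq):
        cols[t] = inv
        # Contexts never seen in training contribute zero flow (assumption).
        step = pd.Series({s: avg.get((s, t.hour, t.dayofweek >= 5), 0.0) for s in inv.index})
        inv = inv + step
    return pd.DataFrame(cols)


pred = predict(pd.Series({"A": 5, "B": 5}), "2025-01-20 08:00", "2025-01-20 10:00")
```

In this toy data, station A loses 1.5 bikes on average during the 08:00 weekday hour and station B gains the same amount, so the 09:00 column reads 3.5 and 6.5.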

Base Model

Base model class defining the interface for inventory prediction.

class citibike.models.base.BaseModel(config: dict)

Bases: ABC

Abstract base class for all bike inventory prediction models.

All models must implement:
  • fit(): Train the model on historical data
  • predict_inventory(): Predict future bike counts per station

__init__(config: dict)

Initialize model with configuration.

Args:

config: Configuration dictionary

abstract fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Train the model on historical trip data.

Args:

trips: DataFrame with columns [started_at, ended_at, start_station_name, end_station_name, …]
station_stats: DataFrame indexed by station_name with capacity

Returns:

self (for method chaining)

get_name() → str

Return the model name.

get_params() → dict[str, Any]

Return model parameters for logging.

abstract predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict bike inventory at each station over time.

Args:

initial_inventory: Series indexed by station_name with starting bike counts
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency (e.g., “1h” for hourly)

Returns:
DataFrame with predictions:
  • index: station_name

  • columns: timestamps

  • values: predicted bike counts

predict_states(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict station states (empty/normal/full) over time.

Args:

initial_inventory: Starting bike counts per station
start_time: Start of prediction period
end_time: End of prediction period
freq: Time frequency

Returns:

DataFrame with state predictions (“empty”, “normal”, “full”)

Markov Model

Markov Chain model for bike inventory prediction using Monte Carlo simulation.

class citibike.models.markov.MarkovModel(config: dict)

Bases: BaseModel

Markov Chain model for predicting bike station inventory.

This model:
  1. Builds time-dependent transition matrices P[i→j | hour, is_weekend]
  2. Learns departure rates per station per time context
  3. Simulates multiple random walks and averages predictions

Key features:
  • State-dependent: Departures depend on current inventory
  • Capacity-aware: Arrivals capped at station capacity
  • Monte Carlo: Average over N simulations for robust predictions

fit(trips: DataFrame, station_stats: DataFrame) → BaseModel

Build time-dependent transition matrices from trip data.

Uses vectorized pandas operations for speed.

get_params() → dict[str, Any]

Return model parameters.

get_top_destinations(station: str, hour: int, is_weekend: bool, top_k: int = 5) → DataFrame

Get top destinations from a station for a given context.

get_transition_matrix(hour: int, is_weekend: bool) → tuple[csr_matrix, list]

Get transition matrix for a specific context.

predict_flow(stations: list, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict net flow (legacy method, kept for compatibility).

predict_inventory(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Predict inventory using Monte Carlo random walks.

Runs N simulations and averages the results.

Args:

initial_inventory: Starting bike count per station
start_time: Start time for prediction
end_time: End time for prediction
freq: Time frequency

Returns:

DataFrame with predicted inventory (mean over simulations)

predict_with_uncertainty(initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → tuple[DataFrame, DataFrame, DataFrame]

Predict inventory with uncertainty bounds.

Returns mean, lower (5th percentile), and upper (95th percentile).

Evaluation

Evaluation framework for CitiBike inventory prediction models.

class citibike.evaluation.RollingWindowCV(train_weeks: int = 3, test_weeks: int = 1, increment_days: int | None = None)

Bases: object

Rolling window cross-validation splitter.

get_n_splits(trips: DataFrame) → int

split(trips: DataFrame) → Generator[CVFold, None, None]

Generate train/test splits.
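One plausible way to generate rolling windows from the three constructor parameters is sketched below. The Fold dataclass is a hypothetical stand-in for CVFold, and two behaviors are assumptions: the window advances by one test period when increment_days is None, and the test window starts exactly where training ends.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterator, Optional


@dataclass
class Fold:
    """Hypothetical stand-in mirroring the documented CVFold fields."""
    fold_id: int
    train_start: datetime
    train_end: datetime
    test_start: datetime
    test_end: datetime


def rolling_folds(data_start: datetime, data_end: datetime, train_weeks: int = 3,
                  test_weeks: int = 1, increment_days: Optional[int] = None) -> Iterator[Fold]:
    train = timedelta(weeks=train_weeks)
    test = timedelta(weeks=test_weeks)
    # Assumption: with no explicit increment, slide by one test period.
    step = timedelta(days=increment_days) if increment_days is not None else test
    fold_id, train_start = 0, data_start
    # Keep yielding folds while a full train + test window still fits.
    while train_start + train + test <= data_end:
        train_end = train_start + train
        yield Fold(fold_id, train_start, train_end, train_end, train_end + test)
        fold_id += 1
        train_start += step


# Six weeks of data with the default 3-week train / 1-week test windows.
folds = list(rolling_folds(datetime(2025, 1, 6), datetime(2025, 2, 17)))
```

With these inputs the generator yields three folds, each shifted one week later than the previous one.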

citibike.evaluation.compute_initial_inventory_for_fold(trips: DataFrame, stations: list, fold_start: Timestamp) → Series

Compute initial inventory at start of fold using backward tracking.

Uses trips before fold_start to infer the bike distribution.

Args:

trips: All trip data
stations: List of station names
fold_start: Start time of the fold

Returns:

Series with estimated bike count per station

citibike.evaluation.compute_inventory_metrics(true_inventory: DataFrame, pred_inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) → dict[str, float]

Compute all evaluation metrics for inventory prediction.

Args:

true_inventory: DataFrame with actual bike counts
pred_inventory: DataFrame with predicted bike counts
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds

Returns:

Dictionary with all metrics

citibike.evaluation.compute_mae(y_true: ndarray, y_pred: ndarray) → float

Compute Mean Absolute Error.

citibike.evaluation.compute_mape(y_true: ndarray, y_pred: ndarray, epsilon: float = 1.0) → float

Compute Mean Absolute Percentage Error.

citibike.evaluation.compute_rmse(y_true: ndarray, y_pred: ndarray) → float

Compute Root Mean Squared Error.
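For reference, the three error metrics can be written in a few lines of NumPy. The function names below are local to this sketch. How the library uses epsilon is not stated above; this sketch assumes it floors the denominator so stations with zero bikes do not cause division by zero, and it assumes MAPE is reported as a percentage.

```python
import numpy as np


def mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.mean(np.abs(y_true - y_pred)))


def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def mape(y_true: np.ndarray, y_pred: np.ndarray, epsilon: float = 1.0) -> float:
    # Assumption: epsilon is a lower bound on the denominator.
    denom = np.maximum(np.abs(y_true), epsilon)
    # Assumption: result is expressed in percent.
    return float(np.mean(np.abs(y_true - y_pred) / denom) * 100)


y_true = np.array([0.0, 4.0, 10.0])
y_pred = np.array([1.0, 2.0, 10.0])
scores = {"mae": mae(y_true, y_pred), "rmse": rmse(y_true, y_pred), "mape": mape(y_true, y_pred)}
```

The first station has a true count of zero, which is the case the epsilon floor is meant to handle.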

citibike.evaluation.compute_state_metrics(true_states: DataFrame, pred_states: DataFrame, state: str) → dict[str, float]

Compute precision, recall, F1 for a specific state.

Args:

true_states: DataFrame with actual states
pred_states: DataFrame with predicted states
state: Which state to evaluate (“empty” or “full”)

Returns:

Dictionary with precision, recall, f1, count
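The per-state scores follow the usual one-vs-rest definitions. The sketch below works on flat lists rather than DataFrames to stay self-contained; the function name is hypothetical, and treating "count" as the number of true occurrences of the state is an assumption.

```python
def state_metrics_sketch(true_states, pred_states, state):
    """One-vs-rest precision, recall, and F1 for a single state label."""
    pairs = list(zip(true_states, pred_states))
    tp = sum(t == state and p == state for t, p in pairs)
    fp = sum(t != state and p == state for t, p in pairs)
    fn = sum(t == state and p != state for t, p in pairs)
    # Guard against empty denominators when the state never occurs.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    # Assumption: "count" is how often the state truly occurred.
    return {"precision": precision, "recall": recall, "f1": f1, "count": tp + fn}


true_states = ["empty", "normal", "empty", "full", "normal"]
pred_states = ["empty", "empty", "normal", "full", "normal"]
empty_scores = state_metrics_sketch(true_states, pred_states, "empty")
```

With DataFrames of states, the same logic would apply after flattening both frames to one dimension in matching order.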

citibike.evaluation.inventory_to_states(inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) → DataFrame

Convert inventory counts to states (empty/normal/full).

Args:

inventory: DataFrame with bike counts (index=stations, columns=times)
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds (as fraction of capacity)

Returns:

DataFrame with states (“empty”, “normal”, “full”)
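A small sketch of the thresholding step, using plain dicts in place of DataFrames. The function name, the threshold values, and the choice of inclusive comparisons (<= and >=) are assumptions for illustration; the library may treat boundary values differently.

```python
def inventory_to_states_sketch(inventory, capacities, thresholds):
    """Map bike counts to "empty", "normal", or "full" by fill fraction."""
    states = {}
    for station, counts in inventory.items():
        row = []
        for bikes in counts:
            fill = bikes / capacities[station]
            if fill <= thresholds["empty"]:      # assumption: inclusive bound
                row.append("empty")
            elif fill >= thresholds["full"]:     # assumption: inclusive bound
                row.append("full")
            else:
                row.append("normal")
        states[station] = row
    return states


inventory = {"A": [1, 10, 19], "B": [5, 5, 5]}   # bike counts over three time steps
capacities = {"A": 20, "B": 10}
thresholds = {"empty": 0.1, "full": 0.9}         # hypothetical values
states = inventory_to_states_sketch(inventory, capacities, thresholds)
```

Station A moves from 5% to 50% to 95% full, so it passes through all three states.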

citibike.evaluation.run_cross_validation(model, trips: DataFrame, station_stats: DataFrame, config: dict, verbose: bool = True) → tuple[list[dict[str, float]], dict[str, tuple[float, float]]]

Run rolling window cross-validation for inventory prediction.

Args:

model: Model instance (must have fit/predict_inventory methods)
trips: Trip data
station_stats: Station information with capacity
config: Configuration dictionary
verbose: Whether to print progress

Returns:

Tuple of (fold_results, summary)

citibike.evaluation.track_inventory(trips: DataFrame, initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Track actual inventory by applying trips to initial state.

This gives us GROUND TRUTH inventory - what actually happened.

Args:

trips: Trip data for the period
initial_inventory: Starting bike count per station
start_time: Start of tracking period
end_time: End of tracking period
freq: Time frequency

Returns:

DataFrame with actual inventory (index=stations, columns=times)
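Applying trips to an initial state can be sketched as cumulative departures and arrivals up to each timestamp. This is an assumed reading of the description above, not the library's code: a departure is counted at started_at, an arrival at ended_at, and no capacity or non-negativity clipping is applied.

```python
import pandas as pd

trips = pd.DataFrame({
    "started_at": pd.to_datetime(["2025-01-06 08:10", "2025-01-06 09:15"]),
    "ended_at": pd.to_datetime(["2025-01-06 08:30", "2025-01-06 09:40"]),
    "start_station_name": ["A", "B"],
    "end_station_name": ["B", "A"],
})
initial = pd.Series({"A": 5, "B": 3})
times = pd.date_range("2025-01-06 08:00", "2025-01-06 10:00", freq="1h")

cols = {}
for t in times:
    # Cumulative departures and arrivals per station up to time t.
    departed = trips.loc[trips["started_at"] <= t, "start_station_name"].value_counts()
    arrived = trips.loc[trips["ended_at"] <= t, "end_station_name"].value_counts()
    inv = initial.add(arrived, fill_value=0).sub(departed, fill_value=0)
    cols[t] = inv.reindex(initial.index)

# index: stations, columns: times
actual = pd.DataFrame(cols)
```

The first trip moves a bike from A to B before 09:00 and the second moves one back before 10:00, so both stations end where they started.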

Metrics

Evaluation metrics for inventory prediction models.

citibike.evaluation.metrics.compute_inventory_metrics(true_inventory: DataFrame, pred_inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) → dict[str, float]

Compute all evaluation metrics for inventory prediction.

Args:

true_inventory: DataFrame with actual bike counts
pred_inventory: DataFrame with predicted bike counts
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds

Returns:

Dictionary with all metrics

citibike.evaluation.metrics.compute_mae(y_true: ndarray, y_pred: ndarray) → float

Compute Mean Absolute Error.

citibike.evaluation.metrics.compute_mape(y_true: ndarray, y_pred: ndarray, epsilon: float = 1.0) → float

Compute Mean Absolute Percentage Error.

citibike.evaluation.metrics.compute_rmse(y_true: ndarray, y_pred: ndarray) → float

Compute Root Mean Squared Error.

citibike.evaluation.metrics.compute_state_metrics(true_states: DataFrame, pred_states: DataFrame, state: str) → dict[str, float]

Compute precision, recall, F1 for a specific state.

Args:

true_states: DataFrame with actual states
pred_states: DataFrame with predicted states
state: Which state to evaluate (“empty” or “full”)

Returns:

Dictionary with precision, recall, f1, count

citibike.evaluation.metrics.inventory_to_states(inventory: DataFrame, capacities: dict[str, float], thresholds: dict[str, float]) → DataFrame

Convert inventory counts to states (empty/normal/full).

Args:

inventory: DataFrame with bike counts (index=stations, columns=times)
capacities: Dict mapping station -> capacity
thresholds: Dict with “empty” and “full” thresholds (as fraction of capacity)

Returns:

DataFrame with states (“empty”, “normal”, “full”)

citibike.evaluation.metrics.summarize_fold_results(fold_results: list) → dict[str, tuple[float, float]]

Summarize results across cross-validation folds.

Args:

fold_results: List of metric dictionaries from each fold

Returns:

Dictionary mapping metric -> (mean, std)
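A minimal sketch of the metric -> (mean, std) aggregation, assuming every fold reports the same metric keys. The function name is hypothetical, and whether the library uses the population or the sample standard deviation is not stated; the population form is used here.

```python
from statistics import mean, pstdev


def summarize_sketch(fold_results):
    """Collapse per-fold metric dicts into {metric: (mean, std)}."""
    summary = {}
    for metric in fold_results[0]:
        values = [fold[metric] for fold in fold_results]
        # Assumption: population standard deviation (divides by N).
        summary[metric] = (mean(values), pstdev(values))
    return summary


fold_results = [{"mae": 2.0, "rmse": 3.0}, {"mae": 4.0, "rmse": 5.0}]
summary = summarize_sketch(fold_results)
```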

Cross Validation

Rolling window cross-validation for inventory prediction.

class citibike.evaluation.cross_validation.CVFold(fold_id: int, train_start: Timestamp, train_end: Timestamp, test_start: Timestamp, test_end: Timestamp)

Bases: object

A single cross-validation fold.

fold_id: int
test_end: Timestamp
test_start: Timestamp
train_end: Timestamp
train_start: Timestamp

class citibike.evaluation.cross_validation.RollingWindowCV(train_weeks: int = 3, test_weeks: int = 1, increment_days: int | None = None)

Bases: object

Rolling window cross-validation splitter.

get_n_splits(trips: DataFrame) → int

split(trips: DataFrame) → Generator[CVFold, None, None]

Generate train/test splits.

citibike.evaluation.cross_validation.compute_initial_inventory_for_fold(trips: DataFrame, stations: list, fold_start: Timestamp) → Series

Compute initial inventory at start of fold using backward tracking.

Uses trips before fold_start to infer the bike distribution.

Args:

trips: All trip data
stations: List of station names
fold_start: Start time of the fold

Returns:

Series with estimated bike count per station

citibike.evaluation.cross_validation.run_cross_validation(model, trips: DataFrame, station_stats: DataFrame, config: dict, verbose: bool = True) → tuple[list[dict[str, float]], dict[str, tuple[float, float]]]

Run rolling window cross-validation for inventory prediction.

Args:

model: Model instance (must have fit/predict_inventory methods)
trips: Trip data
station_stats: Station information with capacity
config: Configuration dictionary
verbose: Whether to print progress

Returns:

Tuple of (fold_results, summary)

citibike.evaluation.cross_validation.track_inventory(trips: DataFrame, initial_inventory: Series, start_time: Timestamp, end_time: Timestamp, freq: str = '1h') → DataFrame

Track actual inventory by applying trips to initial state.

This gives us GROUND TRUTH inventory - what actually happened.

Args:

trips: Trip data for the period
initial_inventory: Starting bike count per station
start_time: Start of tracking period
end_time: End of tracking period
freq: Time frequency

Returns:

DataFrame with actual inventory (index=stations, columns=times)

Utilities

Utility functions for the CitiBike Markov Model.

class citibike.utils.DuckDBConnection(database: str | None = None)

Bases: object

Context manager for DuckDB connections.

__init__(database: str | None = None)

Initialize connection.

Args:

database: Path to database file. If None, uses in-memory database.

citibike.utils.aggregate_trips(parquet_path: Path, group_by: list[str], aggregations: dict[str, str], filters: dict[str, Any] | None = None) → DataFrame

Aggregate trip data using DuckDB.

Args:

parquet_path: Path to parquet files
group_by: Columns to group by
aggregations: Dict of {output_col: aggregation_expr}. Example: {“trip_count”: “COUNT(*)”, “avg_duration”: “AVG(duration)”}
filters: Optional filters to apply before aggregation

Returns:

DataFrame with aggregated results
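To make the roles of group_by, aggregations, and filters concrete, the sketch below assembles the kind of SQL such a call might send to DuckDB. The helper name build_aggregate_sql is hypothetical, the actual query text used by the library is not shown in this reference, and filters are assumed to be simple equality conditions. Only DuckDB's read_parquet table function is taken as given.

```python
from pathlib import Path
from typing import Any, Optional


def build_aggregate_sql(parquet_path: Path, group_by: list, aggregations: dict,
                        filters: Optional[dict] = None) -> str:
    """Assemble a GROUP BY query over Parquet files (illustrative only)."""
    # Each aggregation becomes "<expr> AS <output_col>".
    select = list(group_by) + [f"{expr} AS {name}" for name, expr in aggregations.items()]
    sql = f"SELECT {', '.join(select)} FROM read_parquet('{parquet_path.as_posix()}')"
    if filters:
        # Assumption: equality filters only. Production code should bind
        # parameters instead of interpolating values into the SQL string.
        conditions = [f"{col} = {value!r}" for col, value in filters.items()]
        sql += " WHERE " + " AND ".join(conditions)
    return sql + f" GROUP BY {', '.join(group_by)}"


sql = build_aggregate_sql(
    Path("data/trips/*.parquet"),
    group_by=["start_station_name"],
    aggregations={"trip_count": "COUNT(*)", "avg_duration": "AVG(duration)"},
    filters={"year": 2025},
)
```

The aggregations dict maps output column names to SQL expressions, which is why each entry is rendered with the expression first and the name after AS.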

citibike.utils.count_trips_by_station(parquet_path: Path, start_date: str | None = None, end_date: str | None = None) → DataFrame

Count trips by station (both starts and ends).

Args:

parquet_path: Path to parquet files
start_date: Optional start date filter
end_date: Optional end date filter

Returns:

DataFrame with station trip counts

citibike.utils.create_summary_table(parquet_path: Path, output_path: Path, start_date: str | None = None, end_date: str | None = None) → None

Create a summary table for faster querying.

Aggregates trips by hour and station for modeling.

Args:

parquet_path: Path to raw parquet files
output_path: Path for summary parquet output
start_date: Optional start date filter
end_date: Optional end date filter

citibike.utils.export_to_parquet(df: DataFrame, output_path: Path, partition_cols: list[str] | None = None, compression: str = 'zstd') → None

Export DataFrame to Parquet using DuckDB.

Args:

df: DataFrame to export
output_path: Output path for parquet file
partition_cols: Columns to partition by (creates subdirectories)
compression: Compression algorithm (zstd, snappy, gzip, etc.)

citibike.utils.get_trip_stats(parquet_path: Path, start_date: str | None = None, end_date: str | None = None) → dict[str, Any]

Get summary statistics for trip data.

Args:

parquet_path: Path to parquet files
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)

Returns:

Dictionary with statistics

citibike.utils.load_config(config_path: str = 'config.yaml') → dict

Load configuration from YAML file.

Args:

config_path: Path to config file

Returns:

Configuration dictionary

citibike.utils.load_station_info(station_path: str = 'data/stations/station_info.csv', use_parquet: bool = True) → DataFrame

Load station information including capacity.

Args:

station_path: Path to station info file (CSV or Parquet)
use_parquet: If True, try Parquet first

Returns:

DataFrame with station information

citibike.utils.load_trip_data(data_dir: str = 'data', start_date: str | None = None, end_date: str | None = None, use_parquet: bool = True) → DataFrame

Load trip data from Parquet or CSV files using DuckDB.

Args:

data_dir: Directory containing trip data folders
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)
use_parquet: If True, use Parquet files; otherwise fall back to CSV

Returns:

DataFrame with all trip data

citibike.utils.prepare_data(trips: DataFrame, stations: DataFrame, config: dict) → tuple[DataFrame, DataFrame]

Prepare data for modeling.

  • Filters to valid stations

  • Adds time features

  • Merges with station capacity

Args:

trips: Raw trip data
stations: Station information
config: Configuration dictionary

Returns:

Tuple of (processed_trips, station_stats)

citibike.utils.query_parquet(parquet_path: Path, columns: list[str] | None = None, filters: dict[str, Any] | None = None, limit: int | None = None) → DataFrame

Query Parquet files with DuckDB.

Args:

parquet_path: Path to parquet directory or file
columns: List of columns to select (None = all)
filters: Dictionary of column filters (e.g., {“year”: 2025, “month”: 9})
limit: Maximum number of rows to return

Returns:

DataFrame with query results