diff --git a/README.md b/README.md index d393dcc..8a709bf 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,7 @@ from timecopilot import TimeCopilot # - unique_id: Unique identifier for each time series (string) # - ds: Date column (datetime format) # - y: Target variable for forecasting (float format) +# Spark, Ray, and Dask dataframes are accepted and converted to pandas at entry. # The pandas frequency will be inferred from the ds column, if not provided. # If the seasonality is not provided, it will be inferred based on the frequency. # If the horizon is not set, it will default to 2 times the inferred seasonality. @@ -323,4 +324,3 @@ Our pre-print paper is [available in arxiv](https://arxiv.org/abs/2509.00616). } ``` - diff --git a/docs/examples/dask-dataframe.ipynb b/docs/examples/dask-dataframe.ipynb new file mode 100644 index 0000000..61c1290 --- /dev/null +++ b/docs/examples/dask-dataframe.ipynb @@ -0,0 +1,71 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Dask DataFrames\n", + "\n", + "TimeCopilot converts Dask DataFrames to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import dask.dataframe as dd\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + "dask_df = dd.from_pandas(df, npartitions=1)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=dask_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/examples/ray-dataframe.ipynb b/docs/examples/ray-dataframe.ipynb new file mode 100644 index 0000000..6b54439 --- /dev/null +++ b/docs/examples/ray-dataframe.ipynb @@ -0,0 +1,90 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Ray Datasets\n", + "\n", + "TimeCopilot converts Ray datasets to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import ray\n", + "import ray.data as ray_data\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "ray.init(ignore_reinit_error=True)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + 
"ray_df = ray_data.from_pandas(df)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=ray_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "ray.shutdown()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/examples/spark-dataframe.ipynb b/docs/examples/spark-dataframe.ipynb new file mode 100644 index 0000000..6af9157 --- /dev/null +++ b/docs/examples/spark-dataframe.ipynb @@ -0,0 +1,89 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Spark DataFrames\n", + "\n", + "TimeCopilot converts Spark DataFrames to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from pyspark.sql import SparkSession\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "spark = SparkSession.builder.master(\"local[1]\").appName(\"timecopilot\").getOrCreate()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + "spark_df = spark.createDataFrame(df)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=spark_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "spark.stop()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index d246854..69caaa2 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -14,10 +14,23 @@ TimeCopilot is available on PyPI as [timecopilot](https://pypi.org/project/timec uv add timecopilot ``` +!!! tip + + Optional dataframe dependencies (Spark, Ray, Dask) can be installed with: + + ```bash + pip install "timecopilot[dataframes]" + ``` + + or + + ```bash + uv sync --group dataframes + ``` + Requires Python 3.10 or later. !!! 
tip If you don't have a prior experience with `uv`, go to [uv getting started](https://docs.astral.sh/uv/getting-started/) section. - diff --git a/mkdocs.yml b/mkdocs.yml index 385147d..8989a63 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -19,17 +19,20 @@ nav: - Quickstart: getting-started/quickstart.md - Installation: getting-started/installation.md - Examples: - - examples/agent-quickstart.ipynb - - examples/llm-providers.ipynb - - examples/aws-bedrock.ipynb - - examples/google-llms.ipynb - - examples/forecaster-quickstart.ipynb - - examples/anomaly-detection-forecaster-quickstart.ipynb - - examples/ts-foundation-models-comparison-quickstart.ipynb - - examples/gift-eval.ipynb - - examples/chronos-family.ipynb - - examples/cryptocurrency-quickstart.ipynb - - examples/sktime.ipynb + - examples/agent-quickstart.ipynb + - examples/llm-providers.ipynb + - examples/aws-bedrock.ipynb + - examples/google-llms.ipynb + - examples/forecaster-quickstart.ipynb + - examples/anomaly-detection-forecaster-quickstart.ipynb + - examples/ts-foundation-models-comparison-quickstart.ipynb + - examples/gift-eval.ipynb + - examples/chronos-family.ipynb + - examples/cryptocurrency-quickstart.ipynb + - examples/sktime.ipynb + - examples/spark-dataframe.ipynb + - examples/ray-dataframe.ipynb + - examples/dask-dataframe.ipynb - Experiments: - experiments/gift-eval.md - experiments/fev.md diff --git a/pyproject.toml b/pyproject.toml index e995d9d..1802866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,11 @@ docs = [ "modal>=1.0.4", "ruff>=0.12.1", ] +dataframes = [ + "dask[dataframe]>=2024.9.1", + "pyspark>=3.5.1", + "ray[data]>=2.52.1", +] [project] authors = [ @@ -74,6 +79,11 @@ dependencies = [ "tsfeatures", "utilsforecast[plotting]>=0.2.15", ] +optional-dependencies = { dataframes = [ + "dask[dataframe]>=2024.9.1", + "pyspark>=3.5.1", + "ray[data]>=2.52.1", +] } description = "The GenAI Forecasting Agent · LLMs × Time Series Foundation Models" license = "MIT" name = "timecopilot" diff --git a/tests/utils/test_dataframes.py b/tests/utils/test_dataframes.py new file mode 100644 index 0000000..1cb733e --- /dev/null +++ b/tests/utils/test_dataframes.py @@ -0,0 +1,62 @@ +import pandas as pd +import pytest + +from timecopilot.utils.dataframes import to_pandas + + +def _make_frame() -> pd.DataFrame: + dates = pd.date_range("2024-01-01", periods=3, freq="D") + return pd.DataFrame( + { + "unique_id": ["series_1"] * len(dates), + "ds": dates, + "y": [1.0, 2.0, 3.0], + } + ) + + +def test_to_pandas_with_pandas(): + df = _make_frame() + assert to_pandas(df) is df + + +def test_to_pandas_with_dask(): + dd = pytest.importorskip("dask.dataframe") + df = _make_frame() + dask_df = dd.from_pandas(df, npartitions=1) + out = to_pandas(dask_df) + pd.testing.assert_frame_equal(out, df) + + +def test_to_pandas_with_ray(): + ray = pytest.importorskip("ray") + ray_data = pytest.importorskip("ray.data") + if ray.is_initialized(): + ray.shutdown() + ray.init(ignore_reinit_error=True) + try: + df = _make_frame() + ray_df = ray_data.from_pandas(df) + out = to_pandas(ray_df) + pd.testing.assert_frame_equal(out.sort_values("ds").reset_index(drop=True), df) + finally: + ray.shutdown() + + +def test_to_pandas_with_spark(): + pyspark_sql = pytest.importorskip("pyspark.sql") + spark = pyspark_sql.SparkSession.builder.master("local[1]").appName( + "timecopilot-tests" + ).getOrCreate() + try: + df = _make_frame() + spark_df = spark.createDataFrame(df) + out = to_pandas(spark_df) + 
pd.testing.assert_frame_equal(out.sort_values("ds").reset_index(drop=True), df) + finally: + spark.stop() + + +def test_to_pandas_unsupported_type(): + with pytest.raises(TypeError, match="Unsupported dataframe type"): + to_pandas(["not", "a", "dataframe"]) diff --git a/timecopilot/agent.py b/timecopilot/agent.py index 059cf83..231f435 100644 --- a/timecopilot/agent.py +++ b/timecopilot/agent.py @@ -1195,8 +1195,9 @@ def analyze( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - a file path or URL pointing to a CSV / Parquet file with the same columns (it will be read automatically). h: Forecast horizon. Number of future periods to predict. If @@ -1258,8 +1259,9 @@ def forecast( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - a file path or URL pointing to a CSV / Parquet file with the same columns (it will be read automatically). h: Forecast horizon. Number of future periods to predict. If @@ -1414,8 +1416,9 @@ async def analyze( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - You must always work with time series data with the columns ds (date) and y (target value), if these are missing, attempt to infer them from similar column names or, if unsure, request @@ -1488,8 +1491,9 @@ async def forecast( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - You must always work with time series data with the columns ds (date) and y (target value), if these are missing, attempt to infer them from similar column names or, if unsure, request diff --git a/timecopilot/forecaster.py b/timecopilot/forecaster.py index a27ac5f..33b22d5 100644 --- a/timecopilot/forecaster.py +++ b/timecopilot/forecaster.py @@ -1,6 +1,7 @@ import pandas as pd from .models.utils.forecaster import Forecaster +from .utils.dataframes import to_pandas class TimeCopilotForecaster(Forecaster): @@ -73,6 +74,7 @@ def _call_models( quantiles: list[float] | None, **kwargs, ) -> pd.DataFrame: + df = to_pandas(df) # infer just once to avoid multiple calls to _maybe_infer_freq freq = self._maybe_infer_freq(df, freq) res_df: pd.DataFrame | None = None @@ -141,6 +143,8 @@ def forecast( - "unique_id": an ID column to distinguish multiple series. - "ds": a time column indicating timestamps or periods. - "y": a target column with the observed values. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int): Forecast horizon specifying how many future steps to predict. @@ -207,6 +211,8 @@ def cross_validation( - "unique_id": an ID column to distinguish multiple series. - "ds": a time column indicating timestamps or periods. 
- "y": a target column with the observed values. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int): Forecast horizon specifying how many future steps to predict in @@ -279,6 +285,8 @@ def detect_anomalies( Args: df (pd.DataFrame): DataFrame containing the time series to detect anomalies. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int, optional): Forecast horizon specifying how many future steps to predict. In each cross validation window. If not provided, the seasonality diff --git a/timecopilot/utils/dataframes.py b/timecopilot/utils/dataframes.py new file mode 100644 index 0000000..36f870c --- /dev/null +++ b/timecopilot/utils/dataframes.py @@ -0,0 +1,75 @@ +from typing import TYPE_CHECKING, TypeAlias + +import pandas as pd + +if TYPE_CHECKING: + import dask.dataframe as dd + from pyspark.sql import DataFrame as SparkDataFrame + from ray.data import Dataset + + DataFrameLike: TypeAlias = pd.DataFrame | SparkDataFrame | Dataset | dd.DataFrame +else: + DataFrameLike: TypeAlias = pd.DataFrame + + +def to_pandas(df: DataFrameLike) -> pd.DataFrame: + """Convert supported dataframe inputs (Spark, Ray, Dask) to pandas. + + Returns the input unchanged if it is already a pandas DataFrame. Conversion + collects the data into memory and may be expensive. Raises a TypeError for + unsupported dataframe types. + """ + if isinstance(df, pd.DataFrame): + return df + + module = type(df).__module__ + if module.startswith("pyspark"): + spark_df = _maybe_spark_to_pandas(df) + if spark_df is not None: + return spark_df + + if module.startswith("ray"): + ray_df = _maybe_ray_to_pandas(df) + if ray_df is not None: + return ray_df + + if module.startswith("dask"): + dask_df = _maybe_dask_to_pandas(df) + if dask_df is not None: + return dask_df + + raise TypeError( + "Unsupported dataframe type. Provide a pandas DataFrame or a compatible " + "Spark, Ray, or Dask dataframe. Install optional dependencies with " + "`pip install \"timecopilot[dataframes]\"` or `uv sync --group dataframes`." + ) + + +def _maybe_spark_to_pandas(df: Any) -> pd.DataFrame | None: + try: + from pyspark.sql import DataFrame as SparkDataFrame + except ImportError: + return None + if isinstance(df, SparkDataFrame): + return df.toPandas() + return None + + +def _maybe_ray_to_pandas(df: Any) -> pd.DataFrame | None: + try: + from ray.data import Dataset + except ImportError: + return None + if isinstance(df, Dataset): + return df.to_pandas() + return None + + +def _maybe_dask_to_pandas(df: Any) -> pd.DataFrame | None: + try: + import dask.dataframe as dd + except ImportError: + return None + if isinstance(df, dd.DataFrame): + return df.compute() + return None diff --git a/timecopilot/utils/experiment_handler.py b/timecopilot/utils/experiment_handler.py index 5e9bc4c..9e869e9 100644 --- a/timecopilot/utils/experiment_handler.py +++ b/timecopilot/utils/experiment_handler.py @@ -17,6 +17,7 @@ maybe_convert_col_to_datetime, maybe_infer_freq, ) +from .dataframes import to_pandas warnings.simplefilter( action="ignore", @@ -103,6 +104,8 @@ def read_df(path: str | Path) -> pd.DataFrame: def _validate_df(df: pd.DataFrame | str | Path) -> pd.DataFrame: if isinstance(df, str | Path): df = ExperimentDatasetParser.read_df(df) + else: + df = to_pandas(df) if "unique_id" not in df.columns: df["unique_id"] = "series_0" return maybe_convert_col_to_datetime(df, "ds")