
Add DuckDB plugin #633

Draft
andreahlert wants to merge 1 commit into flyteorg:main from andreahlert:add-duckdb-plugin

Conversation

@andreahlert (Contributor):

Summary

  • Add a DuckDB connector plugin for running SQL queries against DuckDB as Flyte tasks
  • DuckDB is an embedded analytical database (like SQLite for OLAP) that runs locally and synchronously
  • Follows the same architecture and patterns as the Snowflake plugin

Features

  • In-memory and file-based database support via DuckDBConfig(database_path=...)
  • Parameterized SQL queries with typed inputs
  • Extension installation and loading (httpfs, json, spatial, etc.)
  • Query results returned as pandas DataFrames via temporary parquet files
  • Automatic cleanup of temporary result files on delete
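The parameterized-query feature above binds typed task inputs to SQL placeholders rather than interpolating them into the query string. A minimal sketch of that binding pattern, using the stdlib `sqlite3` driver as a dependency-free stand-in (duckdb's Python API accepts the same `?` placeholder style; the table and values here are invented for illustration):

```python
import sqlite3

# Stand-in for duckdb.connect(":memory:"); sqlite3 follows the same
# DB-API pattern of '?' placeholders plus a parameter sequence.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (distance REAL)")
conn.executemany("INSERT INTO trips VALUES (?)", [(1.2,), (3.4,), (5.6,)])

# A parameterized query: the typed input (min_distance) is bound at
# execution time instead of being formatted into the SQL string.
min_distance = 2.0
rows = conn.execute(
    "SELECT COUNT(*) FROM trips WHERE distance > ?", (min_distance,)
).fetchall()
print(rows[0][0])  # 2
```

Binding keeps the query template static and avoids SQL-injection-style surprises when inputs come from upstream tasks.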

Design

Unlike Snowflake/BigQuery, DuckDB runs locally with no remote service, so the connector pattern is adapted:

  • Queries execute synchronously in create() (wrapped in run_in_executor for async compat)
  • get() always returns SUCCEEDED since queries complete in create()
  • delete() cleans up temporary parquet result files
  • No credentials, polling, or dashboard links needed
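The adapted lifecycle above can be sketched with stdlib stand-ins. All names here are illustrative, not the plugin's actual API, and the duckdb call is stubbed out with a plain file write:

```python
import asyncio
import os
import tempfile

class LocalQueryConnector:
    """Illustrative sketch: the query runs to completion inside create(),
    so get() is a constant and delete() only removes the temp result."""

    async def create(self, query: str) -> str:
        loop = asyncio.get_running_loop()
        # Run the blocking (synchronous) query off the event loop.
        return await loop.run_in_executor(None, self._run_query, query)

    def _run_query(self, query: str) -> str:
        # Stand-in for duckdb executing `query` and writing Parquet.
        fd, path = tempfile.mkstemp(suffix=".parquet")
        with os.fdopen(fd, "w") as f:
            f.write("result-of: " + query)
        return path  # the "resource id" is just the result file path

    async def get(self, resource_id: str) -> str:
        return "SUCCEEDED"  # work already finished in create()

    async def delete(self, resource_id: str) -> None:
        if os.path.exists(resource_id):
            os.remove(resource_id)

async def demo():
    c = LocalQueryConnector()
    path = await c.create("SELECT 1")
    status = await c.get(path)
    await c.delete(path)
    return status, os.path.exists(path)

status, still_there = asyncio.run(demo())
print(status, still_there)  # SUCCEEDED False
```

The `run_in_executor` wrapper is what lets a synchronous local engine fit an async connector interface without blocking the event loop.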

Test plan

  • 25 unit tests covering task creation, config serialization, SQL generation, connector create/get/delete, parameterized queries, extensions, and end-to-end flow
  • All tests pass (pytest plugins/duckdb/tests/ -v)
  • Linting passes (ruff check plugins/duckdb/)
  • Plugin structure matches Snowflake plugin layout

Add a DuckDB connector plugin following the same patterns as the
Snowflake plugin. DuckDB is an embedded analytical database that runs
queries locally and synchronously, so the connector executes queries
in create() and get() always returns SUCCEEDED.

Features:
- In-memory and file-based database support
- Parameterized SQL queries with typed inputs
- Extension installation and loading (httpfs, json, etc.)
- Query results returned as pandas DataFrames via temp parquet files
- Automatic cleanup of temporary result files

Signed-off-by: André Ahlert <andre@aex.partners>

count_rows = DuckDB(
    name="count_rows",
    query_template="SELECT COUNT(*) AS total FROM 'data.parquet'",
Contributor:

Where is the data.parquet coming from?
Is this an input of type flyte.io.DataFrame?

Then I would love to support:

count_rows = DuckDB(
    name="count_rows",
    query_template="SELECT COUNT(*) AS total FROM '{input}'",
    plugin_config=config,
    input=DataFrame,   # This can be implicit
    output_dataframe_type=DataFrame,
)

Then you could pass a Parquet file, a pandas DataFrame, a Spark DataFrame, or anything else to it.

has_output: bool = False


class DuckDBConnector(AsyncConnector):
Contributor:

You don't want a connector. A connector is useful for connecting to remote services. For this, it should just be a DataFrame type or a task plugin.

@@ -0,0 +1,150 @@
import asyncio
Contributor:

delete this entire file

extensions: Optional[List[str]] = None


class DuckDB(AsyncConnectorExecutorMixin, TaskTemplate):
Contributor:

No need for the connector mixin. Rather, this should implement the execute method. It should be like ContainerTask (not all of it, just the execute function).

async def execute(self, **kwargs) -> Any:

Contributor:

Once you do this, you can delete the connector. You need it this way because you need guaranteed memory and isolation; a connector is fine for shared resources, which are usually API calls.
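The execute-based shape the reviewer describes can be sketched as follows. This is a hypothetical outline under stated assumptions (class name, constructor arguments, and template syntax are invented; the duckdb call is stubbed out), not the ContainerTask API itself:

```python
import asyncio
from typing import Any

class DuckDBTask:
    """Illustrative sketch: no connector. The task itself runs the query
    when the plugin executes it, analogous to ContainerTask's execute."""

    def __init__(self, name: str, query_template: str):
        self.name = name
        self.query_template = query_template

    async def execute(self, **kwargs: Any) -> Any:
        # Fill the template from typed inputs, then run the blocking
        # query off the event loop (duckdb call stubbed out below).
        query = self.query_template.format(**kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._run, query)

    def _run(self, query: str) -> str:
        # Stand-in for opening a duckdb connection and running `query`.
        return "ran: " + query

task = DuckDBTask("count_rows", "SELECT COUNT(*) FROM '{input}'")
result = asyncio.run(task.execute(input="data.parquet"))
print(result)  # ran: SELECT COUNT(*) FROM 'data.parquet'
```

Because the task runs in its own container with its own resource requests, memory and isolation are guaranteed per-execution rather than shared across a connector deployment.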

@kumare3 (Contributor) left a comment:

I think you got it a little wrong

@andreahlert (Contributor, Author):

> I think you got it a little wrong

Thanks for the review! You're right, I should have looked at the existing flytekit DuckDB plugin as reference instead of modeling it after Snowflake. I'll rework this to use TaskTemplate with execute() and add DataFrame input type support.

@andreahlert andreahlert marked this pull request as draft February 8, 2026 23:11