Commit d49de4e

phenobarbital and claude committed:

    sdd: add spec for FEAT-002 — apache-iceberg-support

    Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

1 parent 438af76 · 1 file changed · 261 additions, 0 deletions

# FEAT-002: apache-iceberg-support

**Title**: Apache Iceberg Driver for AsyncDB
**Status**: draft
**Created**: 2026-03-19
**Author**: Jesus Lara

---

## Problem Statement

AsyncDB currently supports a wide range of database and data-lake drivers (PostgreSQL, BigQuery, DuckDB, Delta Lake, Cassandra, etc.), but lacks support for **Apache Iceberg** — the open table format increasingly adopted for large-scale analytics, data lake architectures, and lakehouse patterns.

Users working with Iceberg-based data lakes (on S3, GCS, or HDFS) must resort to external tools or manual integrations. A native asyncdb driver using **PyIceberg** (the pure-Python Iceberg implementation, no JVM required) would enable:

1. Unified async access to Iceberg catalogs (Hive, BigQuery, SQL-Postgres, REST, Glue).
2. Full table lifecycle management (create, load, rename, drop).
3. Namespace (database/schema) management.
4. Read/write operations returning PyArrow Tables, Pandas DataFrames, Polars DataFrames, or DuckDB results.
5. Advanced write modes: partial overwrites, upserts, and file-level ingestion from Parquet.
6. Consistency with the existing asyncdb driver interface (`InitDriver`).

---

## Proposed Solution

Implement a new `iceberg` driver at `asyncdb/drivers/iceberg.py` that wraps the **PyIceberg** library, following the same architectural pattern as the existing `delta` driver (it extends `InitDriver` and wraps synchronous calls with `asyncio.to_thread` where needed).

### Key Design Decisions

1. **Base class**: `InitDriver` (not `SQLDriver`) — Iceberg is a table format, not a SQL database. SQL queries over Iceberg data will be delegated to DuckDB (same pattern as `delta.py`).
2. **Catalog-centric connection model**: The `connection()` method loads a PyIceberg `Catalog` instance. The catalog type (hive, glue, rest, sql, bigquery, dynamodb) is determined from `params`; see the sketch after this list.
3. **Async wrapping**: PyIceberg is synchronous. All blocking I/O operations (catalog RPCs, S3/GCS reads, file writes) will be wrapped in `asyncio.to_thread()` to avoid blocking the event loop.
4. **Output formats**: Query results are natively PyArrow Tables; conversion to Pandas, Polars, and DuckDB is done on demand via a factory parameter (consistent with the `delta` driver).
5. **Optional dependency group**: PyIceberg and its extras will be declared as an optional dependency group `iceberg` in `pyproject.toml`.

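A minimal sketch of decision 2. `load_catalog()` is PyIceberg's real entry point; the helper name and the exact merging of `catalog_type` into the properties dict are illustrative assumptions:

```python
from pyiceberg.catalog import load_catalog

def _load_catalog_from_params(params: dict):
    """Build a PyIceberg Catalog from asyncdb-style params (illustrative)."""
    name = params.get("catalog_name", "default")
    properties = dict(params.get("catalog_properties", {}))
    # PyIceberg picks the catalog implementation from the "type" property
    # (rest, hive, glue, sql, dynamodb, ...).
    if "catalog_type" in params:
        properties.setdefault("type", params["catalog_type"])
    return load_catalog(name, **properties)
```
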
---

## Detailed Design

### Driver Class: `iceberg`

```
asyncdb/drivers/iceberg.py
```

**Class hierarchy:**

```
InitDriver
└── iceberg
```

**Provider metadata:**

```python
_provider = "iceberg"
_syntax = "nosql"
```

### Constructor Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `params["catalog_name"]` | `str` | Catalog identifier (default: `"default"`) |
| `params["catalog_type"]` | `str` | One of: `rest`, `hive`, `glue`, `sql`, `bigquery`, `dynamodb` |
| `params["catalog_properties"]` | `dict` | PyIceberg catalog properties (URI, credentials, warehouse, etc.) |
| `params["namespace"]` | `str` | Default namespace to operate in (optional) |
| `params["storage_options"]` | `dict` | S3/GCS/ADLS credentials and config (optional) |

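For illustration, a driver instance built from these parameters; the endpoint, warehouse, and namespace values are placeholders:

```python
from asyncdb import AsyncDB

db = AsyncDB(
    "iceberg",
    params={
        "catalog_name": "analytics",
        "catalog_type": "rest",
        "catalog_properties": {
            "uri": "http://localhost:8181",    # placeholder REST endpoint
            "warehouse": "s3://my-warehouse",  # placeholder warehouse location
        },
        "namespace": "sales",
    },
)
```
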
### Connection Lifecycle

```python
async def connection(self, **kwargs) -> Self:
    """Load the PyIceberg catalog."""
    # Wraps pyiceberg.catalog.load_catalog() in asyncio.to_thread()

async def close(self) -> None:
    """Release catalog reference and clean up."""
```

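One plausible body for these two methods, assuming `__init__` has stored `self._catalog_name` and `self._catalog_properties` (both attribute names are assumptions; `load_catalog` is PyIceberg's actual API):

```python
import asyncio

from pyiceberg.catalog import load_catalog

async def connection(self, **kwargs) -> "iceberg":
    """Load the PyIceberg catalog without blocking the event loop."""
    self._connection = await asyncio.to_thread(
        load_catalog,
        self._catalog_name,          # assumed: set in __init__ from params
        **self._catalog_properties,  # assumed: set in __init__ from params
    )
    self._connected = True
    return self

async def close(self) -> None:
    """Drop the catalog reference; PyIceberg exposes no explicit disconnect."""
    self._connection = None
    self._connected = False
```
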
### Namespace Operations

| Method | Signature | Description |
|--------|-----------|-------------|
| `create_namespace` | `async def create_namespace(self, namespace: str, properties: dict = None) -> None` | Create a new namespace |
| `list_namespaces` | `async def list_namespaces(self) -> list[str]` | List all namespaces |
| `drop_namespace` | `async def drop_namespace(self, namespace: str) -> None` | Drop a namespace |
| `namespace_properties` | `async def namespace_properties(self, namespace: str) -> dict` | Get namespace metadata |

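A usage sketch of the namespace API, assuming `conn` is a connected driver instance and all names are placeholders:

```python
# Create a namespace, inspect it, then remove it again.
await conn.create_namespace("sales", properties={"owner": "data-eng"})
print(await conn.list_namespaces())            # e.g. ["default", "sales"]
print(await conn.namespace_properties("sales"))
await conn.drop_namespace("sales")
```
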
### Table Operations

| Method | Signature | Description |
|--------|-----------|-------------|
| `create_table` | `async def create_table(self, table_id: str, schema, partition_spec=None, **kwargs) -> Any` | Create table from PyArrow schema or Iceberg schema |
| `register_table` | `async def register_table(self, table_id: str, metadata_location: str) -> Any` | Register an existing Iceberg table |
| `load_table` | `async def load_table(self, table_id: str) -> Any` | Load a table reference |
| `table_exists` | `async def table_exists(self, table_id: str) -> bool` | Check if table exists |
| `rename_table` | `async def rename_table(self, from_id: str, to_id: str) -> None` | Rename a table |
| `drop_table` | `async def drop_table(self, table_id: str, purge: bool = False) -> None` | Drop a table |
| `tables` | `def tables(self, namespace: str = "") -> list[str]` | List tables in namespace |
| `table` | `def table(self, tablename: str = "") -> dict` | Get table schema/metadata |

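A lifecycle sketch using a PyArrow schema (`pa.schema` is real PyArrow API; the `"namespace.table"` identifier style follows PyIceberg, and all names are placeholders):

```python
import pyarrow as pa

schema = pa.schema([
    ("id", pa.int64()),
    ("region", pa.string()),
    ("amount", pa.float64()),
])

await conn.create_table("sales.orders", schema=schema)
assert await conn.table_exists("sales.orders")

await conn.rename_table("sales.orders", "sales.orders_v2")
await conn.drop_table("sales.orders_v2", purge=True)
```
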
### Read Operations

| Method | Signature | Description |
|--------|-----------|-------------|
| `query` | `async def query(self, sentence: str = None, table_id: str = None, factory: str = "arrow", **kwargs)` | Query via DuckDB SQL or Iceberg scan expressions |
| `queryrow` | `async def queryrow(self, sentence: str = None, table_id: str = None, factory: str = "arrow", **kwargs)` | Fetch single row |
| `get` | `async def get(self, table_id: str, columns: list = None, row_filter: str = None, factory: str = "arrow")` | Scan table with optional column/row pruning |
| `scan` | `async def scan(self, table_id: str, row_filter=None, selected_fields=None, snapshot_id=None, **kwargs)` | Low-level scan returning PyArrow Table |
| `to_df` | `async def to_df(self, table_id: str, factory: str = "pandas", **kwargs)` | Convert table data to DataFrame |
| `fetch_all` | alias for `query` | |
| `fetch_one` | alias for `queryrow` | |

**Factory parameter values:** `"arrow"`, `"pandas"`, `"polars"`, `"duckdb"`

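A read-path sketch covering both the SQL route and the pruned-scan route (SQL text, filter, and column names are placeholders):

```python
# SQL over the Iceberg table, executed through DuckDB, returned as Pandas.
df = await conn.query(
    "SELECT region, SUM(amount) AS total FROM orders GROUP BY region",
    table_id="sales.orders",
    factory="pandas",
)

# Column and row pruning without SQL, returned as a PyArrow Table.
recent = await conn.get(
    "sales.orders",
    columns=["id", "amount"],
    row_filter="amount > 100",
)
```
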
### Write Operations

| Method | Signature | Description |
|--------|-----------|-------------|
| `write` | `async def write(self, data, table_id: str, mode: str = "append", **kwargs) -> bool` | Write data (PyArrow Table, Pandas DF, or Polars DF) |
| `overwrite` | `async def overwrite(self, data, table_id: str, overwrite_filter: str = None, **kwargs) -> bool` | Partial overwrite with filter expression |
| `upsert` | `async def upsert(self, data, table_id: str, join_cols: list[str], **kwargs) -> bool` | Merge/upsert based on join columns |
| `add_files` | `async def add_files(self, table_id: str, file_paths: list[str], **kwargs) -> bool` | Register existing Parquet files into table |
| `delete` | `async def delete(self, table_id: str, delete_filter: str, **kwargs) -> None` | Delete rows matching filter |

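A write-path sketch (`pa.table` is real PyArrow API; table name, filter, paths, and values are placeholders):

```python
import pyarrow as pa

batch = pa.table({"id": [1, 2], "region": ["eu", "us"], "amount": [10.5, 7.25]})

await conn.write(batch, "sales.orders", mode="append")

# Replace only the EU rows, then merge corrections keyed on `id`.
await conn.overwrite(batch, "sales.orders", overwrite_filter="region = 'eu'")
await conn.upsert(batch, "sales.orders", join_cols=["id"])

# Adopt Parquet files already sitting in object storage, without rewriting them.
await conn.add_files("sales.orders", ["s3://my-warehouse/staging/part-0.parquet"])
```
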
### Metadata & Utility

| Method | Signature | Description |
|--------|-----------|-------------|
| `schema` | `def schema(self, table_id: str) -> Any` | Get table schema (PyArrow or Iceberg) |
| `metadata` | `async def metadata(self, table_id: str) -> dict` | Get table metadata (snapshots, properties) |
| `history` | `async def history(self, table_id: str) -> list[dict]` | Get snapshot history |
| `snapshots` | `async def snapshots(self, table_id: str) -> list` | List table snapshots |
| `current_snapshot` | `async def current_snapshot(self, table_id: str) -> dict` | Get current snapshot info |

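A short inspection sketch; the commented fields are indicative of Iceberg snapshot metadata rather than a guaranteed shape:

```python
# Walk the snapshot lineage of a table.
for snap in await conn.history("sales.orders"):
    print(snap)  # e.g. snapshot id, timestamp, operation (append/overwrite)

print(await conn.current_snapshot("sales.orders"))
```
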
### Query via DuckDB (same pattern as the Delta driver)

For SQL queries, the driver will (see the sketch after this list):

1. Load the Iceberg table as a PyArrow dataset.
2. Register it in an in-memory DuckDB connection.
3. Execute the SQL query.
4. Return results in the requested factory format (arrow, pandas, polars).

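A sketch of that flow using DuckDB's Python API (`connect`, `register`, `execute`, and the `.arrow()`/`.df()`/`.pl()` accessors are real DuckDB calls; the registered table name is illustrative):

```python
import duckdb

def _run_sql(arrow_table, sentence: str, factory: str = "arrow"):
    """Register a PyArrow table in an in-memory DuckDB and run one query."""
    con = duckdb.connect()
    con.register("iceberg_table", arrow_table)
    result = con.execute(sentence)
    if factory == "pandas":
        return result.df()
    if factory == "polars":
        return result.pl()
    return result.arrow()

# Called from the async driver as:
#   await asyncio.to_thread(_run_sql, arrow_table, sentence, factory)
```
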
---

## Dependencies

### pyproject.toml optional dependency group

```toml
[project.optional-dependencies]
iceberg = [
    "pyiceberg[pyarrow,pandas,duckdb,polars,s3fs,gcsfs,sql-postgres,hive,ray]>=0.9.0",
]
```

Individual extras from pyiceberg:

- `pyarrow` — core data format
- `pyiceberg-core` — Rust-optimized core (optional, improves performance)
- `pandas` — DataFrame support
- `duckdb` — SQL query engine
- `polars` — Polars DataFrame support
- `s3fs` — S3 storage backend
- `gcsfs` — GCS storage backend
- `sql-postgres` — PostgreSQL-backed catalog
- `hive` — Hive metastore catalog
- `ray` — Ray integration for distributed processing

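With this group declared, users opt in via `pip install "asyncdb[iceberg]"` instead of paying the full S3/GCS/Hive dependency cost on every install.
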
---

## Acceptance Criteria

1. **Driver registration**: `AsyncDB("iceberg", params={...})` loads the `iceberg` driver via the factory.
2. **Catalog connection**: Successfully connects to at least REST, SQL (PostgreSQL), and file-system catalogs.
3. **Namespace CRUD**: Create, list, and drop namespaces.
4. **Table lifecycle**: Create table from PyArrow schema, load, check existence, rename, drop.
5. **Read data**: Scan table and return as PyArrow Table, Pandas DataFrame, Polars DataFrame, or via DuckDB.
6. **Write data**: Append and overwrite data from PyArrow Table and Pandas DataFrame.
7. **Partial overwrite**: Overwrite rows matching a filter expression.
8. **Upsert**: Merge data using join columns.
9. **Add files**: Register existing Parquet files into an Iceberg table.
10. **Query via DuckDB**: Execute SQL queries over Iceberg tables using DuckDB.
11. **Async compliance**: All I/O methods are async; no blocking of the event loop.
12. **Error handling**: All PyIceberg exceptions are wrapped in `DriverError`.
13. **Context manager**: Supports the `async with` pattern (see the sketch after this list).
14. **Tests**: Unit tests covering connection, namespace ops, table ops, read, and write.
15. **Example script**: `examples/test_iceberg.py` demonstrating typical usage.

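A sketch of criteria 1 and 13 together, assuming the `async with await db.connection()` idiom the other asyncdb drivers use; the catalog properties are placeholders:

```python
import asyncio

from asyncdb import AsyncDB

async def main():
    db = AsyncDB(
        "iceberg",
        params={
            "catalog_type": "rest",
            "catalog_properties": {"uri": "http://localhost:8181"},
        },
    )
    async with await db.connection() as conn:  # opened and closed automatically
        print(await conn.list_namespaces())

asyncio.run(main())
```
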
---

## Architectural Design

### File Layout

```
asyncdb/
    drivers/
        iceberg.py          # Main driver implementation
examples/
    test_iceberg.py         # Usage example
tests/
    test_iceberg.py         # Unit/integration tests
```

### Integration Points

- **Factory** (`asyncdb/connections.py`): Auto-discovered by module name — no changes needed.
- **Output formats** (`asyncdb/drivers/outputs/`): Reuse the existing `OutputFactory` for arrow/pandas serialization where applicable.
- **Exceptions** (`asyncdb/exceptions/`): Wrap all PyIceberg errors in `DriverError` (see the sketch after this list).

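The exception bullet could collapse into a small decorator. `DriverError` is asyncdb's existing exception type; the decorator name and the broad `except` are illustrative (a real implementation would target the concrete `pyiceberg.exceptions` classes):

```python
import functools

from asyncdb.exceptions import DriverError

def wrap_pyiceberg_errors(fn):
    """Re-raise any PyIceberg failure as asyncdb's DriverError."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        try:
            return await fn(*args, **kwargs)
        except Exception as exc:  # narrow to pyiceberg.exceptions in practice
            raise DriverError(f"Iceberg error: {exc}") from exc
    return wrapper
```
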
### Concurrency Model

```
User code (async) ──► iceberg driver (async methods)
                            │
                            ├── asyncio.to_thread(catalog.load_table(...))
                            ├── asyncio.to_thread(table.scan().to_arrow())
                            └── asyncio.to_thread(table.append(arrow_table))
```

All PyIceberg blocking calls are offloaded to the default thread pool executor via `asyncio.to_thread()`, preserving the async-first contract of asyncdb.

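One subtlety the diagram glosses over: `asyncio.to_thread()` takes a callable, so a chained call like `table.scan().to_arrow()` needs a small wrapper to keep both blocking steps in the worker thread. A sketch (the `row_filter` handling is illustrative):

```python
import asyncio

async def scan(self, table_id: str, row_filter=None, **kwargs):
    """Run the full blocking scan chain inside the worker thread."""
    table = await asyncio.to_thread(self._connection.load_table, table_id)

    def _scan():
        # Both scan planning and Arrow materialization block, so both
        # stay off the event loop.
        sc = table.scan(row_filter=row_filter) if row_filter else table.scan()
        return sc.to_arrow()

    return await asyncio.to_thread(_scan)
```
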
---

## Risks & Mitigations

| Risk | Impact | Mitigation |
|------|--------|------------|
| PyIceberg is synchronous | Could block event loop if not wrapped | All I/O wrapped in `asyncio.to_thread()` |
| PyIceberg API changes (pre-1.0) | Breaking changes in minor versions | Pin minimum version `>=0.9.0`, test against latest |
| Heavy dependency tree (S3, GCS, Hive) | Large install size | Use optional extras; only install what's needed |
| Upsert not natively supported in all PyIceberg versions | Feature gap | Check version at runtime; raise `NotImplementedError` if unavailable (sketch below) |
| Catalog-specific behaviors differ | Inconsistent behavior across backends | Document supported catalogs; test with REST + SQL at minimum |

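For the upsert row, a capability check at call time is enough; `hasattr` is used here because older PyIceberg releases simply lack `Table.upsert` (a sketch; attribute names follow the earlier connection example):

```python
import asyncio

async def upsert(self, data, table_id: str, join_cols: list[str], **kwargs) -> bool:
    table = await asyncio.to_thread(self._connection.load_table, table_id)
    if not hasattr(table, "upsert"):
        raise NotImplementedError(
            "Upsert requires a PyIceberg release providing Table.upsert; "
            "upgrade the 'iceberg' extra (>=0.9.0)."
        )
    await asyncio.to_thread(table.upsert, data, join_cols=join_cols)
    return True
```
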
---

## Out of Scope

- **Schema evolution** (add/rename/drop columns on existing tables) — future enhancement.
- **Time-travel queries** (query at a specific snapshot ID) — future enhancement, though `scan()` already accepts `snapshot_id`.
- **Compaction / maintenance** operations — defer to the PyIceberg CLI or external tools.
- **Distributed writes via Ray** — the initial implementation focuses on single-node use; Ray integration is a future enhancement.
- **Custom Iceberg expression DSL** — use string-based filter expressions initially.

---

## Worktree Strategy

- **Isolation unit**: `per-spec` (sequential tasks in a single worktree)
- **Rationale**: All tasks build on each other (base class → namespace ops → table ops → read → write → tests). No parallelizable tasks.
- **Cross-feature dependencies**: None. This spec is independent of FEAT-001 (exception-migration).

---

## References

- [PyIceberg documentation](https://py.iceberg.apache.org/)
- [PyIceberg API reference](https://py.iceberg.apache.org/api/)
- [Apache Iceberg spec](https://iceberg.apache.org/spec/)
- Existing driver reference: `asyncdb/drivers/delta.py` (closest architectural pattern)
- Existing driver reference: `asyncdb/drivers/bigquery.py` (cloud catalog pattern)
