-
Notifications
You must be signed in to change notification settings - Fork 106
Iceberg: POC Iceberg Catalog #1059
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,174 @@ | ||||||||||||||||||||
| import type { IcebergApi } from "core/ports/IcebergApi"; | ||||||||||||||||||||
| import type { SqlOlap } from "core/ports/SqlOlap"; | ||||||||||||||||||||
| import { id } from "tsafe/id"; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| export type IcebergCatalogConfig = { | ||||||||||||||||||||
| name: string; | ||||||||||||||||||||
| warehouse: string; | ||||||||||||||||||||
| endpoint: string; | ||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Returns a fresh bearer token before each request. | ||||||||||||||||||||
| * Each catalog can have its own auth provider (different OIDC clients, | ||||||||||||||||||||
| * different realms, etc.). Returns undefined for public catalogs. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| getAccessToken: () => Promise<string | undefined>; | ||||||||||||||||||||
| }; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Creates one IcebergApi instance that manages multiple catalogs. | ||||||||||||||||||||
| * All catalog configs (endpoint, warehouse, token provider) are fixed at | ||||||||||||||||||||
| * creation time — callers select which catalog to use via the `catalog` param. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| export function createDuckDbIcebergApi(params: { | ||||||||||||||||||||
| sqlOlap: SqlOlap; | ||||||||||||||||||||
| catalogs: IcebergCatalogConfig[]; | ||||||||||||||||||||
| }): IcebergApi { | ||||||||||||||||||||
| const { sqlOlap, catalogs } = params; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| function secretName(catalogName: string): string { | ||||||||||||||||||||
|
Check warning on line 28 in web/src/core/adapters/icebergApi/icebergApi.ts
|
||||||||||||||||||||
| return `iceberg_${catalogName}`; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Eagerly install the iceberg extension, create secrets and attach all | ||||||||||||||||||||
| // catalogs in a single connection so everything is ready before the first query. | ||||||||||||||||||||
| const prDb = (async () => { | ||||||||||||||||||||
| const { db } = await sqlOlap.getConfiguredAsyncDuckDb(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const conn = await db.connect(); | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| await conn.query("INSTALL iceberg;\nLOAD iceberg;"); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for (const catalogConfig of catalogs) { | ||||||||||||||||||||
| const token = await catalogConfig.getAccessToken(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (token !== undefined) { | ||||||||||||||||||||
| await conn.query( | ||||||||||||||||||||
| [ | ||||||||||||||||||||
| `CREATE OR REPLACE SECRET "${secretName(catalogConfig.name)}" (`, | ||||||||||||||||||||
| ` TYPE iceberg,`, | ||||||||||||||||||||
| ` TOKEN '${token}'`, | ||||||||||||||||||||
| ");" | ||||||||||||||||||||
| ].join("\n") | ||||||||||||||||||||
| ); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const attachLines = [ | ||||||||||||||||||||
| `ATTACH '${catalogConfig.warehouse}' AS "${catalogConfig.name}" (`, | ||||||||||||||||||||
| ` TYPE iceberg,`, | ||||||||||||||||||||
| ...(token !== undefined | ||||||||||||||||||||
|
Check warning on line 58 in web/src/core/adapters/icebergApi/icebergApi.ts
|
||||||||||||||||||||
| ? [` SECRET '${secretName(catalogConfig.name)}',`] | ||||||||||||||||||||
| : []), | ||||||||||||||||||||
| ` ENDPOINT '${catalogConfig.endpoint}'`, | ||||||||||||||||||||
| ");" | ||||||||||||||||||||
| ]; | ||||||||||||||||||||
| await conn.query(attachLines.join("\n")); | ||||||||||||||||||||
|
Comment on lines
+45
to
+64
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection risk from string interpolation. Values are interpolated directly into SQL strings without escaping:
While these values originate from controlled sources (OIDC tokens, region config), a token or config value containing a single quote ( 🛡️ Proposed minimal escaping+function escapeSqlString(value: string): string {
+ return value.replace(/'/g, "''");
+}
+
if (token !== undefined) {
await conn.query(
[
`CREATE OR REPLACE SECRET "${secretName(catalogConfig.name)}" (`,
` TYPE iceberg,`,
- ` TOKEN '${token}'`,
+ ` TOKEN '${escapeSqlString(token)}'`,
");"
].join("\n")
);
}🧰 Tools🪛 GitHub Check: SonarCloud Code Analysis[warning] 58-58: Unexpected negated condition. 🤖 Prompt for AI Agents |
||||||||||||||||||||
| } | ||||||||||||||||||||
| } finally { | ||||||||||||||||||||
| await conn.close(); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return db; | ||||||||||||||||||||
| })(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return { | ||||||||||||||||||||
| listAllTables: async () => { | ||||||||||||||||||||
| let db: import("@duckdb/duckdb-wasm").AsyncDuckDB; | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| db = await prDb; | ||||||||||||||||||||
| } catch { | ||||||||||||||||||||
| return id<IcebergApi.ListAllTablesResult.Failed>({ | ||||||||||||||||||||
| errorCause: "network error" | ||||||||||||||||||||
| }); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const conn = await db.connect(); | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| const result = await conn.query( | ||||||||||||||||||||
| `SELECT table_catalog AS database, table_schema AS schema, table_name AS name FROM information_schema.tables;` | ||||||||||||||||||||
| ); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const tables: IcebergApi.TableEntry[] = result.toArray().map(row => ({ | ||||||||||||||||||||
| catalog: String(row["database"]), | ||||||||||||||||||||
| namespace: String(row["schema"]), | ||||||||||||||||||||
| name: String(row["name"]) | ||||||||||||||||||||
| })); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return id<IcebergApi.ListAllTablesResult.Success>({ tables }); | ||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||
| const cause = classifyError(e); | ||||||||||||||||||||
| return id<IcebergApi.ListAllTablesResult.Failed>({ | ||||||||||||||||||||
| errorCause: | ||||||||||||||||||||
| cause === "unauthorized" ? "unauthorized" : "network error" | ||||||||||||||||||||
| }); | ||||||||||||||||||||
| } finally { | ||||||||||||||||||||
| await conn.close(); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }, | ||||||||||||||||||||
|
|
||||||||||||||||||||
| fetchTablePreview: async ({ catalog: catalogName, namespace, table, limit }) => { | ||||||||||||||||||||
| const catalogConfig = catalogs.find(c => c.name === catalogName); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (catalogConfig === undefined) { | ||||||||||||||||||||
| return id<IcebergApi.FetchTablePreviewResult.Failed>({ | ||||||||||||||||||||
| errorCause: "network error" | ||||||||||||||||||||
| }); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| let db: import("@duckdb/duckdb-wasm").AsyncDuckDB; | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| db = await prDb; | ||||||||||||||||||||
| } catch { | ||||||||||||||||||||
| return id<IcebergApi.FetchTablePreviewResult.Failed>({ | ||||||||||||||||||||
| errorCause: "network error" | ||||||||||||||||||||
| }); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const conn = await db.connect(); | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| const result = await conn.query( | ||||||||||||||||||||
| `SELECT * FROM "${catalogName}"."${namespace}"."${table}" LIMIT ${limit};` | ||||||||||||||||||||
| ); | ||||||||||||||||||||
|
Comment on lines
+128
to
+130
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection risk in table preview query.
-const result = await conn.query(
- `SELECT * FROM "${catalogName}"."${namespace}"."${table}" LIMIT ${limit};`
-);
+const escapedCatalog = catalogName.replace(/"/g, '""');
+const escapedNamespace = namespace.replace(/"/g, '""');
+const escapedTable = table.replace(/"/g, '""');
+const result = await conn.query(
+ `SELECT * FROM "${escapedCatalog}"."${escapedNamespace}"."${escapedTable}" LIMIT ${limit};`
+);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
|
|
||||||||||||||||||||
| const columns: IcebergApi.Column[] = ( | ||||||||||||||||||||
| result.schema.fields as { | ||||||||||||||||||||
| name: string; | ||||||||||||||||||||
| type: { toString(): string }; | ||||||||||||||||||||
| nullable: boolean; | ||||||||||||||||||||
| }[] | ||||||||||||||||||||
| ).map((field, index) => ({ | ||||||||||||||||||||
| fieldId: index, | ||||||||||||||||||||
| name: field.name, | ||||||||||||||||||||
| rawType: field.type.toString(), | ||||||||||||||||||||
| isRequired: !field.nullable | ||||||||||||||||||||
| })); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const rows: Record<string, unknown>[] = result | ||||||||||||||||||||
| .toArray() | ||||||||||||||||||||
| .map(row => Object.fromEntries(Object.entries(row))); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return id<IcebergApi.FetchTablePreviewResult.Success>({ columns, rows }); | ||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||
| return id<IcebergApi.FetchTablePreviewResult.Failed>({ | ||||||||||||||||||||
| errorCause: classifyError(e) | ||||||||||||||||||||
| }); | ||||||||||||||||||||
| } finally { | ||||||||||||||||||||
| await conn.close(); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||||||||||
| // Error classification | ||||||||||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||||||||||
|
|
||||||||||||||||||||
| function classifyError(e: unknown): "unauthorized" | "table not found" | "network error" { | ||||||||||||||||||||
| const msg = (e instanceof Error ? e.message : String(e)).toLowerCase(); | ||||||||||||||||||||
| if (msg.includes("401") || msg.includes("unauthorized") || msg.includes("403")) { | ||||||||||||||||||||
| return "unauthorized"; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if (msg.includes("not found") || msg.includes("does not exist")) { | ||||||||||||||||||||
| return "table not found"; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return "network error"; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| export * from "./icebergApi"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: InseeFrLab/onyxia
Length of output: 105
🏁 Script executed:
cat -n web/src/core/adapters/icebergApi/icebergApi.ts | head -80Repository: InseeFrLab/onyxia
Length of output: 3450
🏁 Script executed:
Repository: InseeFrLab/onyxia
Length of output: 88
🏁 Script executed:
Repository: InseeFrLab/onyxia
Length of output: 88
🌐 Web query:
DuckDB WASM CREATE OR REPLACE SECRET support token refresh dynamic update💡 Result:
CREATE OR REPLACE SECRETitself is supported in DuckDB SQL, but whether it’s useful in DuckDB-Wasm depends on whether the relevant secret types are available (they’re registered by extensions such ashttpfs/aws). (duckdb.org)httpfsis not available (Wasm uses a separate browser-oriented HTTP implementation), so the common secret types provided byhttpfs(e.g.,s3,http,huggingface, etc.) are not available out of the box in DuckDB-Wasm. (duckdb.org)Token refresh / dynamic update
awssupportsREFRESH autofor credential refresh;httpfshas a refresh mechanism that can recreate/replace secrets when invalid). (duckdb.org)CREATE OR REPLACE SECRETto overwrite the stored token value.CREATE SECRET(e.g., set a variable, then create/replace the secret fromgetvariable(...); update by changing the variable and runningCREATE OR REPLACE SECRETagain). (duckdb.org)Implication: if you need
s3/http/bearer-token secrets with refresh in the browser, you generally must either (a) handle refresh outside DuckDB-Wasm and replace the secret manually, or (b) use a custom DuckDB-Wasm build/extension strategy that actually provides those secret types and refresh hooks (since the stock Wasm distribution doesn’t includehttpfs). (duckdb.org)Citations:
Implement token refresh before queries to honor the interface contract.
The
IcebergCatalogConfig.getAccessToken()interface documents that it returns "a fresh bearer token before each request", but the current implementation calls it only once during initialization. For long-running browser sessions, the token stored in the secret will expire while queries continue using the stale value, causing authentication failures.Refresh the secrets before queries by calling
getAccessToken()again and re-runningCREATE OR REPLACE SECRET. DuckDB-Wasm supports this pattern for dynamic token updates.🤖 Prompt for AI Agents