@NikitaMatskevich
Contributor

Rationale for this change

We authenticate to Azure through the default credential chain (specifically, managed identities). We found that fsspec's Azure backend (adlfs) only uses that chain when anon=False is set and the account name is specified.

This PR therefore adds an anon property (adls.anon) to the PyIceberg FileIO configuration.
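The condition described above can be sketched as a small predicate. This is only an illustration of the rule as stated in this description, not adlfs's actual logic: the helper name `expects_default_credentials` is hypothetical, and the assumption that explicitly supplied credentials (connection string, account key, SAS token, client secret) take precedence over the default chain is mine. Account-scoped SAS token keys are ignored for brevity.

```python
from typing import Any, Mapping

# Property keys as used in the test snippet of this PR description.
ADLS_ANON = "adls.anon"
ADLS_ACCOUNT_NAME = "adls.account-name"

# Assumption: any of these explicit credentials wins over the default chain.
EXPLICIT_CREDENTIAL_KEYS = (
    "adls.connection-string",
    "adls.account-key",
    "adls.sas-token",
    "adls.client-secret",
)


def expects_default_credentials(properties: Mapping[str, Any]) -> bool:
    """Hypothetical check: would this config reach the default credential chain?"""
    # anon must be explicitly False; a missing value does not count.
    if properties.get(ADLS_ANON) is not False:
        return False
    # The account name must be present and non-empty.
    if not properties.get(ADLS_ACCOUNT_NAME):
        return False
    # No explicit credential may be configured (assumption, see above).
    return not any(properties.get(key) for key in EXPLICIT_CREDENTIAL_KEYS)


print(expects_default_credentials({ADLS_ANON: False, ADLS_ACCOUNT_NAME: "usagestorageprod"}))
print(expects_default_credentials({ADLS_ACCOUNT_NAME: "usagestorageprod"}))
```

The first call mirrors the properties used in the test snippet; the second omits adls.anon and so would not qualify under this reading.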

Are these changes tested?

We've tested that this works with the following snippet:

import os
from fsspec import AbstractFileSystem
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.catalog.rest import RestCatalog
from typing import Any

ADLS_ANON = "adls.anon"
ADLS_CONNECTION_STRING = "adls.connection-string"
ADLS_ACCOUNT_NAME = "adls.account-name"
ADLS_ACCOUNT_KEY = "adls.account-key"
ADLS_SAS_TOKEN = "adls.sas-token"
ADLS_TENANT_ID = "adls.tenant-id"
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"

Properties = dict[str, Any]

def my_adls(properties: Properties) -> AbstractFileSystem:
    from adlfs import AzureBlobFileSystem

    for key, sas_token in {
        key.replace(f"{ADLS_SAS_TOKEN}.", ""): value for key, value in properties.items() if key.startswith(ADLS_SAS_TOKEN)
    }.items():
        if ADLS_ACCOUNT_NAME not in properties:
            properties[ADLS_ACCOUNT_NAME] = key.split(".")[0]
        if ADLS_SAS_TOKEN not in properties:
            properties[ADLS_SAS_TOKEN] = sas_token

    return AzureBlobFileSystem(
        connection_string=properties.get(ADLS_CONNECTION_STRING),
        anon=properties.get(ADLS_ANON),
        account_name=properties.get(ADLS_ACCOUNT_NAME),
        account_key=properties.get(ADLS_ACCOUNT_KEY),
        sas_token=properties.get(ADLS_SAS_TOKEN),
        tenant_id=properties.get(ADLS_TENANT_ID),
        client_id=properties.get(ADLS_CLIENT_ID),
        client_secret=properties.get(ADLS_CLIENT_SECRET),
        account_host=properties.get(ADLS_ACCOUNT_HOST),
    )

injected_file_io = FsspecFileIO(properties={ADLS_ANON: False, ADLS_ACCOUNT_NAME: "usagestorageprod"})
injected_file_io.get_fs = lambda scheme: my_adls(injected_file_io.properties)

CATALOG_URI = "https://lakehouse..."

catalog_config = {
    "uri": CATALOG_URI,
    "properties": {
        "io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    },
    # ... (remaining settings elided)
}

catalog = RestCatalog("lakehouse", **catalog_config)
catalog.file_io = injected_file_io

table = catalog.load_table("some_ns.some_table")
table.io = injected_file_io
table.scan(snapshot_id=xxx).count()
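One caveat worth noting: the snippet above passes adls.anon as a Python bool, but catalog properties loaded from a config file or environment variable often arrive as strings, and the string "false" is truthy in Python. The following is a minimal sketch of one way to normalise the value before handing it to the filesystem. The helper name `anon_as_bool` and the accepted spellings are assumptions for illustration, not part of this PR.

```python
from typing import Any, Optional

ADLS_ANON = "adls.anon"

# Assumed spellings; adjust to whatever the config source actually produces.
_TRUE_VALUES = {"true", "1", "yes"}
_FALSE_VALUES = {"false", "0", "no"}


def anon_as_bool(properties: dict[str, Any]) -> Optional[bool]:
    """Hypothetical helper: return adls.anon as a bool, or None if it is unset."""
    value = properties.get(ADLS_ANON)
    # Unset values and real booleans pass through unchanged.
    if value is None or isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in _TRUE_VALUES:
        return True
    if text in _FALSE_VALUES:
        return False
    raise ValueError(f"Cannot interpret {ADLS_ANON}={value!r} as a boolean")


# A string "false" is truthy on its own, but normalises to False here.
print(bool("false"), anon_as_bool({ADLS_ANON: "false"}))
```

Returning None for an unset property leaves the choice of default to the underlying filesystem rather than forcing one here.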

Are there any user-facing changes?

Yes: a new adls.anon configuration property. There are no breaking changes.

@kevinjqliu
Contributor

@NikitaMatskevich thanks for the PR! Could you fix the CI?
