Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/config/yaml.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Our pipeline can ingest .csv, .txt, or .json data from an input location. See th
#### Fields

- `storage` **StorageConfig**
- `type` **file|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm iffy on this change. It breaks with current convention, and feels "off" for how you would put values in yaml. I understand the intent from factory registration perspective of course. Maybe worth tagging a couple of other folks to see if there is consensus one way or the other

- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand Down Expand Up @@ -115,7 +115,7 @@ This section controls the storage mechanism used by the pipeline used for export

#### Fields

- `type` **file|memory|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand All @@ -128,7 +128,7 @@ The section defines a secondary storage location for running incremental indexin

#### Fields

- `type` **file|memory|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand Down
58 changes: 58 additions & 0 deletions packages/graphrag-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# GraphRAG Storage

## Basic

```python
import asyncio
from graphrag_storage import StorageConfig, create_storage
from graphrag_storage.file_storage import FileStorage

async def run():
storage = create_storage(
StorageConfig(
type="FileStorage", # or FileStorage.__name__
base_dir="output"
)
)

await storage.set("my_key", "value")
print(await storage.get("my_key"))

if __name__ == "__main__":
asyncio.run(run())
```

## Custom Storage

```python
import asyncio
from typing import Any
from graphrag_storage import Storage, StorageConfig, create_storage, register_storage

class MyStorage(Storage):
def __init__(self, some_setting: str, **kwargs: Any):
# Validate settings and initialize
...

# Implement the rest of the interface
...

register_storage("MyStorage", MyStorage)

async def run():
storage = create_storage(
StorageConfig(
type="MyStorage",
some_setting="My Setting"
)
)
# Or use the factory directly to instantiate with a dict instead of using
# StorageConfig + create_storage
# from graphrag_storage.storage_factory import storage_factory
# storage = storage_factory.create(strategy="MyStorage", init_args={"some_setting": "My Setting"})

await storage.set("my_key", "value")
print(await storage.get("my_key"))

if __name__ == "__main__":
asyncio.run(run())
15 changes: 15 additions & 0 deletions packages/graphrag-storage/graphrag_storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""The GraphRAG Storage package."""

from graphrag_storage.storage import Storage
from graphrag_storage.storage_config import StorageConfig
from graphrag_storage.storage_factory import create_storage, register_storage

__all__ = [
"Storage",
"StorageConfig",
"create_storage",
"register_storage",
]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Azure Blob Storage implementation of PipelineStorage."""
"""Azure Blob Storage implementation of Storage."""

import logging
import re
Expand All @@ -12,53 +12,65 @@
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

from graphrag.storage.pipeline_storage import (
PipelineStorage,
from graphrag_storage.storage import (
Storage,
get_timestamp_formatted_with_local_tz,
)

logger = logging.getLogger(__name__)


class BlobPipelineStorage(PipelineStorage):
class AzureBlobStorage(Storage):
"""The Blob-Storage implementation."""

_connection_string: str | None
_container_name: str
_base_dir: str | None
_encoding: str
_storage_account_blob_url: str | None
_blob_service_client: BlobServiceClient
_storage_account_name: str | None

def __init__(self, **kwargs: Any) -> None:
def __init__(
self,
base_dir: str | None = None,
connection_string: str | None = None,
storage_account_blob_url: str | None = None,
container_name: str | None = None,
encoding: str = "utf-8",
**kwargs: Any,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably the kwargs in these constructors are just to soak additional unknown args from the factory? My preference is definitely to avoid kwargs.get and instead explicitly name it – would it therefore make sense to register lambdas with the factory that pluck exact args and perform this soaking function? It's a bit more code, but could set a reasonable pattern of explicit mapping from the factory init call to instances?

) -> None:
"""Create a new BlobStorage instance."""
connection_string = kwargs.get("connection_string")
storage_account_blob_url = kwargs.get("storage_account_blob_url")
base_dir = kwargs.get("base_dir")
container_name = kwargs["container_name"]
if container_name is None:
msg = "No container name provided for blob storage."
raise ValueError(msg)
if connection_string is None and storage_account_blob_url is None:
msg = "No storage account blob url provided for blob storage."
msg = "AzureBlobStorage requires either a connection_string or storage_account_blob_url to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string is not None and storage_account_blob_url is not None:
msg = "AzureBlobStorage requires only one of connection_string or storage_account_blob_url to be specified, not both."
logger.error(msg)
raise ValueError(msg)

if container_name is None:
msg = "AzureBlobStorage requires a container_name to be specified."
logger.error(msg)
raise ValueError(msg)

_validate_blob_container_name(container_name)

logger.info(
"Creating blob storage at [%s] and base_dir [%s]", container_name, base_dir
)
if connection_string:
self._blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
else:
if storage_account_blob_url is None:
msg = "Either connection_string or storage_account_blob_url must be provided."
raise ValueError(msg)

elif storage_account_blob_url:
self._blob_service_client = BlobServiceClient(
account_url=storage_account_blob_url,
credential=DefaultAzureCredential(),
)
self._encoding = kwargs.get("encoding", "utf-8")
self._encoding = encoding
self._container_name = container_name
self._connection_string = connection_string
self._base_dir = base_dir
Expand Down Expand Up @@ -208,12 +220,12 @@ async def delete(self, key: str) -> None:
async def clear(self) -> None:
"""Clear the cache."""

def child(self, name: str | None) -> "PipelineStorage":
def child(self, name: str | None) -> "Storage":
"""Create a child storage instance."""
if name is None:
return self
path = str(Path(self._base_dir) / name) if self._base_dir else name
return BlobPipelineStorage(
return AzureBlobStorage(
connection_string=self._connection_string,
container_name=self._container_name,
encoding=self._encoding,
Expand Down Expand Up @@ -245,7 +257,7 @@ async def get_creation_date(self, key: str) -> str:
return ""


def validate_blob_container_name(container_name: str):
def _validate_blob_container_name(container_name: str) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could all of these checks be collapsed into a single regex?

"""
Check if the provided blob container name is valid based on Azure rules.

Expand All @@ -267,32 +279,25 @@ def validate_blob_container_name(container_name: str):
"""
# Check the length of the name
if len(container_name) < 3 or len(container_name) > 63:
return ValueError(
f"Container name must be between 3 and 63 characters in length. Name provided was {len(container_name)} characters long."
)
msg = f"Container name must be between 3 and 63 characters in length. Name provided was {len(container_name)} characters long."
raise ValueError(msg)

# Check if the name starts with a letter or number
if not container_name[0].isalnum():
return ValueError(
f"Container name must start with a letter or number. Starting character was {container_name[0]}."
)
msg = f"Container name must start with a letter or number. Starting character was {container_name[0]}."
raise ValueError(msg)

# Check for valid characters (letters, numbers, hyphen) and lowercase letters
if not re.match(r"^[a-z0-9-]+$", container_name):
return ValueError(
f"Container name must only contain:\n- lowercase letters\n- numbers\n- or hyphens\nName provided was {container_name}."
)
msg = f"Container name must only contain:\n- lowercase letters\n- numbers\n- or hyphens\nName provided was {container_name}."
raise ValueError(msg)

# Check for consecutive hyphens
if "--" in container_name:
return ValueError(
f"Container name cannot contain consecutive hyphens. Name provided was {container_name}."
)
msg = f"Container name cannot contain consecutive hyphens. Name provided was {container_name}."
raise ValueError(msg)

# Check for hyphens at the end of the name
if container_name[-1] == "-":
return ValueError(
f"Container name cannot end with a hyphen. Name provided was {container_name}."
)

return True
msg = f"Container name cannot end with a hyphen. Name provided was {container_name}."
raise ValueError(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.cosmos.partition_key import PartitionKey
from azure.identity import DefaultAzureCredential

from graphrag.logger.progress import Progress
from graphrag.storage.pipeline_storage import (
PipelineStorage,

from graphrag_storage.storage import (
Storage,
get_timestamp_formatted_with_local_tz,
)

logger = logging.getLogger(__name__)


class CosmosDBPipelineStorage(PipelineStorage):
class AzureCosmosStorage(Storage):
"""The CosmosDB-Storage Implementation."""

_cosmos_client: CosmosClient
Expand All @@ -39,28 +39,40 @@ class CosmosDBPipelineStorage(PipelineStorage):
_encoding: str
_no_id_prefixes: list[str]

def __init__(self, **kwargs: Any) -> None:
def __init__(
self,
base_dir: str | None = None,
container_name: str | None = None,
connection_string: str | None = None,
cosmosdb_account_url: str | None = None,
**kwargs: Any,
) -> None:
"""Create a CosmosDB storage instance."""
logger.info("Creating cosmosdb storage")
cosmosdb_account_url = kwargs.get("cosmosdb_account_url")
connection_string = kwargs.get("connection_string")
database_name = kwargs["base_dir"]
container_name = kwargs["container_name"]
if not database_name:
msg = "No base_dir provided for database name"
database_name = base_dir
if database_name is None:
msg = "CosmosDB Storage requires a base_dir to be specified. This is used as the database name."
logger.error(msg)
raise ValueError(msg)

if connection_string is None and cosmosdb_account_url is None:
msg = "connection_string or cosmosdb_account_url is required."
msg = "CosmosDB Storage requires either a connection_string or cosmosdb_account_url to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string is not None and cosmosdb_account_url is not None:
msg = "CosmosDB Storage requires either a connection_string or cosmosdb_account_url to be specified, not both."
logger.error(msg)
raise ValueError(msg)

if container_name is None:
msg = "CosmosDB Storage requires a container_name to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string:
self._cosmos_client = CosmosClient.from_connection_string(connection_string)
else:
if cosmosdb_account_url is None:
msg = (
"Either connection_string or cosmosdb_account_url must be provided."
)
raise ValueError(msg)
elif cosmosdb_account_url:
self._cosmos_client = CosmosClient(
url=cosmosdb_account_url,
credential=DefaultAzureCredential(),
Expand Down Expand Up @@ -307,7 +319,7 @@ def keys(self) -> list[str]:
msg = "CosmosDB storage does yet not support listing keys."
raise NotImplementedError(msg)

def child(self, name: str | None) -> PipelineStorage:
def child(self, name: str | None) -> "Storage":
"""Create a child storage instance."""
return self

Expand Down
Loading