diff --git a/chakra_py/client.py b/chakra_py/client.py index f5d9519..08134f1 100644 --- a/chakra_py/client.py +++ b/chakra_py/client.py @@ -11,6 +11,7 @@ from .exceptions import ChakraAPIError BASE_URL = "https://api.chakra.dev".rstrip("/") + DEFAULT_BATCH_SIZE = 1000 TOKEN_PREFIX = "DDB_" @@ -250,6 +251,20 @@ def _import_data_from_presigned_url(self, table_name: str, s3_key: str) -> None: ) response.raise_for_status() + def _import_data_from_append_only_dedupe_presigned_url( + self, table_name: str, s3_key: str, primary_key_columns: list[str] + ) -> None: + """Import data from a presigned URL into a table.""" + response = self._session.post( + f"{BASE_URL}/api/v1/tables/s3_parquet_import_append_only_dedupe", + json={ + "table_name": table_name, + "s3_key": s3_key, + "primary_key_columns": primary_key_columns, + }, + ) + response.raise_for_status() + def _delete_file_from_s3(self, s3_key: str) -> None: """Delete a file from S3.""" response = self._session.delete( @@ -270,6 +285,8 @@ def push( data: pd.DataFrame, create_if_missing: bool = True, replace_if_exists: bool = False, + dedupe_on_append: bool = False, + primary_key_columns: list[str] = [], ) -> None: """Push data to a table.""" if not self.token: @@ -320,7 +337,12 @@ def push( # Import the data into the warehouse from the presigned URL pbar.set_description("Importing data into warehouse...") - self._import_data_from_presigned_url(table_name, s3_key) + if dedupe_on_append: + self._import_data_from_append_only_dedupe_presigned_url( + table_name, s3_key, primary_key_columns + ) + else: + self._import_data_from_presigned_url(table_name, s3_key) pbar.update(1) # Clean up the data that was previously uploaded