Skip to content

Commit 7ebe5e1

Browse files
authored
Merge branch 'main' into delete-all-astra-ds
2 parents 5f2b76d + 0a0e2e2 commit 7ebe5e1

File tree

16 files changed

+763
-65
lines changed

16 files changed

+763
-65
lines changed

integrations/deepeval/tests/test_evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def eval(self, test_cases, metric) -> EvaluationResult:
8080
retrieval_context=x.retrieval_context,
8181
)
8282
out.append(r)
83-
return EvaluationResult(test_results=out, confident_link=None)
83+
return EvaluationResult(test_results=out, confident_link=None, test_run_id=None)
8484

8585

8686
def test_evaluator_metric_init_params(monkeypatch):

integrations/elasticsearch/CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
# Changelog
22

3+
## [integrations/elasticsearch-v4.1.0] - 2025-10-09
4+
5+
### 🚀 Features
6+
7+
- Adding the operation `delete_all_documents` to the `ElasticSearchDocumentStore` (#2320)
8+
9+
310
## [integrations/elasticsearch-v4.0.0] - 2025-09-24
411

512
### 🚀 Features
613

7-
- [**breaking**] Adding `api_token` and `apit_token_id` authentication support to `ElasticSearchDocumentStore` (#2292)
14+
- [**breaking**] Adding `api_token` and `api_token_id` support to `ElasticSearchDocumentStore` (#2292)
815

916
### 🧹 Chores
1017

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 129 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
# ruff: noqa: FBT002, FBT001 boolean-type-hint-positional-argument and boolean-default-value-positional-argument
6+
# ruff: noqa: B008 function-call-in-default-argument
7+
# ruff: noqa: S101 disable checks for uses of the assert keyword
8+
9+
510
from collections.abc import Mapping
611
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
712

@@ -68,8 +73,8 @@ def __init__(
6873
hosts: Optional[Hosts] = None,
6974
custom_mapping: Optional[Dict[str, Any]] = None,
7075
index: str = "default",
71-
api_key: Secret = Secret.from_env_var("ELASTIC_API_KEY", strict=False), # noqa: B008
72-
api_key_id: Secret = Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False), # noqa: B008
76+
api_key: Secret = Secret.from_env_var("ELASTIC_API_KEY", strict=False),
77+
api_key_id: Secret = Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False),
7378
embedding_similarity_function: Literal["cosine", "dot_product", "l2_norm", "max_inner_product"] = "cosine",
7479
**kwargs: Any,
7580
):
@@ -119,6 +124,29 @@ def __init__(
119124
msg = "custom_mapping must be a dictionary"
120125
raise ValueError(msg)
121126

127+
if not self._custom_mapping:
128+
self._default_mappings = {
129+
"properties": {
130+
"embedding": {
131+
"type": "dense_vector",
132+
"index": True,
133+
"similarity": self._embedding_similarity_function,
134+
},
135+
"content": {"type": "text"},
136+
},
137+
"dynamic_templates": [
138+
{
139+
"strings": {
140+
"path_match": "*",
141+
"match_mapping_type": "string",
142+
"mapping": {
143+
"type": "keyword",
144+
},
145+
}
146+
}
147+
],
148+
}
149+
122150
def _ensure_initialized(self):
123151
"""
124152
Ensures both sync and async clients are initialized and the index exists.
@@ -150,27 +178,7 @@ def _ensure_initialized(self):
150178
mappings = self._custom_mapping
151179
else:
152180
# Configure mapping for the embedding field if none is provided
153-
mappings = {
154-
"properties": {
155-
"embedding": {
156-
"type": "dense_vector",
157-
"index": True,
158-
"similarity": self._embedding_similarity_function,
159-
},
160-
"content": {"type": "text"},
161-
},
162-
"dynamic_templates": [
163-
{
164-
"strings": {
165-
"path_match": "*",
166-
"match_mapping_type": "string",
167-
"mapping": {
168-
"type": "keyword",
169-
},
170-
}
171-
}
172-
],
173-
}
181+
mappings = self._default_mappings
174182

175183
# Create the index if it doesn't exist
176184
if not self._client.indices.exists(index=self._index):
@@ -227,7 +235,7 @@ def client(self) -> Elasticsearch:
227235
Returns the synchronous Elasticsearch client, initializing it if necessary.
228236
"""
229237
self._ensure_initialized()
230-
assert self._client is not None # noqa: S101
238+
assert self._client is not None
231239
return self._client
232240

233241
@property
@@ -236,7 +244,7 @@ def async_client(self) -> AsyncElasticsearch:
236244
Returns the asynchronous Elasticsearch client, initializing it if necessary.
237245
"""
238246
self._ensure_initialized()
239-
assert self._async_client is not None # noqa: S101
247+
assert self._async_client is not None
240248
return self._async_client
241249

242250
def to_dict(self) -> Dict[str, Any]:
@@ -450,7 +458,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
450458

451459
if errors:
452460
# with stats_only=False, errors is guaranteed to be a list of dicts
453-
assert isinstance(errors, list) # noqa: S101
461+
assert isinstance(errors, list)
454462
duplicate_errors_ids = []
455463
other_errors = []
456464
for e in errors:
@@ -529,7 +537,7 @@ async def write_documents_async(
529537
)
530538
if failed:
531539
# with stats_only=False, failed is guaranteed to be a list of dicts
532-
assert isinstance(failed, list) # noqa: S101
540+
assert isinstance(failed, list)
533541
if policy == DuplicatePolicy.FAIL:
534542
for error in failed:
535543
if "create" in error and error["create"]["status"] == DOC_ALREADY_EXISTS:
@@ -556,6 +564,14 @@ def delete_documents(self, document_ids: List[str]) -> None:
556564
raise_on_error=False,
557565
)
558566

567+
def _prepare_delete_all_request(self, *, is_async: bool) -> Dict[str, Any]:
    """
    Builds the keyword arguments for a `delete_by_query` call that matches every document in the index.

    :param is_async: Whether the request is being built for the asynchronous flow. When True,
        `wait_for_completion` is set to False; otherwise it is set to True.
    :returns: A dictionary with the `index`, `body`, `wait_for_completion` and `refresh` entries.
    """
    return {
        "index": self._index,
        "body": {"query": {"match_all": {}}},  # Delete all documents
        "wait_for_completion": not is_async,  # block until done, unless built for the async flow
        "refresh": True,  # Ensure changes are visible immediately
    }
574+
559575
async def delete_documents_async(self, document_ids: List[str]) -> None:
560576
"""
561577
Asynchronously deletes all documents with a matching document_ids from the document store.
@@ -575,6 +591,92 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
575591
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
576592
raise DocumentStoreError(msg) from e
577593

594+
def delete_all_documents(self, recreate_index: bool = False) -> None:
    """
    Deletes all documents in the document store.

    A fast way to clear all documents from the document store while preserving any index settings and mappings.

    :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
        settings. If False, all documents will be deleted using the `delete_by_query` API.
    """
    self._ensure_initialized()  # _ensure_initialized ensures _client is not None and an index exists

    if recreate_index:
        # Fetch the index description once; the same response carries both the mappings and the settings.
        index_name = self._index
        index_info = self._client.indices.get(index=index_name)[index_name]  # type: ignore
        mappings = index_info["mappings"]
        settings = index_info["settings"]

        # remove settings that cannot be set during index creation
        for key in ("uuid", "creation_date", "provided_name", "version"):
            settings["index"].pop(key, None)

        # Delete and recreate exactly once. The settings must be passed to `create`,
        # otherwise the recreated index would silently lose them.
        self._client.indices.delete(index=index_name)  # type: ignore
        self._client.indices.create(index=index_name, settings=settings, mappings=mappings)  # type: ignore

    else:
        result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))  # type: ignore
        logger.info(
            "Deleted all the {n_docs} documents from the index '{index}'.",
            index=self._index,
            n_docs=result["deleted"],
        )
633+
634+
async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
    """
    Asynchronously removes every document from the document store.

    Clears the store quickly while keeping the index settings and mappings in place.

    :param recreate_index: When True, the index is dropped and created again with the mappings and settings
        it had before. When False, the documents are removed through the `delete_by_query` API.
    :raises DocumentStoreError: If any of the Elasticsearch calls made here raises an exception.
    """
    self._ensure_initialized()  # ensures _async_client is not None

    try:
        if not recreate_index:
            # One delete-by-query request, without recreating the index.
            # The request is forced to wait for completion so the response carries the deleted count.
            request = self._prepare_delete_all_request(is_async=True)
            request["wait_for_completion"] = True
            response = await self._async_client.delete_by_query(**request)  # type: ignore
            logger.info(
                "Deleted all the {n_docs} documents from the index '{index}'.",
                index=self._index,
                n_docs=response["deleted"],
            )
            return

        # Read the current mappings and settings before the index goes away.
        target = self._index
        described = await self._async_client.indices.get(index=target)  # type: ignore
        index_mappings = described[target]["mappings"]
        index_settings = described[target]["settings"]

        # These settings cannot be supplied when creating an index, so strip them.
        for read_only_key in ("uuid", "creation_date", "provided_name", "version"):
            index_settings["index"].pop(read_only_key, None)

        # Drop the index, then bring it back with the captured settings and mappings.
        await self._async_client.indices.delete(index=target)  # type: ignore
        await self._async_client.indices.create(  # type: ignore
            index=target, settings=index_settings, mappings=index_mappings
        )

    except Exception as e:
        msg = f"Failed to delete all documents from Elasticsearch: {e!s}"
        raise DocumentStoreError(msg) from e
679+
578680
def _bm25_retrieval(
579681
self,
580682
query: str,

0 commit comments

Comments
 (0)