@@ -87,7 +87,7 @@ def __init__(
8787 Creates a new OpenSearchDocumentStore instance.
8888
8989 The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not
90- exists and needs to be created. If the index already exists, its current configurations will be used.
90+ exist and needs to be created. If the index already exists, its current configurations will be used.
9191
9292 For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)
9393
@@ -107,7 +107,7 @@ def __init__(
107107 :param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings)
108108 for more information. Defaults to {"index.knn": True}
109109 :param create_index: Whether to create the index if it doesn't exist. Defaults to True
110- :param http_auth: http_auth param passed to the underying connection class.
110+ :param http_auth: http_auth param passed to the underlying connection class.
111111 For basic authentication with default connection class `Urllib3HttpConnection` this can be
112112 - a tuple of (username, password)
113113 - a list of [username, password]
@@ -319,7 +319,8 @@ async def count_documents_async(self) -> int:
319319 assert self ._async_client is not None
320320 return (await self ._async_client .count (index = self ._index ))["count" ]
321321
322- def _deserialize_search_hits (self , hits : List [Dict [str , Any ]]) -> List [Document ]:
322+ @staticmethod
323+ def _deserialize_search_hits (hits : List [Dict [str , Any ]]) -> List [Document ]:
323324 out = []
324325 for hit in hits :
325326 data = hit ["_source" ]
@@ -344,12 +345,12 @@ def _prepare_filter_search_request(self, filters: Optional[Dict[str, Any]]) -> D
344345 def _search_documents (self , request_body : Dict [str , Any ]) -> List [Document ]:
345346 assert self ._client is not None
346347 search_results = self ._client .search (index = self ._index , body = request_body )
347- return self ._deserialize_search_hits (search_results ["hits" ]["hits" ])
348+ return OpenSearchDocumentStore ._deserialize_search_hits (search_results ["hits" ]["hits" ])
348349
349350 async def _search_documents_async (self , request_body : Dict [str , Any ]) -> List [Document ]:
350351 assert self ._async_client is not None
351352 search_results = await self ._async_client .search (index = self ._index , body = request_body )
352- return self ._deserialize_search_hits (search_results ["hits" ]["hits" ])
353+ return OpenSearchDocumentStore ._deserialize_search_hits (search_results ["hits" ]["hits" ])
353354
354355 def filter_documents (self , filters : Optional [Dict [str , Any ]] = None ) -> List [Document ]:
355356 """
@@ -418,7 +419,8 @@ def _prepare_bulk_write_request(
418419 "stats_only" : False ,
419420 }
420421
421- def _process_bulk_write_errors (self , errors : List [Dict [str , Any ]], policy : DuplicatePolicy ) -> None :
422+ @staticmethod
423+ def _process_bulk_write_errors (errors : List [Dict [str , Any ]], policy : DuplicatePolicy ) -> None :
422424 if len (errors ) == 0 :
423425 return
424426
@@ -461,7 +463,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
461463
462464 bulk_params = self ._prepare_bulk_write_request (documents = documents , policy = policy , is_async = False )
463465 documents_written , errors = bulk (** bulk_params )
464- self ._process_bulk_write_errors (errors , policy )
466+ OpenSearchDocumentStore ._process_bulk_write_errors (errors , policy )
465467 return documents_written
466468
467469 async def write_documents_async (
@@ -478,10 +480,11 @@ async def write_documents_async(
478480 bulk_params = self ._prepare_bulk_write_request (documents = documents , policy = policy , is_async = True )
479481 documents_written , errors = await async_bulk (** bulk_params )
480482 # since we call async_bulk with stats_only=False, errors is guaranteed to be a list (not int)
481- self ._process_bulk_write_errors (errors = errors , policy = policy ) # type: ignore[arg-type]
483+ OpenSearchDocumentStore ._process_bulk_write_errors (errors = errors , policy = policy ) # type: ignore[arg-type]
482484 return documents_written
483485
484- def _deserialize_document (self , hit : Dict [str , Any ]) -> Document :
486+ @staticmethod
487+ def _deserialize_document (hit : Dict [str , Any ]) -> Document :
485488 """
486489 Creates a Document from the search hit provided.
487490 This is mostly useful in self.filter_documents().
@@ -525,6 +528,86 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
525528
526529 await async_bulk (** self ._prepare_bulk_delete_request (document_ids = document_ids , is_async = True ))
527530
531+ def _prepare_delete_all_request (self , * , is_async : bool ) -> Dict [str , Any ]:
532+ return {
533+ "index" : self ._index ,
534+ "body" : {"query" : {"match_all" : {}}}, # Delete all documents
535+ "wait_for_completion" : False if is_async else True , # block until done (set False for async)
536+ }
537+
def delete_all_documents(self, recreate_index: bool = False) -> None:  # noqa: FBT002, FBT001
    """
    Deletes all documents in the document store.

    :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
        settings. If False, all documents will be deleted using the `delete_by_query` API.
    :raises DocumentStoreError: If reading the index configuration, deleting, recreating the index or the
        delete-by-query request fails.
    """
    self._ensure_initialized()
    assert self._client is not None

    try:
        if recreate_index:
            # Read the index description once so mappings and settings come from the same response
            # (and only one round-trip is made).
            index_info = self._client.indices.get(self._index)[self._index]
            body = {
                "mappings": index_info["mappings"],
                "settings": index_info["settings"],
            }
            # Strip the entries that describe the existing index instance before reusing the settings
            # for the new one (presumably they are assigned by the server - confirm against OpenSearch docs).
            index_settings = body["settings"]["index"]
            for key in ("uuid", "creation_date", "provided_name", "version"):
                index_settings.pop(key, None)

            self._client.indices.delete(index=self._index)
            self._client.indices.create(index=self._index, body=body)
            logger.info(
                "The index '{index}' recreated with the original mappings and settings.",
                index=self._index,
            )
        else:
            # The sync request waits for completion, so the response carries the number of deleted documents.
            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))
            logger.info(
                "Deleted all the {n_docs} documents from the index '{index}'.",
                index=self._index,
                n_docs=result["deleted"],
            )
    except Exception as e:
        msg = f"Failed to delete all documents from OpenSearch: {e!s}"
        raise DocumentStoreError(msg) from e
577+
async def delete_all_documents_async(self, recreate_index: bool = False) -> None:  # noqa: FBT002, FBT001
    """
    Asynchronously deletes all documents in the document store.

    :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
        settings. If False, all documents will be deleted using the `delete_by_query` API. In that case
        the request is sent with `wait_for_completion=False`, so this coroutine may return before the
        documents have actually been removed.
    :raises DocumentStoreError: If reading the index configuration, deleting, recreating the index or the
        delete-by-query request fails.
    """
    self._ensure_initialized()
    assert self._async_client is not None

    try:
        if recreate_index:
            # Read the index description once and reuse it for both mappings and settings.
            index_info = (await self._async_client.indices.get(self._index))[self._index]
            body = {
                "mappings": index_info["mappings"],
                "settings": index_info["settings"],
            }
            # Strip the entries that describe the existing index instance before reusing the settings
            # for the new one (presumably they are assigned by the server - confirm against OpenSearch docs).
            index_settings = body["settings"]["index"]
            for key in ("uuid", "creation_date", "provided_name", "version"):
                index_settings.pop(key, None)

            await self._async_client.indices.delete(index=self._index)
            await self._async_client.indices.create(index=self._index, body=body)
            # Same success message as the synchronous variant.
            logger.info(
                "The index '{index}' recreated with the original mappings and settings.",
                index=self._index,
            )
        else:
            await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True))
            # No document count is logged here: the request does not wait for completion.
            logger.info(
                "Submitted the deletion of all documents from the index '{index}'.",
                index=self._index,
            )
    except Exception as e:
        msg = f"Failed to delete all documents from OpenSearch: {e!s}"
        raise DocumentStoreError(msg) from e
610+
528611 def _prepare_bm25_search_request (
529612 self ,
530613 * ,
@@ -580,7 +663,8 @@ def _prepare_bm25_search_request(
580663
581664 return body
582665
583- def _postprocess_bm25_search_results (self , * , results : List [Document ], scale_score : bool ) -> None :
666+ @staticmethod
667+ def _postprocess_bm25_search_results (* , results : List [Document ], scale_score : bool ) -> None :
584668 if not scale_score :
585669 return
586670
@@ -624,7 +708,7 @@ def _bm25_retrieval(
624708 custom_query = custom_query ,
625709 )
626710 documents = self ._search_documents (search_params )
627- self ._postprocess_bm25_search_results (results = documents , scale_score = scale_score )
711+ OpenSearchDocumentStore ._postprocess_bm25_search_results (results = documents , scale_score = scale_score )
628712 return documents
629713
630714 async def _bm25_retrieval_async (
@@ -663,7 +747,7 @@ async def _bm25_retrieval_async(
663747 custom_query = custom_query ,
664748 )
665749 documents = await self ._search_documents_async (search_params )
666- self ._postprocess_bm25_search_results (results = documents , scale_score = scale_score )
750+ OpenSearchDocumentStore ._postprocess_bm25_search_results (results = documents , scale_score = scale_score )
667751 return documents
668752
669753 def _prepare_embedding_search_request (
0 commit comments