debug_ingest.py
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from llama_index.core import (
Settings,
SimpleDirectoryReader,
StorageContext,
VectorStoreIndex,
)
from llama_index.core.node_parser import SemanticSplitterNodeParser, SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import AsyncQdrantClient, QdrantClient
from utils.config_manager import ConfigManager
# Root logger at INFO so ingestion progress from llama-index/qdrant is visible.
logging.basicConfig(level=logging.INFO)
# Source directory whose files get ingested.
# NOTE(review): hard-coded Windows path — parameterize before reuse.
documents_dir = Path("E:\\Downloads")
project_root = Path(__file__).parent
# Make project-local modules importable regardless of the current working dir.
# NOTE(review): this runs AFTER `from utils.config_manager import ConfigManager`
# above, so that import only succeeds when the CWD already contains `utils/` —
# consider moving this insert before the local import.
sys.path.insert(0, str(project_root))
env_file = project_root / ".env"
if env_file.exists():
    # Load API keys etc. from the project-root .env file if one exists.
    from dotenv import load_dotenv

    load_dotenv(env_file)
# Central YAML-backed configuration shared by every helper below.
config_path = project_root / "settings.yaml"
config_manager = ConfigManager(str(config_path))
# Populated by _load_index() / ingest_documents(); None until an index exists.
index: Optional[VectorStoreIndex]
collection_name = config_manager.qdrant_config.collection_name or "rag_documents"
def _init_qdrant_clients() -> tuple[QdrantClient, AsyncQdrantClient]:
    """Build a (sync, async) pair of Qdrant clients from the shared config."""
    qdrant_cfg = config_manager.qdrant_config
    connection_kwargs = {
        "url": qdrant_cfg.url,
        "api_key": qdrant_cfg.api_key,
        "prefer_grpc": qdrant_cfg.prefer_grpc,
    }
    sync_client = QdrantClient(**connection_kwargs)
    async_client = AsyncQdrantClient(**connection_kwargs)
    return sync_client, async_client
def _configure_settings() -> None:
    """Populate llama-index's global ``Settings`` (embeddings, LLM, chunking).

    Reads model and chunking parameters from the module-level config manager
    and installs them as process-wide defaults.
    """
    llm_cfg = config_manager.llm_config
    embed_cfg = config_manager.embedding_config
    ingest_cfg = config_manager.ingestion_config

    Settings.embed_model = OpenAIEmbedding(
        model=embed_cfg.model,
        api_key=embed_cfg.api_key,
    )
    Settings.llm = LlamaOpenAI(
        model=llm_cfg.model,
        api_key=llm_cfg.api_key,
        temperature=llm_cfg.temperature,
        max_tokens=llm_cfg.max_tokens,
        timeout=llm_cfg.request_timeout,
    )
    Settings.chunk_size = ingest_cfg.chunk_size
    # Default parser: fixed-size sentence-aware splitting with overlap.
    Settings.node_parser = SentenceSplitter(
        chunk_size=ingest_cfg.chunk_size,
        chunk_overlap=ingest_cfg.chunk_overlap,
    )
    print("Settings loaded")
def _persist_files(files: Sequence[Any]) -> List[str]:
    """Map uploaded file entries to their destination paths under ``documents_dir``.

    Accepts plain path strings, ``Path`` objects, or upload wrappers exposing a
    ``.name`` attribute; any other entry type is skipped.

    NOTE(review): despite the name, no bytes are copied — this only computes
    `documents_dir / <filename>` for each entry. Confirm whether an actual
    copy to `documents_dir` is intended.

    Returns:
        The destination path strings, in input order, for the usable entries.
    """
    saved_paths: List[str] = []
    for file in files:
        if isinstance(file, (str, Path)):
            path = Path(file)
        elif hasattr(file, "name"):
            path = Path(file.name)
        else:
            # BUG FIX: the original left `path` as None here and then crashed
            # with AttributeError on `path.name`; skip unsupported entries.
            continue
        saved_paths.append(str(documents_dir / path.name))
    return saved_paths
def _load_index() -> Optional[VectorStoreIndex]:
    """Return an index backed by the existing Qdrant collection, or ``None``.

    ``None`` means either the collection does not exist yet or Qdrant could
    not be reached (the error is printed, not raised).
    """
    try:
        if not qdrant_client.collection_exists(collection_name):
            return None
        return VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            storage_context=storage_context,
        )
    except Exception as exc:
        print(f"Failed to load index from Qdrant: {exc}")
        return None
# --- Module-level bootstrap: runs on import ---
_configure_settings()
qdrant_client, qdrant_aclient = _init_qdrant_clients()
# Hybrid store: dense embeddings plus sparse BM25 (fastembed "Qdrant/bm25").
vector_store = QdrantVectorStore(
    collection_name=collection_name,
    client=qdrant_client,
    aclient=qdrant_aclient,
    enable_hybrid=True,
    fastembed_sparse_model="Qdrant/bm25",
    batch_size=20,
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# Reuse an existing Qdrant collection if present; otherwise `index` stays None
# until ingest_documents() builds it.
index = _load_index()
# File names to ingest; resolved against `documents_dir` by _persist_files().
files = [
    "Section 4.1.5 - Vocabulary - 8 November 2025 V7.0.pdf",
    # "Section 4.1.1 - Instructions to Technical Officials (ITTO) - 17 November 2025 V8.0 (1).pdf",
]
def ingest_documents(
    files: Sequence[Any],
) -> Dict[str, Any]:
    """Chunk the given files and index them into the Qdrant-backed store.

    Args:
        files: Path strings, ``Path`` objects, or upload wrappers with a
            ``.name`` attribute.

    Returns:
        A dict with a human-readable ``status`` plus ``documents_count`` and
        ``chunks_count`` (currently placeholder zeros, as in the original
        debug script).
    """
    if not files:
        return {"status": "No files provided"}
    global index
    saved_files = _persist_files(files)
    if not saved_files:
        return {"status": "Unable to read uploaded files"}
    reader = SimpleDirectoryReader(input_files=saved_files)
    documents = reader.load_data()
    if not documents:
        return {"status": "No readable text found in uploaded files"}
    # BUG FIX: these were assigned the 1-tuple ``(None,)`` (stray trailing
    # comma), which is truthy — so ``chunk_size or ...`` short-circuited to
    # the tuple and passed it to SentenceSplitter instead of falling back to
    # the configured integer values.
    chunk_size: Optional[int] = None
    chunk_overlap: Optional[int] = None
    _configure_settings()
    if config_manager.ingestion_config.dynamic_chunk:
        # Semantic chunking: split where embedding similarity drops below the
        # 90th-percentile breakpoint.
        parser = SemanticSplitterNodeParser(
            include_metadata=True,
            include_prev_next_rel=False,
            buffer_size=2,
            embed_model=Settings.embed_model,
            breakpoint_percentile_threshold=90,
        )
    else:
        # NOTE(review): this fallback reads chat_config chunk sizes while
        # _configure_settings uses ingestion_config — confirm which is intended.
        parser = Settings.node_parser or SentenceSplitter(
            chunk_size=chunk_size or config_manager.chat_config.chunk_size,
            chunk_overlap=chunk_overlap or config_manager.chat_config.chunk_overlap,
        )
    nodes = parser.get_nodes_from_documents(documents)
    if index is None:
        # First ingestion: build the index (and the Qdrant collection) fresh.
        index = VectorStoreIndex(
            nodes=nodes,
            storage_context=storage_context,
            show_progress=True,
        )
    else:
        index.insert_nodes(nodes)
    # Placeholder counters kept from the original debug script.
    documents_count = 0
    chunks_count = 0
    return {
        "status": f"Ingested {len(documents)} documents",
        "documents_count": documents_count,
        "chunks_count": chunks_count,
    }
# Kick off ingestion of the hard-coded file list when this module is executed.
# NOTE(review): consider guarding with `if __name__ == "__main__":` so the
# ingestion does not also run on import.
ingest_documents(files)