forked from techwithtim/ProductionGradeRAGPythonApp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
103 lines (88 loc) · 3.57 KB
/
main.py
File metadata and controls
103 lines (88 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import logging
from fastapi import FastAPI
import inngest
import inngest.fast_api
from inngest.experimental import ai
from dotenv import load_dotenv
import uuid
import os
import datetime
from data_loader import load_and_chunk_pdf, embed_texts
from vector_db import QdrantStorage
from custom_types import RAQQueryResult, RAGSearchResult, RAGUpsertResult, RAGChunkAndSrc
# Load environment variables (e.g. OPENAI_API_KEY, Qdrant settings) from a .env file.
load_dotenv()

# Inngest client that registers and serves the RAG functions defined below.
inngest_client = inngest.Inngest(
    app_id="rag_app",
    logger=logging.getLogger("uvicorn"),  # reuse uvicorn's logger so function logs show up in server output
    is_production=False,  # dev mode — NOTE(review): presumably should be env-driven for production; confirm
    serializer=inngest.PydanticSerializer()  # allows steps to pass/return the Pydantic RAG* models
)
@inngest_client.create_function(
    fn_id="RAG: Ingest PDF",
    trigger=inngest.TriggerEvent(event="rag/ingest_pdf"),
    # Global concurrency guard: at most 2 ingest runs per minute.
    throttle=inngest.Throttle(
        count=2, period=datetime.timedelta(minutes=1)
    ),
    # Dedupe guard: a given source_id is ingested at most once every 4 hours.
    rate_limit=inngest.RateLimit(
        limit=1,
        period=datetime.timedelta(hours=4),
        key="event.data.source_id",
    ),
)
async def rag_ingest_pdf(ctx: inngest.Context):
    """Ingest a PDF into the Qdrant vector store.

    Expects ``ctx.event.data`` to contain:
      - ``"pdf_path"``: path of the PDF to load and chunk (required).
      - ``"source_id"``: stable identifier for the document; defaults to
        ``pdf_path`` when absent.

    Returns:
        dict: ``RAGUpsertResult`` dumped to a plain dict, e.g.
        ``{"ingested": <number of chunks upserted>}``.
    """

    def _load(ctx: inngest.Context) -> RAGChunkAndSrc:
        # Step 1: read and chunk the PDF (durable Inngest step below).
        pdf_path = ctx.event.data["pdf_path"]
        source_id = ctx.event.data.get("source_id", pdf_path)
        chunks = load_and_chunk_pdf(pdf_path)
        return RAGChunkAndSrc(chunks=chunks, source_id=source_id)

    def _upsert(chunks_and_src: RAGChunkAndSrc) -> RAGUpsertResult:
        # Step 2: embed every chunk and upsert into Qdrant.
        chunks = chunks_and_src.chunks
        source_id = chunks_and_src.source_id
        vecs = embed_texts(chunks)
        # uuid5 of "source_id:index" is deterministic, so re-ingesting the
        # same source overwrites existing points instead of duplicating them.
        ids = [
            str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source_id}:{i}"))
            for i in range(len(chunks))
        ]
        payloads = [{"source": source_id, "text": chunk} for chunk in chunks]
        QdrantStorage().upsert(ids, vecs, payloads)
        return RAGUpsertResult(ingested=len(chunks))

    chunks_and_src = await ctx.step.run(
        "load-and-chunk", lambda: _load(ctx), output_type=RAGChunkAndSrc
    )
    ingested = await ctx.step.run(
        "embed-and-upsert", lambda: _upsert(chunks_and_src), output_type=RAGUpsertResult
    )
    return ingested.model_dump()
@inngest_client.create_function(
    fn_id="RAG: Query PDF",
    trigger=inngest.TriggerEvent(event="rag/query_pdf_ai")
)
async def rag_query_pdf_ai(ctx: inngest.Context):
    """Answer a question with retrieval-augmented generation.

    Expects ``ctx.event.data`` to contain:
      - ``"question"``: the user's question (required).
      - ``"top_k"``: number of context chunks to retrieve (default 5).

    Returns:
        dict with keys ``"answer"`` (model output), ``"sources"`` (matched
        document sources), and ``"num_contexts"`` (chunks used).
    """

    def _search(question: str, top_k: int = 5) -> RAGSearchResult:
        # Embed the question and fetch the top_k nearest chunks from Qdrant.
        query_vec = embed_texts([question])[0]
        store = QdrantStorage()
        found = store.search(query_vec, top_k)
        return RAGSearchResult(contexts=found["contexts"], sources=found["sources"])

    question = ctx.event.data["question"]
    top_k = int(ctx.event.data.get("top_k", 5))
    found = await ctx.step.run(
        "embed-and-search", lambda: _search(question, top_k), output_type=RAGSearchResult
    )

    # Flatten the retrieved chunks into a bulleted context section of the prompt.
    context_block = "\n\n".join(f"- {c}" for c in found.contexts)
    user_content = (
        "Use the following context to answer the question.\n\n"
        f"Context:\n{context_block}\n\n"
        f"Question: {question}\n"
        "Answer concisely using the context above."
    )

    adapter = ai.openai.Adapter(
        auth_key=os.getenv("OPENAI_API_KEY"),
        # Model is overridable via OPENAI_MODEL; the default preserves the
        # previous hard-coded behavior.
        model=os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    )
    # step.ai.infer runs the LLM call as a durable, observable Inngest step.
    res = await ctx.step.ai.infer(
        "llm-answer",
        adapter=adapter,
        body={
            "max_tokens": 1024,
            "temperature": 0.2,  # low temperature: keep answers grounded in the context
            "messages": [
                {"role": "system", "content": "You answer questions using only the provided context."},
                {"role": "user", "content": user_content}
            ]
        }
    )
    answer = res["choices"][0]["message"]["content"].strip()
    return {"answer": answer, "sources": found.sources, "num_contexts": len(found.contexts)}
# FastAPI app exposing the Inngest HTTP endpoint; both RAG functions are
# registered with the Inngest server via this route.
app = FastAPI()
inngest.fast_api.serve(app, inngest_client, [rag_ingest_pdf, rag_query_pdf_ai])