Skip to content

Commit 57eb992

Browse files
committed
✨ Multi modal agent.
delete unuse function
1 parent 762f164 commit 57eb992

File tree

4 files changed

+2
-818
lines changed

4 files changed

+2
-818
lines changed

backend/apps/file_management_app.py

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
import logging
2-
import os
32
from http import HTTPStatus
43
from typing import List, Optional
54

6-
from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, Request, UploadFile
5+
from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, UploadFile
76
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
87

98
from consts.model import ProcessParams
109
from services.file_management_service import upload_to_minio, upload_files_impl, \
11-
get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl, \
12-
preprocess_files_generator
13-
from utils.auth_utils import get_current_user_info
10+
get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl
1411
from utils.file_management_utils import trigger_data_process
1512

1613
logger = logging.getLogger("file_management_app")
@@ -271,61 +268,3 @@ async def get_storage_file_batch_urls(
271268
"failed_count": sum(1 for r in results if not r.get("success", False)),
272269
"results": results
273270
}
274-
275-
276-
@file_management_runtime_router.post("/preprocess")
277-
async def agent_preprocess_api(
278-
request: Request, query: str = Form(...),
279-
files: List[UploadFile] = File(...),
280-
authorization: Optional[str] = Header(None)
281-
):
282-
"""
283-
Preprocess uploaded files and return streaming response
284-
"""
285-
try:
286-
# Pre-read and cache all file contents
287-
user_id, tenant_id, language = get_current_user_info(
288-
authorization, request)
289-
file_cache = []
290-
for file in files:
291-
try:
292-
content = await file.read()
293-
file_cache.append({
294-
"filename": file.filename or "",
295-
"content": content,
296-
"ext": os.path.splitext(file.filename or "")[1].lower()
297-
})
298-
except Exception as e:
299-
file_cache.append({
300-
"filename": file.filename or "",
301-
"error": str(e)
302-
})
303-
304-
# Generate unique task ID for this preprocess operation
305-
import uuid
306-
task_id = str(uuid.uuid4())
307-
conversation_id = request.query_params.get("conversation_id")
308-
if conversation_id:
309-
conversation_id = int(conversation_id)
310-
else:
311-
conversation_id = -1 # Default for cases without conversation_id
312-
313-
# Call service layer to generate streaming response
314-
return StreamingResponse(
315-
preprocess_files_generator(
316-
query=query,
317-
file_cache=file_cache,
318-
tenant_id=tenant_id,
319-
language=language,
320-
task_id=task_id,
321-
conversation_id=conversation_id
322-
),
323-
media_type="text/event-stream",
324-
headers={
325-
"Cache-Control": "no-cache",
326-
"Connection": "keep-alive"
327-
}
328-
)
329-
except Exception as e:
330-
raise HTTPException(
331-
status_code=500, detail=f"File preprocessing error: {str(e)}")

backend/services/file_management_service.py

Lines changed: 0 additions & 221 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
list_files
2121
)
2222
from services.vectordatabase_service import ElasticSearchService, get_vector_db_core
23-
from utils.attachment_utils import convert_image_to_text, convert_long_text_to_text
2423
from utils.config_utils import tenant_config_manager, get_model_name_from_config
25-
from utils.prompt_template_utils import get_file_processing_messages_template
2624
from utils.file_management_utils import save_upload_file
2725

2826
from nexent import MessageObserver
@@ -188,225 +186,6 @@ async def list_files_impl(prefix: str, limit: Optional[int] = None):
188186
return files
189187

190188

191-
def get_parsing_file_data(index: int, total_files: int, filename: str) -> dict:
192-
"""
193-
Get structured data for parsing file message
194-
195-
Args:
196-
index: Current file index (0-based)
197-
total_files: Total number of files
198-
filename: Name of the file being parsed
199-
200-
Returns:
201-
dict: Structured data with parameters for internationalization
202-
"""
203-
return {
204-
"params": {
205-
"index": index + 1,
206-
"total": total_files,
207-
"filename": filename
208-
}
209-
}
210-
211-
212-
def get_truncation_data(filename: str, truncation_percentage: int) -> dict:
213-
"""
214-
Get structured data for truncation message
215-
216-
Args:
217-
filename: Name of the file being truncated
218-
truncation_percentage: Percentage of content that was read
219-
220-
Returns:
221-
dict: Structured data with parameters for internationalization
222-
"""
223-
return {
224-
"params": {
225-
"filename": filename,
226-
"percentage": truncation_percentage
227-
}
228-
}
229-
230-
231-
async def preprocess_files_generator(
232-
query: str,
233-
file_cache: List[dict],
234-
tenant_id: str,
235-
language: str,
236-
task_id: str,
237-
conversation_id: int
238-
) -> AsyncGenerator[str, None]:
239-
"""
240-
Generate streaming response for file preprocessing
241-
242-
Args:
243-
query: User query string
244-
file_cache: List of cached file data
245-
tenant_id: Tenant ID
246-
language: Language preference
247-
task_id: Unique task ID
248-
conversation_id: Conversation ID
249-
250-
Yields:
251-
str: JSON formatted streaming messages
252-
"""
253-
file_descriptions = []
254-
total_files = len(file_cache)
255-
256-
# Create and register the preprocess task
257-
task = asyncio.current_task()
258-
if task:
259-
preprocess_manager.register_preprocess_task(
260-
task_id, conversation_id, task)
261-
262-
try:
263-
for index, file_data in enumerate(file_cache):
264-
if task and task.done():
265-
logger.info(f"Preprocess task {task_id} was cancelled")
266-
break
267-
268-
progress = int((index / total_files) * 100)
269-
progress_message = json.dumps({
270-
"type": "progress",
271-
"progress": progress,
272-
"message_data": get_parsing_file_data(index, total_files, file_data['filename'])
273-
}, ensure_ascii=False)
274-
yield f"data: {progress_message}\n\n"
275-
await asyncio.sleep(0.1)
276-
277-
try:
278-
# Check if file already has an error
279-
if "error" in file_data:
280-
raise Exception(file_data["error"])
281-
282-
description = ""
283-
truncation_percentage = None
284-
file_descriptions.append(description)
285-
286-
# Send processing result for each file
287-
file_message_data = {
288-
"type": "file_processed",
289-
"filename": file_data["filename"],
290-
"description": description
291-
}
292-
file_message = json.dumps(
293-
file_message_data, ensure_ascii=False)
294-
yield f"data: {file_message}\n\n"
295-
await asyncio.sleep(0.1)
296-
297-
# Send truncation notice immediately if file was truncated
298-
if truncation_percentage is not None and int(truncation_percentage) < 100:
299-
if int(truncation_percentage) == 0:
300-
truncation_percentage = "< 1"
301-
302-
truncation_message = json.dumps({
303-
"type": "truncation",
304-
"message_data": get_truncation_data(file_data['filename'], truncation_percentage)
305-
}, ensure_ascii=False)
306-
yield f"data: {truncation_message}\n\n"
307-
await asyncio.sleep(0.1)
308-
except Exception as e:
309-
error_description = f"Error parsing file {file_data['filename']}: {str(e)}"
310-
logger.exception(error_description)
311-
file_descriptions.append(error_description)
312-
error_message = json.dumps({
313-
"type": "error",
314-
"filename": file_data["filename"],
315-
"message": error_description
316-
}, ensure_ascii=False)
317-
yield f"data: {error_message}\n\n"
318-
await asyncio.sleep(0.1)
319-
320-
# Send completion message
321-
complete_message = json.dumps({
322-
"type": "complete",
323-
"progress": 100,
324-
"final_query": query
325-
}, ensure_ascii=False)
326-
yield f"data: {complete_message}\n\n"
327-
finally:
328-
preprocess_manager.unregister_preprocess_task(task_id)
329-
330-
331-
async def process_image_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> str:
332-
"""
333-
Process image file, convert to text using external API
334-
"""
335-
# Load messages based on language
336-
messages = get_file_processing_messages_template(language)
337-
338-
try:
339-
image_stream = BytesIO(file_content)
340-
text = convert_image_to_text(query, image_stream, tenant_id, language)
341-
return messages["IMAGE_CONTENT_SUCCESS"].format(filename=filename, content=text)
342-
except Exception as e:
343-
return messages["IMAGE_CONTENT_ERROR"].format(filename=filename, error=str(e))
344-
345-
346-
async def process_text_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> tuple[str, Optional[str]]:
347-
"""
348-
Process text file, convert to text using external API
349-
"""
350-
# Load messages based on language
351-
messages = get_file_processing_messages_template(language)
352-
353-
# file_content is byte data, need to send to API through file upload
354-
data_process_service_url = DATA_PROCESS_SERVICE
355-
api_url = f"{data_process_service_url}/tasks/process_text_file"
356-
logger.info(f"Processing text file {filename} with API: {api_url}")
357-
358-
try:
359-
# Upload byte data as a file
360-
files = {
361-
'file': (filename, file_content, 'application/octet-stream')
362-
}
363-
data = {
364-
'chunking_strategy': 'basic',
365-
'timeout': 60
366-
}
367-
async with httpx.AsyncClient() as client:
368-
response = await client.post(api_url, files=files, data=data, timeout=60)
369-
370-
if response.status_code == 200:
371-
result = response.json()
372-
raw_text = result.get("text", "")
373-
logger.info(
374-
f"File processed successfully: {raw_text[:200]}...{raw_text[-200:]}..., length: {len(raw_text)}")
375-
else:
376-
error_detail = response.json().get('detail', 'unknown error') if response.headers.get(
377-
'content-type', '').startswith('application/json') else response.text
378-
logger.error(
379-
f"File processing failed (status code: {response.status_code}): {error_detail}")
380-
raise Exception(
381-
messages["FILE_PROCESSING_ERROR"].format(status_code=response.status_code, error_detail=error_detail))
382-
383-
except Exception as e:
384-
return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None
385-
386-
try:
387-
text, truncation_percentage = convert_long_text_to_text(
388-
query, raw_text, tenant_id, language)
389-
return messages["FILE_CONTENT_SUCCESS"].format(filename=filename, content=text), truncation_percentage
390-
except Exception as e:
391-
return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None
392-
393-
394-
def get_file_description(files: List[UploadFile]) -> str:
395-
"""
396-
Generate file description text
397-
"""
398-
if not files:
399-
return "User provided some reference files:\nNo files provided"
400-
401-
description = "User provided some reference files:\n"
402-
for file in files:
403-
ext = os.path.splitext(file.filename or "")[1].lower()
404-
if ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
405-
description += f"- Image file {file.filename or ''}\n"
406-
else:
407-
description += f"- File {file.filename or ''}\n"
408-
return description
409-
410189
def get_llm_model(tenant_id: str):
411190
# Get the tenant config
412191
main_model_config = tenant_config_manager.get_model_config(

test/backend/app/test_file_management_app.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -357,66 +357,3 @@ def fake_get(object_name, expires):
357357
assert any(item["object_name"] == "bad" and item["success"] is False for item in out["results"])
358358

359359

360-
@pytest.mark.asyncio
361-
async def test_agent_preprocess_api_success(monkeypatch):
362-
# Patch get_current_user_info
363-
monkeypatch.setattr(file_management_app, "get_current_user_info", lambda a, r: ("u", "t", "en"))
364-
365-
# Provide an async generator object for streaming
366-
async def gen():
367-
yield "data: {\\\"type\\\": \\\"complete\\\"}\n\n"
368-
369-
monkeypatch.setattr(file_management_app, "preprocess_files_generator", lambda **_: gen())
370-
371-
# Mock files
372-
f1 = make_upload_file("a.txt", b"hello")
373-
f2 = make_upload_file("b.jpg", b"img")
374-
375-
# Minimal ASGI scope for Request
376-
from starlette.requests import Request
377-
scope = {
378-
"type": "http",
379-
"method": "POST",
380-
"path": "/file/preprocess",
381-
"headers": [],
382-
"query_string": b"conversation_id=42",
383-
}
384-
req = Request(scope)
385-
386-
resp = await file_management_app.agent_preprocess_api(
387-
request=req, query="q", files=[f1, f2], authorization="Bearer x"
388-
)
389-
assert resp.media_type == "text/event-stream"
390-
assert resp.headers.get("Cache-Control") == "no-cache"
391-
# Consume a small portion of stream
392-
chunks = []
393-
async for part in resp.body_iterator: # type: ignore[attr-defined]
394-
chunks.append(part)
395-
break
396-
assert chunks
397-
398-
399-
@pytest.mark.asyncio
400-
async def test_agent_preprocess_api_error_from_auth(monkeypatch):
401-
def boom_auth(a, r):
402-
raise RuntimeError("auth failed")
403-
404-
monkeypatch.setattr(file_management_app, "get_current_user_info", boom_auth)
405-
406-
from starlette.requests import Request
407-
scope = {
408-
"type": "http",
409-
"method": "POST",
410-
"path": "/file/preprocess",
411-
"headers": [],
412-
"query_string": b"",
413-
}
414-
req = Request(scope)
415-
416-
with pytest.raises(Exception) as ei:
417-
await file_management_app.agent_preprocess_api(
418-
request=req, query="q", files=[make_upload_file("a.txt")], authorization=None
419-
)
420-
assert "File preprocessing error" in str(ei.value)
421-
422-

0 commit comments

Comments
 (0)