Merged
9 changes: 4 additions & 5 deletions backend/agents/create_agent_info.py
@@ -314,13 +314,12 @@ async def join_minio_file_description_to_query(minio_files, query):
     if minio_files and isinstance(minio_files, list):
         file_descriptions = []
         for file in minio_files:
-            if isinstance(file, dict) and "description" in file and file["description"]:
-                file_descriptions.append(file["description"])
-
+            if isinstance(file, dict) and "url" in file and file["url"] and "name" in file and file["name"]:
+                file_descriptions.append(f"File name: {file['name']}, S3 URL: s3:/{file['url']}")
         if file_descriptions:
-            final_query = "User provided some reference files:\n"
+            final_query = "User uploaded files. The file information is as follows:\n"
             final_query += "\n".join(file_descriptions) + "\n\n"
-            final_query += f"User wants to answer questions based on the above information: {query}"
+            final_query += f"User wants to answer questions based on the information in the above files: {query}"
     return final_query

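For context, a minimal standalone sketch of the new joining behaviour introduced in this hunk. The fallback assignment final_query = query and the sample MinIO entry are assumptions for illustration only; the initialisation above line 314 is not shown in the diff.

import asyncio

# Sketch of the revised join_minio_file_description_to_query logic (standalone,
# not the project's module). The fallback below is assumed; the sample entry is
# hypothetical.
async def join_minio_file_description_to_query(minio_files, query):
    final_query = query  # assumed fallback when no usable file entries exist
    if minio_files and isinstance(minio_files, list):
        file_descriptions = []
        for file in minio_files:
            # New behaviour: describe each file by name and S3 URL instead of a stored description
            if isinstance(file, dict) and "url" in file and file["url"] and "name" in file and file["name"]:
                file_descriptions.append(f"File name: {file['name']}, S3 URL: s3:/{file['url']}")
        if file_descriptions:
            final_query = "User uploaded files. The file information is as follows:\n"
            final_query += "\n".join(file_descriptions) + "\n\n"
            final_query += f"User wants to answer questions based on the information in the above files: {query}"
    return final_query

if __name__ == "__main__":
    sample = [{"name": "report.pdf", "url": "/attachments/report.pdf"}]
    print(asyncio.run(join_minio_file_description_to_query(sample, "Summarize the report")))

Because the stored url in these entries apparently begins with a slash, the single slash in s3:/{file['url']} renders as a full s3:// URI (s3://attachments/report.pdf in the sketch above).
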
65 changes: 2 additions & 63 deletions backend/apps/file_management_app.py
@@ -1,16 +1,13 @@
 import logging
 import os
 from http import HTTPStatus
 from typing import List, Optional

-from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, Request, UploadFile
+from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, UploadFile
 from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse

 from consts.model import ProcessParams
 from services.file_management_service import upload_to_minio, upload_files_impl, \
-    get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl, \
-    preprocess_files_generator
-from utils.auth_utils import get_current_user_info
+    get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl
 from utils.file_management_utils import trigger_data_process

 logger = logging.getLogger("file_management_app")
@@ -271,61 +268,3 @@ async def get_storage_file_batch_urls(
"failed_count": sum(1 for r in results if not r.get("success", False)),
"results": results
}


@file_management_runtime_router.post("/preprocess")
async def agent_preprocess_api(
request: Request, query: str = Form(...),
files: List[UploadFile] = File(...),
authorization: Optional[str] = Header(None)
):
"""
Preprocess uploaded files and return streaming response
"""
try:
# Pre-read and cache all file contents
user_id, tenant_id, language = get_current_user_info(
authorization, request)
file_cache = []
for file in files:
try:
content = await file.read()
file_cache.append({
"filename": file.filename or "",
"content": content,
"ext": os.path.splitext(file.filename or "")[1].lower()
})
except Exception as e:
file_cache.append({
"filename": file.filename or "",
"error": str(e)
})

# Generate unique task ID for this preprocess operation
import uuid
task_id = str(uuid.uuid4())
conversation_id = request.query_params.get("conversation_id")
if conversation_id:
conversation_id = int(conversation_id)
else:
conversation_id = -1 # Default for cases without conversation_id

# Call service layer to generate streaming response
return StreamingResponse(
preprocess_files_generator(
query=query,
file_cache=file_cache,
tenant_id=tenant_id,
language=language,
task_id=task_id,
conversation_id=conversation_id
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"File preprocessing error: {str(e)}")
231 changes: 2 additions & 229 deletions backend/services/file_management_service.py
@@ -1,16 +1,13 @@
 import asyncio
 import json
 import logging
 import os
 from io import BytesIO
 from pathlib import Path
-from typing import List, Optional, AsyncGenerator
+from typing import List, Optional

 import httpx
 from fastapi import UploadFile

-from agents.preprocess_manager import preprocess_manager
-from consts.const import UPLOAD_FOLDER, MAX_CONCURRENT_UPLOADS, DATA_PROCESS_SERVICE, LANGUAGE, MODEL_CONFIG_MAPPING
+from consts.const import UPLOAD_FOLDER, MAX_CONCURRENT_UPLOADS, MODEL_CONFIG_MAPPING
 from database.attachment_db import (
     upload_fileobj,
     get_file_url,
@@ -20,9 +17,7 @@
     list_files
 )
 from services.vectordatabase_service import ElasticSearchService, get_vector_db_core
-from utils.attachment_utils import convert_image_to_text, convert_long_text_to_text
 from utils.config_utils import tenant_config_manager, get_model_name_from_config
-from utils.prompt_template_utils import get_file_processing_messages_template
 from utils.file_management_utils import save_upload_file

 from nexent import MessageObserver
@@ -188,228 +183,6 @@ async def list_files_impl(prefix: str, limit: Optional[int] = None):
     return files


-def get_parsing_file_data(index: int, total_files: int, filename: str) -> dict:
-    """
-    Get structured data for parsing file message
-
-    Args:
-        index: Current file index (0-based)
-        total_files: Total number of files
-        filename: Name of the file being parsed
-
-    Returns:
-        dict: Structured data with parameters for internationalization
-    """
-    return {
-        "params": {
-            "index": index + 1,
-            "total": total_files,
-            "filename": filename
-        }
-    }
-
-
-def get_truncation_data(filename: str, truncation_percentage: int) -> dict:
-    """
-    Get structured data for truncation message
-
-    Args:
-        filename: Name of the file being truncated
-        truncation_percentage: Percentage of content that was read
-
-    Returns:
-        dict: Structured data with parameters for internationalization
-    """
-    return {
-        "params": {
-            "filename": filename,
-            "percentage": truncation_percentage
-        }
-    }
-
-
-async def preprocess_files_generator(
-        query: str,
-        file_cache: List[dict],
-        tenant_id: str,
-        language: str,
-        task_id: str,
-        conversation_id: int
-) -> AsyncGenerator[str, None]:
-    """
-    Generate streaming response for file preprocessing
-
-    Args:
-        query: User query string
-        file_cache: List of cached file data
-        tenant_id: Tenant ID
-        language: Language preference
-        task_id: Unique task ID
-        conversation_id: Conversation ID
-
-    Yields:
-        str: JSON formatted streaming messages
-    """
-    file_descriptions = []
-    total_files = len(file_cache)
-
-    # Create and register the preprocess task
-    task = asyncio.current_task()
-    if task:
-        preprocess_manager.register_preprocess_task(
-            task_id, conversation_id, task)
-
-    try:
-        for index, file_data in enumerate(file_cache):
-            if task and task.done():
-                logger.info(f"Preprocess task {task_id} was cancelled")
-                break
-
-            progress = int((index / total_files) * 100)
-            progress_message = json.dumps({
-                "type": "progress",
-                "progress": progress,
-                "message_data": get_parsing_file_data(index, total_files, file_data['filename'])
-            }, ensure_ascii=False)
-            yield f"data: {progress_message}\n\n"
-            await asyncio.sleep(0.1)
-
-            try:
-                # Check if file already has an error
-                if "error" in file_data:
-                    raise Exception(file_data["error"])
-
-                if file_data["ext"] in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']:
-                    description = await process_image_file(query, file_data["filename"], file_data["content"], tenant_id, language)
-                    truncation_percentage = None
-                else:
-                    description, truncation_percentage = await process_text_file(query, file_data["filename"], file_data["content"], tenant_id, language)
-                file_descriptions.append(description)
-
-                # Send processing result for each file
-                file_message_data = {
-                    "type": "file_processed",
-                    "filename": file_data["filename"],
-                    "description": description
-                }
-                file_message = json.dumps(
-                    file_message_data, ensure_ascii=False)
-                yield f"data: {file_message}\n\n"
-                await asyncio.sleep(0.1)
-
-                # Send truncation notice immediately if file was truncated
-                if truncation_percentage is not None and int(truncation_percentage) < 100:
-                    if int(truncation_percentage) == 0:
-                        truncation_percentage = "< 1"
-
-                    truncation_message = json.dumps({
-                        "type": "truncation",
-                        "message_data": get_truncation_data(file_data['filename'], truncation_percentage)
-                    }, ensure_ascii=False)
-                    yield f"data: {truncation_message}\n\n"
-                    await asyncio.sleep(0.1)
-            except Exception as e:
-                error_description = f"Error parsing file {file_data['filename']}: {str(e)}"
-                logger.exception(error_description)
-                file_descriptions.append(error_description)
-                error_message = json.dumps({
-                    "type": "error",
-                    "filename": file_data["filename"],
-                    "message": error_description
-                }, ensure_ascii=False)
-                yield f"data: {error_message}\n\n"
-                await asyncio.sleep(0.1)
-
-        # Send completion message
-        complete_message = json.dumps({
-            "type": "complete",
-            "progress": 100,
-            "final_query": query
-        }, ensure_ascii=False)
-        yield f"data: {complete_message}\n\n"
-    finally:
-        preprocess_manager.unregister_preprocess_task(task_id)
-
-
-async def process_image_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> str:
-    """
-    Process image file, convert to text using external API
-    """
-    # Load messages based on language
-    messages = get_file_processing_messages_template(language)
-
-    try:
-        image_stream = BytesIO(file_content)
-        text = convert_image_to_text(query, image_stream, tenant_id, language)
-        return messages["IMAGE_CONTENT_SUCCESS"].format(filename=filename, content=text)
-    except Exception as e:
-        return messages["IMAGE_CONTENT_ERROR"].format(filename=filename, error=str(e))
-
-
-async def process_text_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> tuple[str, Optional[str]]:
-    """
-    Process text file, convert to text using external API
-    """
-    # Load messages based on language
-    messages = get_file_processing_messages_template(language)
-
-    # file_content is byte data, need to send to API through file upload
-    data_process_service_url = DATA_PROCESS_SERVICE
-    api_url = f"{data_process_service_url}/tasks/process_text_file"
-    logger.info(f"Processing text file {filename} with API: {api_url}")
-
-    try:
-        # Upload byte data as a file
-        files = {
-            'file': (filename, file_content, 'application/octet-stream')
-        }
-        data = {
-            'chunking_strategy': 'basic',
-            'timeout': 60
-        }
-        async with httpx.AsyncClient() as client:
-            response = await client.post(api_url, files=files, data=data, timeout=60)
-
-        if response.status_code == 200:
-            result = response.json()
-            raw_text = result.get("text", "")
-            logger.info(
-                f"File processed successfully: {raw_text[:200]}...{raw_text[-200:]}..., length: {len(raw_text)}")
-        else:
-            error_detail = response.json().get('detail', 'unknown error') if response.headers.get(
-                'content-type', '').startswith('application/json') else response.text
-            logger.error(
-                f"File processing failed (status code: {response.status_code}): {error_detail}")
-            raise Exception(
-                messages["FILE_PROCESSING_ERROR"].format(status_code=response.status_code, error_detail=error_detail))
-
-    except Exception as e:
-        return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None
-
-    try:
-        text, truncation_percentage = convert_long_text_to_text(
-            query, raw_text, tenant_id, language)
-        return messages["FILE_CONTENT_SUCCESS"].format(filename=filename, content=text), truncation_percentage
-    except Exception as e:
-        return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None
-
-
-def get_file_description(files: List[UploadFile]) -> str:
-    """
-    Generate file description text
-    """
-    if not files:
-        return "User provided some reference files:\nNo files provided"
-
-    description = "User provided some reference files:\n"
-    for file in files:
-        ext = os.path.splitext(file.filename or "")[1].lower()
-        if ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
-            description += f"- Image file {file.filename or ''}\n"
-        else:
-            description += f"- File {file.filename or ''}\n"
-    return description
-
 def get_llm_model(tenant_id: str):
     # Get the tenant config
     main_model_config = tenant_config_manager.get_model_config(
22 changes: 21 additions & 1 deletion frontend/app/[locale]/agents/components/tool/ToolPool.tsx
@@ -3,12 +3,13 @@
 import { useState, useEffect, useMemo, useCallback, memo } from "react";
 import { useTranslation } from "react-i18next";

-import { Button, App, Tabs, Collapse } from "antd";
+import { Button, App, Tabs, Collapse, Tooltip } from "antd";
 import {
   SettingOutlined,
   LoadingOutlined,
   ApiOutlined,
   ReloadOutlined,
+  BulbOutlined,
 } from "@ant-design/icons";

 import { TOOL_SOURCE_TYPES } from "@/const/agentConfig";
@@ -643,6 +644,25 @@ function ToolPool({
         <h4 className="text-md font-medium text-gray-700">
           {t("toolPool.title")}
         </h4>
+        <Tooltip
+          title={
+            <div style={{ whiteSpace: "pre-line" }}>
+              {t("toolPool.tooltip.functionGuide")}
+            </div>
+          }
+          overlayInnerStyle={{
+            backgroundColor: "#ffffff",
+            color: "#374151",
+            border: "1px solid #e5e7eb",
+            borderRadius: "6px",
+            boxShadow: "0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06)",
+            padding: "12px",
+            maxWidth: "600px",
+            minWidth: "400px",
+          }}
+        >
+          <BulbOutlined className="ml-2 text-yellow-500" />
+        </Tooltip>
       </div>
       <div className="flex items-center gap-2">
         <Button