diff --git a/backend/agents/create_agent_info.py b/backend/agents/create_agent_info.py index a999baf48..ca801109f 100644 --- a/backend/agents/create_agent_info.py +++ b/backend/agents/create_agent_info.py @@ -314,13 +314,12 @@ async def join_minio_file_description_to_query(minio_files, query): if minio_files and isinstance(minio_files, list): file_descriptions = [] for file in minio_files: - if isinstance(file, dict) and "description" in file and file["description"]: - file_descriptions.append(file["description"]) - + if isinstance(file, dict) and "url" in file and file["url"] and "name" in file and file["name"]: + file_descriptions.append(f"File name: {file['name']}, S3 URL: s3:/{file['url']}") if file_descriptions: - final_query = "User provided some reference files:\n" + final_query = "User uploaded files. The file information is as follows:\n" final_query += "\n".join(file_descriptions) + "\n\n" - final_query += f"User wants to answer questions based on the above information: {query}" + final_query += f"User wants to answer questions based on the information in the above files: {query}" return final_query diff --git a/backend/apps/file_management_app.py b/backend/apps/file_management_app.py index 448b03a61..19e382ba1 100644 --- a/backend/apps/file_management_app.py +++ b/backend/apps/file_management_app.py @@ -1,16 +1,13 @@ import logging -import os from http import HTTPStatus from typing import List, Optional -from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, Request, UploadFile +from fastapi import APIRouter, Body, File, Form, Header, HTTPException, Path as PathParam, Query, UploadFile from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from consts.model import ProcessParams from services.file_management_service import upload_to_minio, upload_files_impl, \ - get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl, \ - preprocess_files_generator -from utils.auth_utils import get_current_user_info + get_file_url_impl, get_file_stream_impl, delete_file_impl, list_files_impl from utils.file_management_utils import trigger_data_process logger = logging.getLogger("file_management_app") @@ -271,61 +268,3 @@ async def get_storage_file_batch_urls( "failed_count": sum(1 for r in results if not r.get("success", False)), "results": results } - - -@file_management_runtime_router.post("/preprocess") -async def agent_preprocess_api( - request: Request, query: str = Form(...), - files: List[UploadFile] = File(...), - authorization: Optional[str] = Header(None) -): - """ - Preprocess uploaded files and return streaming response - """ - try: - # Pre-read and cache all file contents - user_id, tenant_id, language = get_current_user_info( - authorization, request) - file_cache = [] - for file in files: - try: - content = await file.read() - file_cache.append({ - "filename": file.filename or "", - "content": content, - "ext": os.path.splitext(file.filename or "")[1].lower() - }) - except Exception as e: - file_cache.append({ - "filename": file.filename or "", - "error": str(e) - }) - - # Generate unique task ID for this preprocess operation - import uuid - task_id = str(uuid.uuid4()) - conversation_id = request.query_params.get("conversation_id") - if conversation_id: - conversation_id = int(conversation_id) - else: - conversation_id = -1 # Default for cases without conversation_id - - # Call service layer to generate streaming response - return StreamingResponse( - preprocess_files_generator( - query=query, - file_cache=file_cache, - tenant_id=tenant_id, - language=language, - task_id=task_id, - conversation_id=conversation_id - ), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive" - } - ) - except Exception as e: - raise HTTPException( - status_code=500, detail=f"File preprocessing error: {str(e)}") diff --git a/backend/services/file_management_service.py b/backend/services/file_management_service.py index 3e8fb9f3c..1461c1b56 100644 --- a/backend/services/file_management_service.py +++ b/backend/services/file_management_service.py @@ -1,16 +1,13 @@ import asyncio -import json import logging import os from io import BytesIO from pathlib import Path -from typing import List, Optional, AsyncGenerator +from typing import List, Optional -import httpx from fastapi import UploadFile -from agents.preprocess_manager import preprocess_manager -from consts.const import UPLOAD_FOLDER, MAX_CONCURRENT_UPLOADS, DATA_PROCESS_SERVICE, LANGUAGE, MODEL_CONFIG_MAPPING +from consts.const import UPLOAD_FOLDER, MAX_CONCURRENT_UPLOADS, MODEL_CONFIG_MAPPING from database.attachment_db import ( upload_fileobj, get_file_url, @@ -20,9 +17,7 @@ list_files ) from services.vectordatabase_service import ElasticSearchService, get_vector_db_core -from utils.attachment_utils import convert_image_to_text, convert_long_text_to_text from utils.config_utils import tenant_config_manager, get_model_name_from_config -from utils.prompt_template_utils import get_file_processing_messages_template from utils.file_management_utils import save_upload_file from nexent import MessageObserver @@ -188,228 +183,6 @@ async def list_files_impl(prefix: str, limit: Optional[int] = None): return files -def get_parsing_file_data(index: int, total_files: int, filename: str) -> dict: - """ - Get structured data for parsing file message - - Args: - index: Current file index (0-based) - total_files: Total number of files - filename: Name of the file being parsed - - Returns: - dict: Structured data with parameters for internationalization - """ - return { - "params": { - "index": index + 1, - "total": total_files, - "filename": filename - } - } - - -def get_truncation_data(filename: str, truncation_percentage: int) -> dict: - """ - Get structured data for truncation message - - Args: - filename: Name of the file being truncated - truncation_percentage: Percentage of content that was read - - Returns: - dict: Structured data with parameters for internationalization - """ - return { - "params": { - "filename": filename, - "percentage": truncation_percentage - } - } - - -async def preprocess_files_generator( - query: str, - file_cache: List[dict], - tenant_id: str, - language: str, - task_id: str, - conversation_id: int -) -> AsyncGenerator[str, None]: - """ - Generate streaming response for file preprocessing - - Args: - query: User query string - file_cache: List of cached file data - tenant_id: Tenant ID - language: Language preference - task_id: Unique task ID - conversation_id: Conversation ID - - Yields: - str: JSON formatted streaming messages - """ - file_descriptions = [] - total_files = len(file_cache) - - # Create and register the preprocess task - task = asyncio.current_task() - if task: - preprocess_manager.register_preprocess_task( - task_id, conversation_id, task) - - try: - for index, file_data in enumerate(file_cache): - if task and task.done(): - logger.info(f"Preprocess task {task_id} was cancelled") - break - - progress = int((index / total_files) * 100) - progress_message = json.dumps({ - "type": "progress", - "progress": progress, - "message_data": get_parsing_file_data(index, total_files, file_data['filename']) - }, ensure_ascii=False) - yield f"data: {progress_message}\n\n" - await asyncio.sleep(0.1) - - try: - # Check if file already has an error - if "error" in file_data: - raise Exception(file_data["error"]) - - if file_data["ext"] in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']: - description = await process_image_file(query, file_data["filename"], file_data["content"], tenant_id, language) - truncation_percentage = None - else: - description, truncation_percentage = await process_text_file(query, file_data["filename"], file_data["content"], tenant_id, language) - file_descriptions.append(description) - - # Send processing result for each file - file_message_data = { - "type": "file_processed", - "filename": file_data["filename"], - "description": description - } - file_message = json.dumps( - file_message_data, ensure_ascii=False) - yield f"data: {file_message}\n\n" - await asyncio.sleep(0.1) - - # Send truncation notice immediately if file was truncated - if truncation_percentage is not None and int(truncation_percentage) < 100: - if int(truncation_percentage) == 0: - truncation_percentage = "< 1" - - truncation_message = json.dumps({ - "type": "truncation", - "message_data": get_truncation_data(file_data['filename'], truncation_percentage) - }, ensure_ascii=False) - yield f"data: {truncation_message}\n\n" - await asyncio.sleep(0.1) - except Exception as e: - error_description = f"Error parsing file {file_data['filename']}: {str(e)}" - logger.exception(error_description) - file_descriptions.append(error_description) - error_message = json.dumps({ - "type": "error", - "filename": file_data["filename"], - "message": error_description - }, ensure_ascii=False) - yield f"data: {error_message}\n\n" - await asyncio.sleep(0.1) - - # Send completion message - complete_message = json.dumps({ - "type": "complete", - "progress": 100, - "final_query": query - }, ensure_ascii=False) - yield f"data: {complete_message}\n\n" - finally: - preprocess_manager.unregister_preprocess_task(task_id) - - -async def process_image_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> str: - """ - Process image file, convert to text using external API - """ - # Load messages based on language - messages = get_file_processing_messages_template(language) - - try: - image_stream = BytesIO(file_content) - text = convert_image_to_text(query, image_stream, tenant_id, language) - return messages["IMAGE_CONTENT_SUCCESS"].format(filename=filename, content=text) - except Exception as e: - return messages["IMAGE_CONTENT_ERROR"].format(filename=filename, error=str(e)) - - -async def process_text_file(query: str, filename: str, file_content: bytes, tenant_id: str, language: str = LANGUAGE["ZH"]) -> tuple[str, Optional[str]]: - """ - Process text file, convert to text using external API - """ - # Load messages based on language - messages = get_file_processing_messages_template(language) - - # file_content is byte data, need to send to API through file upload - data_process_service_url = DATA_PROCESS_SERVICE - api_url = f"{data_process_service_url}/tasks/process_text_file" - logger.info(f"Processing text file {filename} with API: {api_url}") - - try: - # Upload byte data as a file - files = { - 'file': (filename, file_content, 'application/octet-stream') - } - data = { - 'chunking_strategy': 'basic', - 'timeout': 60 - } - async with httpx.AsyncClient() as client: - response = await client.post(api_url, files=files, data=data, timeout=60) - - if response.status_code == 200: - result = response.json() - raw_text = result.get("text", "") - logger.info( - f"File processed successfully: {raw_text[:200]}...{raw_text[-200:]}..., length: {len(raw_text)}") - else: - error_detail = response.json().get('detail', 'unknown error') if response.headers.get( - 'content-type', '').startswith('application/json') else response.text - logger.error( - f"File processing failed (status code: {response.status_code}): {error_detail}") - raise Exception( - messages["FILE_PROCESSING_ERROR"].format(status_code=response.status_code, error_detail=error_detail)) - - except Exception as e: - return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None - - try: - text, truncation_percentage = convert_long_text_to_text( - query, raw_text, tenant_id, language) - return messages["FILE_CONTENT_SUCCESS"].format(filename=filename, content=text), truncation_percentage - except Exception as e: - return messages["FILE_CONTENT_ERROR"].format(filename=filename, error=str(e)), None - - -def get_file_description(files: List[UploadFile]) -> str: - """ - Generate file description text - """ - if not files: - return "User provided some reference files:\nNo files provided" - - description = "User provided some reference files:\n" - for file in files: - ext = os.path.splitext(file.filename or "")[1].lower() - if ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']: - description += f"- Image file {file.filename or ''}\n" - else: - description += f"- File {file.filename or ''}\n" - return description - def get_llm_model(tenant_id: str): # Get the tenant config main_model_config = tenant_config_manager.get_model_config( diff --git a/frontend/app/[locale]/agents/components/tool/ToolPool.tsx b/frontend/app/[locale]/agents/components/tool/ToolPool.tsx index 71d284ca3..0f1132ec7 100644 --- a/frontend/app/[locale]/agents/components/tool/ToolPool.tsx +++ b/frontend/app/[locale]/agents/components/tool/ToolPool.tsx @@ -3,12 +3,13 @@ import { useState, useEffect, useMemo, useCallback, memo } from "react"; import { useTranslation } from "react-i18next"; -import { Button, App, Tabs, Collapse } from "antd"; +import { Button, App, Tabs, Collapse, Tooltip } from "antd"; import { SettingOutlined, LoadingOutlined, ApiOutlined, ReloadOutlined, + BulbOutlined, } from "@ant-design/icons"; import { TOOL_SOURCE_TYPES } from "@/const/agentConfig"; @@ -643,6 +644,25 @@ function ToolPool({

{t("toolPool.title")}

+ + {t("toolPool.tooltip.functionGuide")} + + } + overlayInnerStyle={{ + backgroundColor: "#ffffff", + color: "#374151", + border: "1px solid #e5e7eb", + borderRadius: "6px", + boxShadow: "0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06)", + padding: "12px", + maxWidth: "600px", + minWidth: "400px", + }} + > + +