# ============================================================================
# Web Search Support via OpenAI Responses API
# ============================================================================
# When the Claude CLI sends a request with a web_search tool, we route it to
# OpenAI's Responses API (which supports native web search) instead of the
# standard Chat Completions API.


def _block_field(block: Any, name: str, default: Any = None) -> Any:
    """Read *name* from a value that is either a dict or an attribute-bearing object."""
    if isinstance(block, dict):
        return block.get(name, default)
    return getattr(block, name, default)


def has_web_search_tool(tools: Optional[List[Any]]) -> bool:
    """Return True when *tools* contains a tool whose ``type`` starts with ``web_search``.

    Args:
        tools: Tool definitions as plain dicts or as objects. Objects exposing
            ``.dict()`` are converted through it; any other object is probed
            for a ``type`` attribute.

    Returns:
        True if at least one tool has a string ``type`` beginning with
        ``"web_search"``; False for ``None``, an empty list, or no match.
    """
    if not tools:
        return False
    for tool in tools:
        if isinstance(tool, dict):
            tool_type = tool.get("type")
        elif hasattr(tool, "dict"):
            tool_type = tool.dict().get("type")
        else:
            tool_type = getattr(tool, "type", None)
        # Tools without a type (or with a non-string one) are simply not a match
        # rather than a reason to fail the whole request.
        if isinstance(tool_type, str) and tool_type.startswith("web_search"):
            return True
    return False


def convert_anthropic_messages_to_input(messages: List[Any], system: Optional[Any] = None) -> str:
    """Flatten Anthropic-style messages into one input string for the Responses API.

    Each entry is rendered as ``"<Role>: <text>"`` and entries are separated by
    a blank line. Only ``text`` and ``tool_result`` content blocks are rendered;
    every other block type is skipped.

    Args:
        messages: Messages given as dicts or objects with ``role`` and
            ``content``. ``content`` is either a string or a list of blocks.
        system: Optional system prompt, either a string or a list of blocks
            of which the ``text`` blocks are joined with newlines.

    Returns:
        The combined prompt text (empty string when nothing was rendered).
    """
    parts: List[str] = []

    # System prompt first, so it precedes the conversation turns.
    if system:
        if isinstance(system, str):
            parts.append(f"System: {system}")
        elif isinstance(system, list):
            system_text = "\n".join(
                _block_field(block, "text", "") or ""
                for block in system
                if _block_field(block, "type") == "text"
            ).strip()
            if system_text:
                parts.append(f"System: {system_text}")

    for msg in messages:
        role = _block_field(msg, "role", "user")
        content = _block_field(msg, "content", "")

        if isinstance(content, str):
            parts.append(f"{role.capitalize()}: {content}")
            continue
        if not isinstance(content, list):
            continue

        text_parts: List[str] = []
        for block in content:
            block_type = _block_field(block, "type")
            if block_type == "text":
                # A missing/None text is rendered as empty instead of raising.
                text_parts.append(_block_field(block, "text", "") or "")
            elif block_type == "tool_result":
                result_content = parse_tool_result_content(_block_field(block, "content", ""))
                text_parts.append(f"[Tool Result: {result_content}]")
        if text_parts:
            parts.append(f"{role.capitalize()}: {' '.join(text_parts)}")

    return "\n\n".join(parts)
# Model families checked by substring, most specific first so that
# "gpt-4.1-mini" is not swallowed by "gpt-4.1" (same for the 4o series).
_WEB_SEARCH_MODEL_FAMILIES = ("gpt-4.1-mini", "gpt-4.1", "gpt-4o-mini", "gpt-4o")


def get_openai_web_search_model(model: str) -> str:
    """Map a requested model name to an OpenAI model used for web search.

    Supported models for Responses API: gpt-4o, gpt-4o-mini, gpt-4.1, gpt-4.1-mini

    Args:
        model: Requested model name, optionally prefixed with ``"openai/"``.

    Returns:
        The matching family name when the request names one of the supported
        families; ``"gpt-4.1-mini"`` for any other model whose name contains
        ``mini`` as a separate word; otherwise ``"gpt-4.1"``.
    """
    prefix = "openai/"
    openai_model = model[len(prefix):] if model.startswith(prefix) else model
    lower_model = openai_model.lower()

    for family in _WEB_SEARCH_MODEL_FAMILIES:
        if family in lower_model:
            return family

    # Match "mini" as a whole token: a plain substring test also fires on
    # names such as "gemini-2.5-pro" and would downgrade them to a mini model.
    tokens = "".join(ch if ch.isalnum() else " " for ch in lower_model).split()
    if "mini" in tokens:
        return "gpt-4.1-mini"

    # Default to gpt-4.1 for web search
    return "gpt-4.1"


async def call_openai_responses_api_non_streaming(
    model: str,
    input_text: str,
    api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Call OpenAI's Responses API with the web_search tool (non-streaming).

    Args:
        model: Requested model name; mapped through
            ``get_openai_web_search_model``.
        input_text: Prompt text sent as the ``input`` field.
        api_key: Bearer token. When empty or ``None`` no ``Authorization``
            header is sent.

    Returns:
        The decoded JSON body of the response.

    Raises:
        HTTPException: With the upstream status code when the API does not
            answer with HTTP 200.
    """
    openai_model = get_openai_web_search_model(model)

    # Use custom base URL if configured, otherwise direct OpenAI
    if OPENAI_BASE_URL:
        url = f"{OPENAI_BASE_URL.rstrip('/')}/responses"
    else:
        url = "https://api.openai.com/v1/responses"

    headers = {"Content-Type": "application/json"}
    if api_key:
        # Avoid sending the literal string "Bearer None" when no key is set.
        headers["Authorization"] = f"Bearer {api_key}"

    payload = {
        "model": openai_model,
        "input": input_text,
        "tools": [{"type": "web_search"}],
    }

    logger.debug(f"Calling OpenAI Responses API with model={openai_model}, stream=False")

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(url, headers=headers, json=payload)
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"OpenAI API error: {response.text}",
            )
        return response.json()
def _citations_from_annotations(text: str, annotations: Any) -> List[Dict[str, Any]]:
    """Build Anthropic-style citation dicts from the ``url_citation`` annotations of *text*."""
    citations: List[Dict[str, Any]] = []
    if not isinstance(annotations, list):
        return citations
    for ann in annotations:
        if not isinstance(ann, dict) or ann.get("type") != "url_citation":
            continue
        start = ann.get("start_index", 0)
        end = ann.get("end_index", len(text))
        citations.append({
            "type": "web_search_result_location",
            "url": ann.get("url", ""),
            "title": ann.get("title", ""),
            "encrypted_index": f"idx_{uuid.uuid4().hex[:16]}",
            # Indices are offsets into this text segment; cap the quote at 150 chars.
            "cited_text": text[start:end][:150],
        })
    return citations


def _iter_text_segments(item: Dict[str, Any]):
    """Yield ``(text, annotations)`` for every non-empty text segment of an output item."""
    raw = item.get("text", "") or item.get("content", "")
    item_annotations = item.get("annotations", [])
    if isinstance(raw, str):
        if raw:
            yield raw, item_annotations
        return
    if not isinstance(raw, list):
        return
    for part in raw:
        if isinstance(part, str):
            if part:
                yield part, item_annotations
        elif isinstance(part, dict) and part.get("type") == "output_text":
            text = part.get("text", "")
            if isinstance(text, str) and text:
                # Annotations normally sit on the content part itself; fall
                # back to the item level for payloads that put them there.
                yield text, part.get("annotations") or item_annotations


def convert_openai_responses_to_anthropic(
    openai_response: Dict[str, Any],
    original_request: Any
) -> Dict[str, Any]:
    """Convert an OpenAI Responses API result into an Anthropic message dict.

    ``web_search_call`` output items become ``server_tool_use`` blocks (the
    first one is followed by a single placeholder ``web_search_tool_result``),
    and ``message`` / ``output_text`` items become ``text`` blocks, one per
    text segment, carrying ``citations`` when URL citations are present.

    Args:
        openai_response: Decoded JSON body of the Responses API call.
        original_request: The incoming request; only its ``model`` attribute
            is read and echoed back.

    Returns:
        A dict with ``id``, ``type``, ``role``, ``model``, ``content``,
        ``stop_reason``, ``stop_sequence`` and ``usage``. ``usage`` reports
        the number of ``web_search_call`` items as ``web_search_requests``.
    """
    content: List[Dict[str, Any]] = []
    search_calls = 0

    # Only the first search call gets a placeholder result block.
    search_results_added = False

    for item in openai_response.get("output") or []:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type", "")

        if item_type == "web_search_call":
            search_calls += 1
            tool_id = item.get("id") or f"srvtoolu_{uuid.uuid4().hex[:24]}"

            # Extract search query if available
            search_input: Dict[str, Any] = {}
            action = item.get("action")
            if isinstance(action, dict) and action.get("type") == "search":
                search_input["query"] = action.get("query", "")

            content.append({
                "type": "server_tool_use",
                "id": tool_id,
                "name": "web_search",
                "input": search_input
            })

            if not search_results_added:
                content.append({
                    "type": "web_search_tool_result",
                    "tool_use_id": tool_id,
                    "content": [{
                        "type": "web_search_result",
                        "url": "https://search.openai.com",
                        "title": "Web Search Results",
                        "encrypted_content": "web_search_via_openai"
                    }]
                })
                search_results_added = True

        elif item_type in ("message", "output_text"):
            for text, annotations in _iter_text_segments(item):
                block: Dict[str, Any] = {"type": "text", "text": text}
                citations = _citations_from_annotations(text, annotations)
                if citations:
                    block["citations"] = citations
                content.append(block)

    # If no content was extracted, add a fallback
    if not content:
        if "output_text" in openai_response:
            fallback_text = openai_response.get("output_text", "")
        else:
            fallback_text = "Web search completed but no results were returned."
        content.append({"type": "text", "text": fallback_text})

    response_id = openai_response.get("id") or f"msg_{uuid.uuid4().hex[:24]}"

    # Accept both Responses-style and Chat-Completions-style usage keys;
    # a null "usage" is treated like a missing one.
    usage_info = openai_response.get("usage") or {}
    input_tokens = usage_info.get("input_tokens", 0) or usage_info.get("prompt_tokens", 0)
    output_tokens = usage_info.get("output_tokens", 0) or usage_info.get("completion_tokens", 0)

    return {
        "id": response_id,
        "type": "message",
        "role": "assistant",
        "model": original_request.model,
        "content": content,
        "stop_reason": "end_turn",
        "stop_sequence": None,
        "usage": {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cache_creation_input_tokens": 0,
            "cache_read_input_tokens": 0,
            "server_tool_use": {
                "web_search_requests": search_calls
            }
        }
    }
def _web_search_sse(event: str, payload: Dict[str, Any]) -> str:
    """Format one server-sent event with the given event name and JSON payload."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


def _web_search_text_delta(text: str) -> str:
    """Format a ``content_block_delta`` event appending *text* to content block 0."""
    return _web_search_sse(
        "content_block_delta",
        {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}},
    )


async def handle_web_search_streaming(
    model: str,
    input_text: str,
    api_key: str,
    original_request: Any
):
    """Stream an OpenAI Responses API web search as Anthropic-style SSE events.

    Emits ``message_start``, ``content_block_start`` and ``ping``, then one
    ``content_block_delta`` per upstream text delta, and finally
    ``content_block_stop``, ``message_delta``, ``message_stop`` and a closing
    ``data: [DONE]`` line. Upstream failures (non-200 status, failure events,
    exceptions) are reported as text inside the single text block so the
    event sequence is always completed.

    Args:
        model: Requested model name; mapped through
            ``get_openai_web_search_model``.
        input_text: Prompt text sent as the ``input`` field.
        api_key: Bearer token for the upstream call.
        original_request: The incoming request; only its ``model`` attribute
            is read and echoed in ``message_start``.

    Yields:
        SSE-formatted strings.
    """
    message_id = f"msg_{uuid.uuid4().hex[:24]}"

    yield _web_search_sse("message_start", {
        'type': 'message_start',
        'message': {
            'id': message_id,
            'type': 'message',
            'role': 'assistant',
            'model': original_request.model,
            'content': [],
            'stop_reason': None,
            'stop_sequence': None,
            'usage': {
                'input_tokens': 0,
                'cache_creation_input_tokens': 0,
                'cache_read_input_tokens': 0,
                'output_tokens': 0,
                'server_tool_use': {
                    'web_search_requests': 0
                }
            }
        }
    })

    # All text is streamed into a single text block at index 0.
    yield _web_search_sse(
        "content_block_start",
        {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}},
    )
    yield _web_search_sse("ping", {'type': 'ping'})

    output_tokens = 0
    search_requests = 0

    openai_model = get_openai_web_search_model(model)

    # Use custom base URL if configured, otherwise direct OpenAI
    if OPENAI_BASE_URL:
        url = f"{OPENAI_BASE_URL.rstrip('/')}/responses"
    else:
        url = "https://api.openai.com/v1/responses"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": openai_model,
        "input": input_text,
        "tools": [{"type": "web_search"}],
        "stream": True
    }

    logger.debug(f"Calling OpenAI Responses API with model={openai_model}, stream=True, url={url}")

    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", url, headers=headers, json=payload) as response:
                if response.status_code != 200:
                    error_text = await response.aread()
                    # Error bodies are not guaranteed to be valid UTF-8.
                    error_msg = f"OpenAI API error: {error_text.decode(errors='replace')}"
                    logger.error(error_msg)
                    yield _web_search_text_delta(error_msg)
                else:
                    async for line in response.aiter_lines():
                        if not line or not line.startswith("data:"):
                            continue

                        data_str = line[5:].strip()
                        if data_str == "[DONE]":
                            break

                        try:
                            data = json.loads(data_str)
                        except json.JSONDecodeError:
                            continue
                        if not isinstance(data, dict):
                            continue

                        event_type = data.get("type", "")

                        if event_type == "response.output_text.delta":
                            delta_text = data.get("delta", "")
                            if delta_text:
                                yield _web_search_text_delta(delta_text)

                        elif event_type == "response.web_search_call.completed":
                            search_requests += 1

                        elif event_type in ("response.completed", "response.incomplete", "response.done"):
                            # The Responses API reports final usage on its terminal
                            # event; "response.done" is kept for compatibility.
                            response_data = data.get("response")
                            usage = response_data.get("usage") if isinstance(response_data, dict) else None
                            if isinstance(usage, dict):
                                output_tokens = usage.get("output_tokens", 0) or 0

                        elif event_type in ("response.failed", "error"):
                            # Surface upstream failures instead of ending with an empty message.
                            response_data = data.get("response")
                            error_info = response_data.get("error") if isinstance(response_data, dict) else None
                            if isinstance(error_info, dict):
                                detail = error_info.get("message")
                            else:
                                detail = data.get("message")
                            error_msg = f"OpenAI API error: {detail or 'unknown error'}"
                            logger.error(error_msg)
                            yield _web_search_text_delta(error_msg)

    except Exception as e:
        logger.error(f"Error in web search streaming: {e}")
        yield _web_search_text_delta(f'Error during web search: {str(e)}')

    # Close text block
    yield _web_search_sse("content_block_stop", {'type': 'content_block_stop', 'index': 0})

    # Send message_delta with stop reason and web_search usage
    usage_data = {
        'output_tokens': output_tokens,
        'server_tool_use': {
            'web_search_requests': search_requests
        }
    }
    yield _web_search_sse("message_delta", {
        'type': 'message_delta',
        'delta': {'stop_reason': 'end_turn', 'stop_sequence': None},
        'usage': usage_data,
    })

    yield _web_search_sse("message_stop", {'type': 'message_stop'})

    # Send final [DONE]
    yield "data: [DONE]\n\n"
web search + try: + openai_response = await call_openai_responses_api_non_streaming( + model=request.model, + input_text=input_text, + api_key=OPENAI_API_KEY + ) + anthropic_response = convert_openai_responses_to_anthropic(openai_response, request) + return JSONResponse(content=anthropic_response) + except Exception as e: + logger.error(f"Error in web search: {e}") + raise HTTPException(status_code=500, detail=f"Web search error: {str(e)}") + # Convert Anthropic request to LiteLLM format litellm_request = convert_anthropic_to_litellm(request)