#!/usr/bin/env python3
"""
Scenario generator for a streamable-HTTP MCP server.

Features:
- Connects to an MCP server via HTTP
- Discovers tools and resources
- Generates scenarios, optionally filtered by difficulty
- Saves the scenarios to JSON, with or without metadata
"""

import argparse
import asyncio
import json
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # Only needed for annotations; the runtime imports of the MCP client and
    # of ``art.mcp`` happen inside the methods that use them so that
    # ``--help`` and the pure reporting/saving helpers work without them.
    from art.mcp import GeneratedScenarioCollection

# Placeholder endpoints and credentials for a local setup -- CHANGE THESE.
DEFAULT_MCP_SERVER_URL = "http://mcp.server:8000/mcp"
DEFAULT_LLM_MODEL = "gpt-oss"
DEFAULT_LLM_API_KEY = "sk-1234"
DEFAULT_LLM_BASE_URL = "http://vllm:8000/v1"

_RULE = "=" * 70


class MCPScenarioGenerator:
    """Discover an MCP server's capabilities and generate scenarios from them.

    Typical flow: ``discover_capabilities()`` -> ``generate()`` -> ``save()``
    -> ``show_summary()``. Every step reports progress on stdout and signals
    failure through its boolean return value instead of raising.
    """

    # How many tools/resources ``show_capabilities`` lists before eliding.
    _PREVIEW_LIMIT = 10
    # Descriptions longer than this are cut and suffixed with "...".
    _DESCRIPTION_WIDTH = 60

    def __init__(
        self,
        mcp_server_url: str,
        llm_model: str = "gpt-oss",  # CHANGE THIS
        llm_api_key: str = "sk-1234",  # CHANGE THIS
        llm_base_url: str = "http://vllm:8000/v1",  # CHANGE THIS
    ):
        """Store connection settings; no network I/O happens here."""
        self.mcp_server_url = mcp_server_url
        self.llm_model = llm_model
        self.llm_api_key = llm_api_key
        self.llm_base_url = llm_base_url

        self.tools: List[Dict[str, Any]] = []
        self.resources: List[Dict[str, Any]] = []
        self.scenarios: Optional["GeneratedScenarioCollection"] = None

    async def discover_capabilities(self) -> bool:
        """Connect to the MCP server and record its tools and resources.

        Returns:
            True when the session was initialized and the tools were listed,
            False when connecting or listing the tools failed.
        """
        from mcp import ClientSession
        from mcp.client.streamable_http import streamablehttp_client

        print(f"🔍 Connecting to MCP server at {self.mcp_server_url}...")

        try:
            async with streamablehttp_client(self.mcp_server_url) as (
                read_stream,
                write_stream,
                _,
            ):
                async with ClientSession(read_stream, write_stream) as session:
                    await session.initialize()
                    print("✓ Connected successfully")

                    tools_response = await session.list_tools()
                    self.tools = [
                        {
                            "name": tool.name,
                            "description": tool.description or "",
                            "parameters": getattr(tool, "inputSchema", None) or {},
                        }
                        for tool in tools_response.tools
                    ]

                    # Resources are an optional server capability: a server
                    # that only exposes tools rejects ``resources/list``, and
                    # that must not discard the tools already discovered.
                    try:
                        resources_response = await session.list_resources()
                    except Exception as exc:
                        print(f"⚠️  Could not list resources: {exc}")
                        self.resources = []
                    else:
                        self.resources = [
                            {
                                # ``uri`` is a URL object; JSON needs a string.
                                "uri": str(resource.uri),
                                "name": resource.name
                                or str(resource.uri).split("/")[-1],
                                "description": resource.description or "",
                                "mimeType": getattr(resource, "mimeType", None)
                                or "application/octet-stream",
                            }
                            for resource in resources_response.resources
                        ]

                    print(
                        f"✓ Found {len(self.tools)} tools and "
                        f"{len(self.resources)} resources"
                    )
                    return True

        except Exception as exc:  # script boundary: report and signal failure
            print(f"❌ Failed to connect: {exc}")
            return False

    @classmethod
    def _shorten(cls, text: str) -> str:
        """Return *text* cut to the preview width, marking the cut with "..."."""
        if len(text) > cls._DESCRIPTION_WIDTH:
            return text[: cls._DESCRIPTION_WIDTH] + "..."
        return text

    def _print_listing(self, heading: str, items: List[Dict[str, Any]]) -> None:
        """Print a numbered preview of *items* (dicts with name/description)."""
        print(f"\n{heading} ({len(items)}):")
        for index, item in enumerate(items[: self._PREVIEW_LIMIT], 1):
            print(f"  {index:2d}. {item['name']}")
            summary = self._shorten(item["description"])
            if summary:
                print(f"      {summary}")

        hidden = len(items) - self._PREVIEW_LIMIT
        if hidden > 0:
            print(f"  ... and {hidden} more")

    def show_capabilities(self) -> None:
        """Print the discovered tools and resources (first ten of each)."""
        print("\n" + _RULE)
        print("📋 Discovered Capabilities")
        print(_RULE)

        self._print_listing("🔧 Tools", self.tools)
        self._print_listing("📚 Resources", self.resources)

    def _default_instructions(self) -> str:
        """Build the generation instructions used when the caller gives none."""
        return (
            "Generate realistic, diverse scenarios that:\n"
            f"1. Effectively use the {len(self.tools)} available tools\n"
            f"2. Reference the {len(self.resources)} available resources "
            "when relevant\n"
            "3. Cover different difficulty levels from simple to complex\n"
            "4. Represent real-world use cases\n"
            "5. Include specific details about what needs to be accomplished\n"
        )

    async def generate(
        self,
        num_scenarios: int = 10,
        difficulty_range: Optional[Tuple[int, int]] = None,
        custom_instructions: Optional[str] = None,
    ) -> bool:
        """Generate scenarios from the discovered tools and resources.

        Args:
            num_scenarios: Number of scenarios to request from the model.
            difficulty_range: Optional inclusive ``(min, max)`` difficulty;
                scenarios outside it are dropped after generation.
            custom_instructions: Replaces the default instructions when given.

        Returns:
            True on success (result stored in ``self.scenarios``), False when
            nothing was discovered or generation raised.
        """
        from art.mcp import generate_scenarios

        print("\n" + _RULE)
        print("🎯 Generating Scenarios")
        print(_RULE)
        print()

        if not self.tools and not self.resources:
            print("❌ No tools or resources available")
            return False

        try:
            self.scenarios = await generate_scenarios(
                tools=self.tools,
                resources=self.resources,
                num_scenarios=num_scenarios,
                show_preview=True,
                custom_instructions=custom_instructions
                or self._default_instructions(),
                generator_model=self.llm_model,
                generator_api_key=self.llm_api_key,
                generator_base_url=self.llm_base_url,
            )

            if difficulty_range:
                min_diff, max_diff = difficulty_range
                self.scenarios = self.scenarios.filter_by_difficulty(
                    min_difficulty=min_diff,
                    max_difficulty=max_diff,
                )
                print(
                    f"\n✓ Filtered to difficulty range {min_diff}-{max_diff}: "
                    f"{len(self.scenarios)} scenarios"
                )

            return True

        except Exception as exc:  # script boundary: report and signal failure
            print(f"❌ Generation failed: {exc}")
            traceback.print_exc()
            return False

    def save(
        self, output_file: str = "scenarios.json", include_metadata: bool = True
    ) -> bool:
        """Write the generated scenarios to *output_file* as JSON.

        Args:
            output_file: Destination path; missing parent directories are
                created.
            include_metadata: When True the file is an object with
                ``metadata`` and ``scenarios`` keys; otherwise it is a bare
                list of ``{"task", "difficulty"}`` objects.

        Returns:
            True when the file was written, False when there is nothing to
            save or writing failed.
        """
        if not self.scenarios:
            print("❌ No scenarios to save")
            return False

        print("\n" + _RULE)
        print("💾 Saving Scenarios")
        print(_RULE)

        output_path = Path(output_file)

        try:
            scenario_dicts = [
                {"task": scenario.task, "difficulty": scenario.difficulty}
                for scenario in self.scenarios
            ]

            data: Any
            if include_metadata:
                data = {
                    "metadata": {
                        "generated_at": datetime.now().isoformat(),
                        "mcp_server_url": self.mcp_server_url,
                        "llm_model": self.llm_model,
                        "num_tools": len(self.tools),
                        "num_resources": len(self.resources),
                        "tool_names": [t["name"] for t in self.tools],
                        "resource_names": [r["name"] for r in self.resources],
                        "summary": self.scenarios.get_summary(),
                    },
                    "scenarios": scenario_dicts,
                }
            else:
                data = scenario_dicts

            # ``--output results/run1/scenarios.json`` should just work.
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as handle:
                json.dump(data, handle, indent=2)

            print(f"✓ Saved to: {output_path}")
            print(f"✓ File size: {output_path.stat().st_size:,} bytes")
            return True

        except Exception as exc:  # script boundary: report and signal failure
            print(f"❌ Failed to save: {exc}")
            return False

    def show_summary(self) -> None:
        """Print totals, averages and a difficulty histogram; no-op if empty."""
        if not self.scenarios:
            return

        print("\n" + _RULE)
        print("📊 Summary")
        print(_RULE)

        summary = self.scenarios.get_summary()
        total = summary["total_scenarios"]
        distribution = summary["difficulty_distribution"]

        print("\n📈 Statistics:")
        print(f"   Total scenarios: {total}")
        print(f"   Average difficulty: {summary['avg_difficulty']:.1f}/5")
        print(f"   Average task length: {summary['avg_task_length']:.0f} characters")

        print("\n📊 Difficulty Distribution:")
        # ``default`` keeps an empty distribution from raising ValueError.
        max_count = max(distribution.values(), default=0)
        for difficulty in range(1, 6):
            count = distribution.get(difficulty, 0)
            percentage = (count / total * 100) if total > 0 else 0
            bar = "█" * int(count / max_count * 30) if max_count > 0 else ""
            print(f"   {difficulty}/5: {count:3d} ({percentage:5.1f}%) {bar}")


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the example script."""
    parser = argparse.ArgumentParser(
        description="Generate scenarios from MCP server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--server",
        default=DEFAULT_MCP_SERVER_URL,
        help=f"MCP server URL (default: {DEFAULT_MCP_SERVER_URL})",
    )
    parser.add_argument(
        "--num",
        type=int,
        default=10,
        help="Number of scenarios to generate (default: 10)",
    )
    parser.add_argument(
        "--output",
        default="scenarios.json",
        help="Output file path (default: scenarios.json)",
    )
    parser.add_argument(
        "--min-difficulty",
        type=int,
        choices=range(1, 6),
        help="Minimum difficulty (1-5)",
    )
    parser.add_argument(
        "--max-difficulty",
        type=int,
        choices=range(1, 6),
        help="Maximum difficulty (1-5)",
    )
    parser.add_argument(
        "--llm-model",
        default=DEFAULT_LLM_MODEL,
        help=f"LLM model name (default: {DEFAULT_LLM_MODEL})",
    )
    parser.add_argument(
        "--llm-base-url",
        default=DEFAULT_LLM_BASE_URL,
        help=f"LLM API base URL (default: {DEFAULT_LLM_BASE_URL})",
    )
    parser.add_argument(
        "--llm-api-key",
        default=DEFAULT_LLM_API_KEY,
        help="LLM API key (default: placeholder key for a local server)",
    )
    return parser


async def main() -> int:
    """Run discovery, generation, saving and summary; return the exit code."""
    parser = _build_arg_parser()
    args = parser.parse_args()

    if args.num < 1:
        parser.error("--num must be at least 1")

    # A one-sided bound is completed with the widest possible other bound.
    difficulty_range = None
    if args.min_difficulty is not None or args.max_difficulty is not None:
        min_d = args.min_difficulty if args.min_difficulty is not None else 1
        max_d = args.max_difficulty if args.max_difficulty is not None else 5
        if min_d > max_d:
            print("❌ Error: min-difficulty must be <= max-difficulty")
            return 1
        difficulty_range = (min_d, max_d)

    print(_RULE)
    print("🚀 MCP Scenario Generator")
    print(_RULE)
    print(f"\n📍 Server: {args.server}")
    print(f"🤖 LLM: {args.llm_model}")
    print(f"🎯 Scenarios: {args.num}")
    if difficulty_range:
        print(f"⚡ Difficulty: {difficulty_range[0]}-{difficulty_range[1]}")
    print()

    generator = MCPScenarioGenerator(
        mcp_server_url=args.server,
        llm_model=args.llm_model,
        llm_api_key=args.llm_api_key,
        llm_base_url=args.llm_base_url,
    )

    # Step 1: Discover capabilities
    if not await generator.discover_capabilities():
        return 1

    generator.show_capabilities()

    # Step 2: Generate scenarios
    if not await generator.generate(
        num_scenarios=args.num,
        difficulty_range=difficulty_range,
    ):
        return 1

    # Step 3: Save results
    if not generator.save(output_file=args.output):
        return 1

    # Step 4: Show summary
    generator.show_summary()

    print("\n✅ Complete!")
    return 0


if __name__ == "__main__":
    try:
        sys.exit(asyncio.run(main()))
    except KeyboardInterrupt:
        print("\n\n⚠️  Interrupted by user")
        sys.exit(1)
"""Simple MCP server for formatting scenario data into JSON via stdio."""

import json
import sys
from typing import Any, Dict

# JSON-RPC 2.0 error codes used by this server.
_INVALID_REQUEST = -32600
_METHOD_NOT_FOUND = -32601
_INVALID_PARAMS = -32602

# Difficulty assumed when a ``format_scenario`` call omits the field.
_DEFAULT_DIFFICULTY = 3

# Returned by ``_try_parse`` when the text is not (yet) a complete JSON value.
_INCOMPLETE = object()


def send_response(id: Any, result: Dict[str, Any]) -> None:
    """Write a JSON-RPC success response for request *id* to stdout."""
    response = {"jsonrpc": "2.0", "id": id, "result": result}
    print(json.dumps(response), flush=True)


def send_error(id: Any, code: int, message: str) -> None:
    """Write a JSON-RPC error response for request *id* to stdout."""
    response = {"jsonrpc": "2.0", "id": id, "error": {"code": code, "message": message}}
    print(json.dumps(response), flush=True)


def _call_format_scenario(req_id: Any, args: Dict[str, Any]) -> None:
    """Validate and normalise one scenario, replying with the JSON text."""
    try:
        difficulty = int(args.get("difficulty", _DEFAULT_DIFFICULTY))
    except (TypeError, ValueError, OverflowError):
        # A bad argument is the caller's error; it must not kill the server.
        send_error(req_id, _INVALID_PARAMS, "difficulty must be an integer from 1 to 5")
        return

    task = args.get("task")
    formatted = {
        # An explicit null is treated like a missing task, not as "None".
        "task": "" if task is None else str(task).strip(),
        "difficulty": max(1, min(5, difficulty)),
    }
    send_response(
        req_id,
        {"content": [{"type": "text", "text": json.dumps(formatted, indent=2)}]},
    )


def handle_request(request: Dict[str, Any]) -> None:
    """Dispatch one decoded JSON-RPC message and write any reply to stdout.

    Supports ``initialize``, ``tools/list`` and ``tools/call`` (for the
    ``format_scenario`` tool). Notifications are accepted silently. Unknown
    methods get a "method not found" error when the message carries an id.
    """
    if not isinstance(request, dict):
        send_error(None, _INVALID_REQUEST, "Request must be a JSON object")
        return

    method = request.get("method")
    params = request.get("params")
    if not isinstance(params, dict):
        # ``"params": null`` (or a positional list) carries nothing we use.
        params = {}
    req_id = request.get("id")

    if method == "initialize":
        send_response(
            req_id,
            {
                "protocolVersion": "2024-11-05",
                "serverInfo": {"name": "scenario-formatter", "version": "1.0.0"},
                "capabilities": {"tools": {}},
            },
        )

    elif isinstance(method, str) and method.startswith("notifications/"):
        # Notifications (including "notifications/initialized") get no reply.
        pass

    elif method == "tools/list":
        send_response(
            req_id,
            {
                "tools": [
                    {
                        "name": "format_scenario",
                        "description": "Format a scenario into proper JSON structure",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "task": {"type": "string", "description": "The task description"},
                                "difficulty": {
                                    "type": "integer",
                                    "description": "Difficulty rating from 1-5",
                                },
                            },
                            "required": ["task", "difficulty"],
                        },
                    }
                ]
            },
        )

    elif method == "tools/call":
        tool_name = params.get("name")
        args = params.get("arguments")
        if not isinstance(args, dict):
            args = {}

        if tool_name == "format_scenario":
            _call_format_scenario(req_id, args)
        else:
            send_error(req_id, _METHOD_NOT_FOUND, f"Unknown tool: {tool_name}")

    else:
        # Only answer messages that carry an id; 0 is a valid id, so compare
        # against None rather than relying on truthiness.
        if req_id is not None:
            send_error(req_id, _METHOD_NOT_FOUND, f"Unknown method: {method}")


def _try_parse(text: str) -> Any:
    """Return the decoded JSON value, or ``_INCOMPLETE`` if *text* is not JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return _INCOMPLETE


def main() -> None:
    """Read JSON-RPC messages from stdin until EOF and answer each one.

    Input is accumulated line by line so a message spread over several lines
    is still accepted. If the accumulated text cannot be decoded but the
    newest line is a complete JSON object by itself, the stale prefix (left
    over from a malformed earlier line) is discarded so that one bad line
    cannot block every later request.
    """
    buffer = ""
    for line in sys.stdin:
        candidate = buffer + line
        request = _try_parse(candidate)

        if request is _INCOMPLETE and buffer:
            standalone = _try_parse(line)
            if isinstance(standalone, dict):
                request = standalone

        if request is _INCOMPLETE:
            # Not complete JSON yet, keep buffering.
            buffer = candidate
            continue

        buffer = ""
        handle_request(request)


if __name__ == "__main__":
    main()
preview_scenarios(scenarios: List[Dict[str, Any]], n: int = 5): - """Preview generated scenarios.""" - n = min(n, len(scenarios)) - for i in range(n): - s = scenarios[i] - task_preview = s["task"][:120].strip() - ellipsis = "&" if len(s["task"]) > 120 else "" - difficulty = s.get("difficulty", "N/A") - dim( - f" {i + 1}. {task_preview}{ellipsis} " - f"{_C.GRAY}(difficulty {difficulty}/5){_C.RESET}" - ) - - async def generate_scenarios( tools: List[MCPTool] | List[Dict[str, Any]], resources: List[MCPResource] | List[Dict[str, Any]] = [], @@ -33,9 +23,11 @@ async def generate_scenarios( generator_model: str = "openai/gpt-4.1-mini", generator_api_key: Optional[str] = None, generator_base_url: str = "https://openrouter.ai/api/v1", + mcp_server_command: str = "python", + mcp_server_args: Optional[List[str]] = None, ) -> GeneratedScenarioCollection: """ - Generate scenarios for MCP tools. + Generate scenarios for MCP tools using an MCP server for JSON formatting. Args: tools: List of Tool objects or list of tool dictionaries @@ -44,13 +36,16 @@ async def generate_scenarios( show_preview: Whether to show a preview of generated scenarios (default: True) custom_instructions: Optional custom instructions for scenario generation generator_model: Model to use for generation (default: "openai/gpt-4.1-mini") - generator_api_key: API key for the generator model. 
If None, will use OPENROUTER_API_KEY env var + generator_api_key: API key for the generator model generator_base_url: Base URL for the API (default: OpenRouter) + mcp_server_command: Command to start MCP server (default: "python") + mcp_server_args: Args for MCP server (default: None, will use bundled format_server) Returns: GeneratedScenarioCollection containing the generated scenarios """ - import os + if mcp_server_args is None: + mcp_server_args = ["format_server.py"] # Will be replaced with bundled version t0 = time.perf_counter() @@ -62,58 +57,54 @@ async def generate_scenarios( "generator_api_key is required or OPENROUTER_API_KEY env var must be set" ) - # Validate that we have at least tools or resources + # Validate inputs if not tools and not resources: raise ValueError("At least one tool or resource must be provided") ok(f"Using model: {generator_model}") # Convert tools to dictionaries - if isinstance(tools, list) and tools and isinstance(tools[0], MCPTool): - tools_info = [tool.to_dict() for tool in tools] # type: ignore + if tools and hasattr(tools[0], 'to_dict'): + tools_info = [tool.to_dict() for tool in tools] else: - # Assume it's already a list of dictionaries tools_info = [ { - "name": tool.get("name", "") - if isinstance(tool, dict) - else getattr(tool, "name", ""), - "description": tool.get("description", "") - if isinstance(tool, dict) - else getattr(tool, "description", ""), - "parameters": tool.get("parameters", {}) - if isinstance(tool, dict) - else getattr(tool, "parameters", {}), + "name": tool.get("name", "") if isinstance(tool, dict) else getattr(tool, "name", ""), + "description": tool.get("description", "") if isinstance(tool, dict) else getattr(tool, "description", ""), + "parameters": tool.get("parameters", {}) if isinstance(tool, dict) else getattr(tool, "parameters", {}), } for tool in tools ] # Convert resources to dictionaries - if resources is None: - resources_info = [] - elif ( - isinstance(resources, list) - and resources - 
and isinstance(resources[0], MCPResource) - ): - resources_info = [resource.to_dict() for resource in resources] # type: ignore + if resources and hasattr(resources[0], 'to_dict'): + resources_info = [resource.to_dict() for resource in resources] else: - # Assume it's already a list of dictionaries resources_info = resources or [] + + # Ensure all values are JSON-serializable (convert AnyUrl, etc.) + def make_serializable(obj): + """Convert objects to JSON-serializable types.""" + if isinstance(obj, dict): + return {k: make_serializable(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [make_serializable(item) for item in obj] + elif hasattr(obj, '__str__') and not isinstance(obj, (str, int, float, bool, type(None))): + return str(obj) + return obj + + resources_info = [make_serializable(r) for r in resources_info] info(f"Available: {len(tools_info)} tool(s), {len(resources_info)} resource(s).") - step("Preparing prompt & JSON schema &") + step("Preparing prompt for scenario generation") tools_description = json.dumps(tools_info, indent=2) resources_description = ( - json.dumps(resources_info, indent=2) - if resources_info - else "No resources available" + json.dumps(resources_info, indent=2) if resources_info else "No resources available" ) - prompt = f"""You are an expert at creating realistic scenarios for testing AI agents that interact with MCP (Model Context Protocol) servers. - -Given the following available tools and resources from an MCP server, generate {num_scenarios} diverse, realistic scenarios that a user might want to accomplish using these tools. + # Simple prompt that asks for plain text output + prompt = f"""Generate {num_scenarios} diverse, realistic scenarios for testing AI agents with these MCP tools and resources. AVAILABLE TOOLS: {tools_description} @@ -121,96 +112,106 @@ async def generate_scenarios( AVAILABLE RESOURCES: {resources_description} -Requirements for scenarios: -1. 
Each scenario should be a task that can be accomplished using the available tools -2. Scenarios should vary in complexity - some simple (1-2 tool calls), some complex (multiple tool calls) -3. Scenarios should cover different use cases and tool combinations (though the task should not specify which tools to use) -4. Each scenario should be realistic - something a real user might actually want to do -5. Assign a difficulty rating from 1 (easy, single tool call) to 5 (hard, complex multi-step analysis) -6. The task should always include generating a summary of the work done and a thorough analysis and report of the results - -You must respond with a JSON object containing a "scenarios" array of exactly {num_scenarios} objects. Each object must have: -- "task": string describing the scenario -- "difficulty": integer from 1-5 representing complexity -""" - - if custom_instructions: - prompt += f"\n\nPay close attention to the following instructions when generating scenarios:\n\n{custom_instructions}" - - response_schema = { - "type": "object", - "properties": { - "scenarios": { - "type": "array", - "items": { - "type": "object", - "properties": { - "task": {"type": "string"}, - "difficulty": {"type": "integer", "minimum": 1, "maximum": 5}, - }, - "required": ["task", "difficulty"], - "additionalProperties": False, - }, - "minItems": num_scenarios, - "maxItems": num_scenarios, - } - }, - "required": ["scenarios"], - "additionalProperties": False, - } - - step(f"Calling model: {_C.BOLD}{generator_model}{_C.RESET} &") - client_openai = openai.OpenAI( - api_key=generator_api_key, - base_url=generator_base_url, - ) +Requirements: +1. Each scenario should use the available tools +2. Vary complexity from simple (1-2 tool calls) to complex (multiple tool calls) +3. Cover different use cases and tool combinations +4. Make scenarios realistic - what real users would actually want to do +5. Rate difficulty from 1 (easy, single tool) to 5 (hard, complex multi-step) +6. 
Tasks should include generating summaries and thorough analysis/reports + +{f"CUSTOM INSTRUCTIONS: {custom_instructions}" if custom_instructions else ""} + +For each scenario, provide: +- A task description (what the user wants to accomplish) +- A difficulty rating (1-5) + +Format each scenario as: +SCENARIO N: +Task: [description] +Difficulty: [1-5] + +Generate exactly {num_scenarios} scenarios.""" + + step(f"Calling model: {_C.BOLD}{generator_model}{_C.RESET}") + client = openai.OpenAI(api_key=generator_api_key, base_url=generator_base_url) t1 = time.perf_counter() - response = client_openai.chat.completions.create( + response = client.chat.completions.create( model=generator_model, messages=[{"role": "user", "content": prompt}], max_completion_tokens=8000, - response_format={ - "type": "json_schema", - "json_schema": {"name": "scenario_list", "schema": response_schema}, - }, ) dt = time.perf_counter() - t1 ok(f"Model responded in {dt:.2f}s.") content = response.choices[0].message.content - if content is None: - err("Model response content is None.") + if not content: raise ValueError("Model response content is None") + info(f"Raw content length: {len(content)} chars.") - # Parse JSON - try: - result = json.loads(content) - except Exception as e: - err("Failed to parse JSON from model response.") - dim(f" Exception: {e}") - dim(" First 500 chars of response content:") - dim(content[:500] if content else "No content") - raise - - # Extract scenarios - if "scenarios" in result: - scenarios = result["scenarios"] + # Parse plain text response + step("Parsing model output") + scenarios_raw = _parse_plain_text_scenarios(content) + + if len(scenarios_raw) != num_scenarios: + dim(f" Warning: Expected {num_scenarios} scenarios, got {len(scenarios_raw)}.") + + # Use MCP server to format into proper JSON + step("Connecting to MCP server for JSON formatting") + + # If no custom command provided, use the bundled format_server + if mcp_server_command == "python" and 
mcp_server_args == ["format_server.py"]: + import art.mcp.format_server + server_script = art.mcp.format_server.__file__ + server_params = StdioServerParameters( + command=mcp_server_command, + args=[server_script], + ) else: - scenarios = result if isinstance(result, list) else list(result.values())[0] - - # Validate count - if len(scenarios) != num_scenarios: - err(f"Expected {num_scenarios} scenarios, got {len(scenarios)}.") - raise ValueError(f"Expected {num_scenarios} scenarios, got {len(scenarios)}") - - ok(f"Parsed {len(scenarios)} scenario(s) successfully.") - - # Convert to ScenarioCollection - scenario_collection = GeneratedScenarioCollection.from_dicts(scenarios) - - # Show difficulty distribution and preview using the collection methods + server_params = StdioServerParameters( + command=mcp_server_command, + args=mcp_server_args, + ) + + formatted_scenarios = [] + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + # Initialize the connection + await session.initialize() + + # Get available tools + tools_response = await session.list_tools() + + if not tools_response.tools: + raise ValueError("MCP server has no tools available") + + format_tool = tools_response.tools[0] # Use first tool + ok(f"Using MCP tool: {format_tool.name}") + + # Format each scenario through MCP + for i, scenario in enumerate(scenarios_raw): + result = await session.call_tool( + format_tool.name, + arguments={ + "task": scenario["task"], + "difficulty": scenario["difficulty"], + } + ) + + # Extract text content + if result.content and hasattr(result.content[0], 'text'): + formatted_scenarios.append(json.loads(result.content[0].text)) + + if (i + 1) % 5 == 0: + info(f"Formatted {i + 1}/{len(scenarios_raw)} scenarios") + + ok(f"Formatted {len(formatted_scenarios)} scenarios via MCP server.") + + # Create collection + scenario_collection = GeneratedScenarioCollection.from_dicts(formatted_scenarios) 
def _parse_plain_text_scenarios(content: str) -> List[Dict[str, Any]]:
    """Parse ``Task:`` / ``Difficulty:`` pairs out of free-form model output.

    The expected layout is::

        SCENARIO 1:
        Task: <description>
        Difficulty: <1-5>

    The parser is deliberately lenient about how the model decorates it:

    - labels are matched case-insensitively and may carry markdown or list
      decoration (``**Task:**``, ``- Task:``, ``1. Task:``);
    - the difficulty is the first integer after the label, so ``4/5`` and
      ``**4**`` both yield 4; it is clamped to 1-5 and falls back to 3 when
      no integer is present;
    - lines between a ``Task:`` line and its ``Difficulty:`` line are treated
      as a continuation of the task text;
    - a task that never receives a difficulty (end of text, next scenario
      header, or another ``Task:`` line) is kept with difficulty 3.

    Args:
        content: Raw text returned by the model.

    Returns:
        A list of ``{"task": str, "difficulty": int}`` dicts in input order;
        entries whose task text is empty are omitted.
    """
    import re

    default_difficulty = 3
    field_re = re.compile(
        r"^[\s*#>\-]*(?:\d+[.)]\s*)?[\s*]*(task|difficulty)\s*\**\s*:\s*\**\s*(.*)$",
        re.IGNORECASE,
    )
    header_re = re.compile(r"^[\s*#>\-]*scenario\s*#?\d+", re.IGNORECASE)
    number_re = re.compile(r"-?\d+")

    scenarios: List[Dict[str, Any]] = []
    # Text fragments of the task currently being collected; None = no task.
    task_parts: Optional[List[str]] = None

    def flush(difficulty: int) -> None:
        """Emit the pending task (if any) with *difficulty* and reset state."""
        nonlocal task_parts
        if task_parts is not None:
            # Drop a trailing markdown bold marker left by "**Task: ...**".
            task = " ".join(task_parts).strip().rstrip("*").strip()
            if task:
                scenarios.append({"task": task, "difficulty": difficulty})
        task_parts = None

    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        field = field_re.match(line)
        if field:
            label, value = field.group(1).lower(), field.group(2).strip()
            if label == "task":
                # A previous task that never got a difficulty is still kept.
                flush(default_difficulty)
                task_parts = [value] if value else []
            else:
                number = number_re.search(value)
                difficulty = int(number.group()) if number else default_difficulty
                flush(max(1, min(5, difficulty)))
            continue

        if header_re.match(line):
            flush(default_difficulty)
            continue

        if task_parts is not None:
            task_parts.append(line)

    flush(default_difficulty)
    return scenarios