diff --git a/src/mcp_server/core/live_streaming/live_streaming.py b/src/mcp_server/core/live_streaming/live_streaming.py index ce37858..d5b12ef 100644 --- a/src/mcp_server/core/live_streaming/live_streaming.py +++ b/src/mcp_server/core/live_streaming/live_streaming.py @@ -336,16 +336,18 @@ def get_play_urls(self, play_domain: str, bucket: str, stream_name: str) -> Dict "message": "Playback URLs generated successfully" } - async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any]: + async def query_live_traffic_stats(self, begin: str, end: str, include_raw_data: bool = False) -> Dict[str, Any]: """ Query live streaming traffic statistics Args: begin: Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000) end: End time in format YYYYMMDDHHMMSS (e.g., 20240129105148) + include_raw_data: If True, includes raw JSON data for download (default: False) Returns: - Dict containing traffic statistics + Dict containing traffic statistics with total traffic (bytes), average bandwidth (bps), + peak bandwidth (bps), and optionally raw data """ if not self.live_endpoint: self.live_endpoint = "mls.cn-east-1.qiniumiku.com" @@ -375,14 +377,99 @@ async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any] if status == 200: logger.info("Successfully queried live traffic stats") - return { - "status": "success", - "begin": begin, - "end": end, - "data": text, - "message": "Traffic statistics retrieved successfully", - "status_code": status - } + + try: + # Parse JSON response + data = json.loads(text) + + # Calculate total traffic and bandwidth metrics + total_traffic_bytes = 0 + bandwidth_values = [] + data_points = [] + + # Data format: [{"time":"2025-11-26T00:00:00+08:00","values":{"flow":0}}, ...] + for item in data: + if isinstance(item, dict) and "values" in item and "flow" in item["values"]: + flow_bytes = item["values"]["flow"] + total_traffic_bytes += flow_bytes + + # Convert to bandwidth: flow is accumulated over 5 minutes (300 seconds) + # Bandwidth (bps) = bytes / 300 seconds * 8 bits/byte + bandwidth_bps = (flow_bytes / 300) * 8 + bandwidth_values.append(bandwidth_bps) + + # Store data point with timestamp + data_points.append({ + "time": item.get("time", ""), + "traffic_bytes": flow_bytes, + "bandwidth_bps": bandwidth_bps + }) + + # Calculate average and peak bandwidth + avg_bandwidth_bps = sum(bandwidth_values) / len(bandwidth_values) if bandwidth_values else 0 + peak_bandwidth_bps = max(bandwidth_values) if bandwidth_values else 0 + + # Convert to human-readable units + def format_bytes(bytes_val): + """Convert bytes to human-readable format""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_val < 1024.0: + return f"{bytes_val:.2f} {unit}" + bytes_val /= 1024.0 + return f"{bytes_val:.2f} PB" + + def format_bandwidth(bps): + """Convert bits per second to human-readable format""" + for unit in ['bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps']: + if bps < 1000.0: + return f"{bps:.2f} {unit}" + bps /= 1000.0 + return f"{bps:.2f} Pbps" + + result = { + "status": "success", + "begin": begin, + "end": end, + "summary": { + "total_traffic_bytes": total_traffic_bytes, + "total_traffic_formatted": format_bytes(total_traffic_bytes), + "data_points_count": len(data_points), + "average_bandwidth_bps": avg_bandwidth_bps, + "average_bandwidth_formatted": format_bandwidth(avg_bandwidth_bps), + "peak_bandwidth_bps": peak_bandwidth_bps, + "peak_bandwidth_formatted": format_bandwidth(peak_bandwidth_bps), + "granularity": "5 minutes" + }, + "message": "Traffic statistics calculated successfully", + "status_code": status + } + + # Include raw data only if requested + if include_raw_data: + result["raw_data"] = data + result["data_points"] = data_points + + return result + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON response: {e}") + return { + "status": "error", + "begin": begin, + "end": end, + "message": f"Failed to parse traffic stats response: {str(e)}", + "raw_response": text, + "status_code": status + } + except Exception as e: + logger.error(f"Error processing traffic stats: {e}") + return { + "status": "error", + "begin": begin, + "end": end, + "message": f"Error processing traffic stats: {str(e)}", + "status_code": status + } else: logger.error(f"Failed to query traffic stats, status: {status}, response: {text}") return { diff --git a/src/mcp_server/core/live_streaming/tools.py b/src/mcp_server/core/live_streaming/tools.py index 4819981..f86eee5 100644 --- a/src/mcp_server/core/live_streaming/tools.py +++ b/src/mcp_server/core/live_streaming/tools.py @@ -177,7 +177,7 @@ async def get_play_urls(self, **kwargs) -> list[types.TextContent]: @tools.tool_meta( types.Tool( name="live_streaming_query_live_traffic_stats", - description="Query live streaming traffic statistics for a time range. Returns bandwidth and traffic usage data.", + description="Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.", inputSchema={ "type": "object", "properties": { @@ -189,6 +189,11 @@ async def get_play_urls(self, **kwargs) -> list[types.TextContent]: "type": "string", "description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)", }, + "include_raw_data": { + "type": "boolean", + "description": "If true, includes raw JSON data and detailed data points for download. Default is false.", + "default": False, + }, }, "required": ["begin", "end"], },