Skip to content

Commit 2326c08

Browse files
authored
Merge pull request #44 from qiniu/xgopilot/claude/issue-43-1763699096
feat(live_streaming): add dual authentication and new list APIs
2 parents 6b79ccb + 213b262 commit 2326c08

File tree

2 files changed

+159
-3
lines changed

2 files changed

+159
-3
lines changed

src/mcp_server/core/live_streaming/live_streaming.py

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import aiohttp
22
import logging
3+
import qiniu
34

45
from typing import Dict, Any
56
from ...config import config
@@ -13,11 +14,38 @@ def __init__(self, cfg: config.Config = None):
1314
self.config = cfg
1415
self.api_key = cfg.api_key if cfg else None
1516
self.endpoint_url = cfg.endpoint_url if cfg else None
17+
self.access_key = cfg.access_key if cfg else None
18+
self.secret_key = cfg.secret_key if cfg else None
19+
20+
# Initialize qiniu.Auth if access_key and secret_key are available
21+
self.auth = None
22+
if self.access_key and self.secret_key and \
23+
self.access_key != "YOUR_QINIU_ACCESS_KEY" and \
24+
self.secret_key != "YOUR_QINIU_SECRET_KEY":
25+
self.auth = qiniu.Auth(self.access_key, self.secret_key)
1626

1727
def _get_auth_header(self) -> Dict[str, str]:
18-
"""Generate Bearer token authorization header"""
19-
if not self.api_key:
20-
raise ValueError("QINIU_API_KEY is not configured")
28+
"""
29+
Generate authorization header
30+
Priority: QINIU_ACCESS_KEY/QINIU_SECRET_KEY > API KEY
31+
"""
32+
# Priority 1: Use QINIU_ACCESS_KEY/QINIU_SECRET_KEY if configured
33+
if self.auth:
34+
# Generate Qiniu token for the request
35+
# For live streaming API, we use a simple token format
36+
token = self.auth.token_of_request(
37+
url="",
38+
body=None,
39+
content_type="application/x-www-form-urlencoded"
40+
)
41+
return {
42+
"Authorization": f"Qiniu {token}"
43+
}
44+
45+
# Priority 2: Fall back to API KEY if ACCESS_KEY/SECRET_KEY not configured
46+
if not self.api_key or self.api_key == "YOUR_QINIU_API_KEY":
47+
raise ValueError("Neither QINIU_ACCESS_KEY/QINIU_SECRET_KEY nor QINIU_API_KEY is configured")
48+
2149
return {
2250
"Authorization": f"Bearer {self.api_key}"
2351
}
@@ -346,3 +374,94 @@ async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any]
346374
"message": f"Failed to query traffic stats: {text}",
347375
"status_code": status
348376
}
377+
378+
async def list_buckets(self) -> Dict[str, Any]:
379+
"""
380+
List all live streaming spaces/buckets
381+
382+
Returns:
383+
Dict containing the list of buckets
384+
"""
385+
if not self.endpoint_url:
386+
raise ValueError("QINIU_ENDPOINT_URL is not configured")
387+
388+
# Remove protocol to get base endpoint
389+
endpoint = self.endpoint_url
390+
if endpoint.startswith("http://"):
391+
endpoint = endpoint[7:]
392+
elif endpoint.startswith("https://"):
393+
endpoint = endpoint[8:]
394+
395+
url = f"https://{endpoint}/"
396+
headers = self._get_auth_header()
397+
398+
logger.info(f"Listing all live streaming buckets from {url}")
399+
400+
async with aiohttp.ClientSession() as session:
401+
async with session.get(url, headers=headers) as response:
402+
status = response.status
403+
text = await response.text()
404+
405+
if status == 200:
406+
logger.info("Successfully listed all buckets")
407+
return {
408+
"status": "success",
409+
"data": text,
410+
"message": "Buckets listed successfully",
411+
"status_code": status
412+
}
413+
else:
414+
logger.error(f"Failed to list buckets, status: {status}, response: {text}")
415+
return {
416+
"status": "error",
417+
"message": f"Failed to list buckets: {text}",
418+
"status_code": status
419+
}
420+
421+
async def list_streams(self, bucket_id: str) -> Dict[str, Any]:
422+
"""
423+
List all streams in a specific live streaming bucket
424+
425+
Args:
426+
bucket_id: The bucket ID/name
427+
428+
Returns:
429+
Dict containing the list of streams in the bucket
430+
"""
431+
if not self.endpoint_url:
432+
raise ValueError("QINIU_ENDPOINT_URL is not configured")
433+
434+
# Remove protocol to get base endpoint
435+
endpoint = self.endpoint_url
436+
if endpoint.startswith("http://"):
437+
endpoint = endpoint[7:]
438+
elif endpoint.startswith("https://"):
439+
endpoint = endpoint[8:]
440+
441+
url = f"https://{endpoint}/?streamlist&bucketId={bucket_id}"
442+
headers = self._get_auth_header()
443+
444+
logger.info(f"Listing streams in bucket: {bucket_id}")
445+
446+
async with aiohttp.ClientSession() as session:
447+
async with session.get(url, headers=headers) as response:
448+
status = response.status
449+
text = await response.text()
450+
451+
if status == 200:
452+
logger.info(f"Successfully listed streams in bucket: {bucket_id}")
453+
return {
454+
"status": "success",
455+
"bucket_id": bucket_id,
456+
"data": text,
457+
"message": f"Streams in bucket '{bucket_id}' listed successfully",
458+
"status_code": status
459+
}
460+
else:
461+
logger.error(f"Failed to list streams in bucket: {bucket_id}, status: {status}, response: {text}")
462+
return {
463+
"status": "error",
464+
"bucket_id": bucket_id,
465+
"message": f"Failed to list streams: {text}",
466+
"status_code": status
467+
}

src/mcp_server/core/live_streaming/tools.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,41 @@ async def query_live_traffic_stats(self, **kwargs) -> list[types.TextContent]:
198198
result = await self.live_streaming.query_live_traffic_stats(**kwargs)
199199
return [types.TextContent(type="text", text=str(result))]
200200

201+
@tools.tool_meta(
202+
types.Tool(
203+
name="live_streaming_list_buckets",
204+
description="List all live streaming spaces/buckets. Returns information about all available live streaming buckets.",
205+
inputSchema={
206+
"type": "object",
207+
"properties": {},
208+
"required": [],
209+
},
210+
)
211+
)
212+
async def list_buckets(self, **kwargs) -> list[types.TextContent]:
213+
result = await self.live_streaming.list_buckets(**kwargs)
214+
return [types.TextContent(type="text", text=str(result))]
215+
216+
@tools.tool_meta(
217+
types.Tool(
218+
name="live_streaming_list_streams",
219+
description="List all streams in a specific live streaming bucket. Returns the list of streams for the given bucket ID.",
220+
inputSchema={
221+
"type": "object",
222+
"properties": {
223+
"bucket_id": {
224+
"type": "string",
225+
"description": "The bucket ID/name to list streams from",
226+
},
227+
},
228+
"required": ["bucket_id"],
229+
},
230+
)
231+
)
232+
async def list_streams(self, **kwargs) -> list[types.TextContent]:
233+
result = await self.live_streaming.list_streams(**kwargs)
234+
return [types.TextContent(type="text", text=str(result))]
235+
201236

202237
def register_tools(live_streaming: LiveStreamingService):
203238
tool_impl = _ToolImpl(live_streaming)
@@ -210,5 +245,7 @@ def register_tools(live_streaming: LiveStreamingService):
210245
tool_impl.get_push_urls,
211246
tool_impl.get_play_urls,
212247
tool_impl.query_live_traffic_stats,
248+
tool_impl.list_buckets,
249+
tool_impl.list_streams,
213250
]
214251
)

0 commit comments

Comments
 (0)