Skip to content

偶尔会有收不到回调消息 #65

@wilac-pv

Description

@wilac-pv

[Bug Report] Stream SDK 偶现消息丢失问题

问题描述

使用钉钉 Stream SDK (dingtalk-stream-sdk-python) 时,偶尔会出现收不到回调消息的情况。经过代码审查,发现 SDK 实现中存在多个可能导致消息丢失的潜在问题。

环境信息

  • SDK 版本: dingtalk-stream-sdk-python
  • Python 版本: 3.9
  • 操作系统: 5.15.0-78-generic #85-Ubuntu SMP Fri Jul 7 15:25:09 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

问题分析

1. 异步任务未被正确管理和追踪

位置: client.py 第 67-68 行

async for raw_message in websocket:
    json_message = json.loads(raw_message)
    asyncio.create_task(self.background_task(json_message))

问题:

  • asyncio.create_task() 创建的任务没有被保存引用
  • 任务执行过程中的异常可能被静默忽略
  • 在高并发场景下,大量任务同时运行可能导致资源耗尽
  • 无法追踪任务执行状态,难以排查消息处理失败的原因

影响: 当 background_taskroute_message 中出现异常时,消息会丢失且没有明确的错误日志。

2. 消息处理异常捕获范围过大

位置: client.py 第 77-80 行

async def background_task(self, json_message):
    try:
        route_result = await self.route_message(json_message)
        if route_result == DingTalkStreamClient.TAG_DISCONNECT:
            await self.websocket.close()
    except Exception as e:
        self.logger.error(f"error processing message: {e}")

问题:

  • 捕获所有异常但仅记录简单的错误信息
  • 缺少消息内容、堆栈跟踪等关键调试信息
  • 异常发生后消息直接丢弃,没有重试机制

影响: 当业务 handler 处理逻辑有问题时,消息会静默丢失,难以排查根因。

3. Keepalive 任务生命周期管理不当

位置: client.py 第 66 行

asyncio.create_task(self.keepalive(websocket))

问题:

  • keepalive 任务没有被取消或等待
  • 当 websocket 连接异常退出时,keepalive 任务可能继续运行
  • 可能导致资源泄漏和意外的 websocket 操作

4. 网络异常重连延迟过长

位置: client.py 第 60 行和 74 行

await asyncio.sleep(10)  # 连接失败时
await asyncio.sleep(10)  # 网络异常时

问题:

  • 10 秒的重连间隔在网络短暂抖动时过长
  • 这段时间内的消息会完全丢失
  • 缺少指数退避或更智能的重连策略

5. JSON 解析异常未单独处理

位置: client.py 第 68 行

json_message = json.loads(raw_message)

问题:

  • JSON 解析失败会导致整个 websocket 连接中断
  • 应该只跳过当前消息并记录错误,而不是影响后续消息

复现步骤

由于是偶发问题,较难稳定复现,但以下场景更容易触发:

  1. 高并发消息推送(如批量发送钉钉消息)
  2. 网络环境不稳定时
  3. Handler 处理逻辑耗时较长或存在异常时
  4. 服务器负载较高时

预期行为

  1. 所有收到的消息都应该被可靠处理
  2. 处理失败的消息应该有明确的错误日志和堆栈信息
  3. 异步任务异常不应该静默丢失
  4. 网络异常恢复后应该快速重连

建议的修复方案

方案 1: 任务追踪和异常处理

async def start(self):
    self.pre_start()
    self._running_tasks = set()  # 追踪运行中的任务
    
    while True:
        keepalive_task = None
        try:
            connection = self.open_connection()
            if not connection:
                self.logger.error('open connection failed')
                await asyncio.sleep(5)
                continue
            
            uri = f'{connection["endpoint"]}?ticket={quote_plus(connection["ticket"])}'
            async with websockets.connect(uri) as websocket:
                self.websocket = websocket
                keepalive_task = asyncio.create_task(self.keepalive(websocket))
                
                async for raw_message in websocket:
                    try:
                        json_message = json.loads(raw_message)
                        task = asyncio.create_task(self.background_task(json_message))
                        self._running_tasks.add(task)
                        task.add_done_callback(lambda t: self._task_done_callback(t))
                    except json.JSONDecodeError as e:
                        self.logger.error(f'Invalid JSON message: {e}, raw={raw_message[:200]}')
                        
        except KeyboardInterrupt:
            break
        except Exception as e:
            self.logger.exception('Connection error: %s', e)
            await asyncio.sleep(3)
        finally:
            if keepalive_task:
                keepalive_task.cancel()
                try:
                    await keepalive_task
                except asyncio.CancelledError:
                    pass

def _task_done_callback(self, task):
    self._running_tasks.discard(task)
    if task.exception():
        self.logger.error(f"Task failed with exception: {task.exception()}", 
                         exc_info=task.exception())

方案 2: 增强错误日志

async def background_task(self, json_message):
    message_id = json_message.get('headers', {}).get('messageId', 'unknown')
    try:
        self.logger.debug(f"Processing message: {message_id}")
        route_result = await self.route_message(json_message)
        if route_result == DingTalkStreamClient.TAG_DISCONNECT:
            await self.websocket.close()
        self.logger.debug(f"Message processed successfully: {message_id}")
    except Exception as e:
        self.logger.error(
            f"Error processing message: {message_id}, "
            f"type: {json_message.get('type')}, "
            f"topic: {json_message.get('headers', {}).get('topic')}, "
            f"error: {e}",
            exc_info=True
        )

方案 3: 添加消息处理监控

建议在 SDK 中添加以下统计指标:

self._metrics = {
    'messages_received': 0,
    'messages_processed': 0,
    'messages_failed': 0,
    'reconnect_count': 0,
}

临时解决方案

在官方修复之前,用户可以通过以下方式缓解问题:

  1. 在业务代码的 handler 中增加 try-except 确保不抛出未捕获异常
  2. 添加应用层的消息去重和重试机制
  3. 监控 SDK 的错误日志,及时发现问题
  4. 降低单个 handler 的处理时间,避免阻塞

相关信息

  • 相关文件: dingtalk_stream/client.py
  • 相关方法: start(), background_task(), route_message()

感谢钉钉团队的辛勤工作!期待这些问题能够得到重视和修复。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions