Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions skeleton/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ data/reports/*
data/cf-reports/*
!data/cf-reports/.gitkeep

# 运行时队列和状态文件
data/*.json
data/*.log

# 环境配置
.env
.env.local
Expand Down
16 changes: 12 additions & 4 deletions skeleton/CLAUDE.md.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,13 @@ Dashboard API 速查:

### 轮询重启铁律(Critical)

每次处理完 Dashboard 消息后,**必须立即重新启动**后台轮询
每次处理完 Dashboard 消息后,**必须按顺序执行**
```bash
# 1. ACK 确认已处理(清除服务端队列)
curl -sf -X POST http://localhost:7890/api/messages/ack -H 'Content-Type: application/json' -d '{}'
# 2. 清除本地兜底文件
rm -f data/pending-messages.json
# 3. 重启轮询
bash scripts/dashboard-poll.sh # run_in_background, timeout 600000
```
不重启 = Claude 变聋,无法感知后续 Dashboard/Plugin 消息。这是最常犯的错误。
Expand Down Expand Up @@ -148,7 +153,7 @@ Claude 会话可能因 context 限制自动结束。各组件恢复机制:
- **Dashboard 服务**:独立 Express 进程,不受影响
- **轮询**:依赖 run_in_background,新会话启动序列自动重启
- **Team**:跟随会话生命周期,新会话自动重建
- **消息队列**:Plugin 报告应持久化到 data/reports/,不只依赖内存队列
- **消息队列**:服务端持久化到 `data/message-queue.json`,轮询兜底到 `data/pending-messages.json`,双重保障不丢消息
- **关键**:daemon 自动循环,不需要手动重启

---
Expand Down Expand Up @@ -273,9 +278,12 @@ curl -s -X POST http://localhost:{{DASHBOARD_PORT}}/api/worker/$wId/done \
| `/api/worker/:id/remove` | POST | 小人走回 |
| `/api/operation` | POST | 操作横幅 |
| `/api/progress` | POST | 进度条 |
| `/api/claude/status` | POST | Claude 状态 |
| `/api/messages` | GET | 获取并清空消息队列 |
| `/api/claude/status` | GET | 读取 Claude 状态 |
| `/api/claude/status` | POST | 设置 Claude 状态 |
| `/api/messages` | GET | peek 消息队列(不消费) |
| `/api/messages/ack` | POST | 确认已处理消息(`{upToId}` 或清空全部) |
| `/api/messages` | POST | 外部注入消息(Plugin 使用) |
| `/api/messages` | DELETE | 清空消息队列 |

### 消息格式

Expand Down
59 changes: 50 additions & 9 deletions skeleton/plugins/feishu-notify/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
import logging
import re
import subprocess
import time
import requests

# ─── Message de-duplication (guards against duplicate delivery on WebSocket reconnect) ───
_seen_messages: dict[str, float] = {}  # message_id → timestamp at which the id was recorded
_DEDUP_TTL = 60  # a given message_id is processed only once within 60 seconds

# 加载 .env(start.sh 已预加载,这里是备用)
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_dir = os.path.dirname(os.path.dirname(_script_dir)) # plugins/feishu-notify → project root
_project_dir = os.path.dirname(
os.path.dirname(_script_dir)
) # plugins/feishu-notify → project root
_env_file = os.path.join(_project_dir, ".env")
if os.path.exists(_env_file):
with open(_env_file) as f:
Expand Down Expand Up @@ -50,6 +57,7 @@
# ── 实体别名列表(从 entities.yaml 动态读取)──────────────────────────────
KNOWN_ALIASES = []


def _load_aliases():
"""从 entities.yaml 加载实体别名"""
global KNOWN_ALIASES
Expand All @@ -58,6 +66,7 @@ def _load_aliases():
return
try:
import yaml

with open(entities_file) as f:
config = yaml.safe_load(f)
if config and "entities" in config:
Expand All @@ -69,12 +78,13 @@ def _load_aliases():
# 无 PyYAML,尝试简单解析
with open(entities_file) as f:
for line in f:
m = re.match(r'\s*-?\s*alias:\s*(.+)', line)
m = re.match(r"\s*-?\s*alias:\s*(.+)", line)
if m:
KNOWN_ALIASES.append(m.group(1).strip().strip('"').strip("'"))
except Exception as e:
log.warning("加载 entities.yaml 失败: %s", e)


_load_aliases()


Expand All @@ -86,8 +96,10 @@ def feishu_reply(message_id: str, text: str) -> None:
try:
result = subprocess.run(
["bash", _reply_script, message_id, text],
capture_output=True, text=True, timeout=10,
cwd=_project_dir
capture_output=True,
text=True,
timeout=10,
cwd=_project_dir,
)
if result.returncode == 0:
log.info("离线自动回复成功: %s", message_id)
Expand Down Expand Up @@ -141,22 +153,46 @@ def map_to_dashboard(text: str, sender_id: str) -> dict:
if re.search(r"(日志|log)", t):
return {"type": "server_action", "server": alias, "action": "logs"}
if re.search(r"(部署|deploy|发布)", t):
instructions = re.sub(r"部署|deploy|发布|\s*" + alias, "", text, flags=re.I).strip()
return {"type": "server_action", "server": alias, "action": "deploy",
"instructions": instructions or "按默认流程部署"}
instructions = re.sub(
r"部署|deploy|发布|\s*" + alias, "", text, flags=re.I
).strip()
return {
"type": "server_action",
"server": alias,
"action": "deploy",
"instructions": instructions or "按默认流程部署",
}
# 默认:检查
return {"type": "server_action", "server": alias, "action": "check"}

# 通用文本 → feishu_text(Claude 自由处理)
return {"type": "feishu_text", "text": text}


def _dedup_check(message_id: str) -> bool:
    """Report whether ``message_id`` was already seen within the de-dup window.

    Entries older than ``_DEDUP_TTL`` seconds are purged from
    ``_seen_messages`` on every call, so the table only holds recently
    recorded ids.

    Args:
        message_id: Identifier of the incoming message.

    Returns:
        True if the message is a duplicate and should be skipped; False if it
        is new (in which case it is recorded with the current time) or if it
        carries no usable id.
    """
    # Without a usable id there is nothing to de-duplicate on; using an empty
    # or missing id as a key would silently drop every id-less message after
    # the first one within the TTL.
    if not message_id:
        return False

    # Monotonic clock: the TTL measures elapsed time, so it must not be
    # affected by wall-clock adjustments.
    now = time.monotonic()

    # Purge expired entries. Keys are collected first so the dict is not
    # mutated while it is being iterated.
    expired = [
        key for key, seen_at in _seen_messages.items() if now - seen_at >= _DEDUP_TTL
    ]
    for key in expired:
        del _seen_messages[key]

    if message_id in _seen_messages:
        return True
    _seen_messages[message_id] = now
    return False


def on_message(data: P2ImMessageReceiveV1) -> None:
"""飞书消息事件处理器"""
try:
msg = data.event.message
sender = data.event.sender

# 消息去重:防止 WebSocket 重连时重复投递
if _dedup_check(msg.message_id):
log.info("重复消息,跳过: %s", msg.message_id)
return

text = extract_text(msg)
if not text:
log.info("非文本消息,跳过 (type=%s)", msg.message_type)
Expand Down Expand Up @@ -187,14 +223,19 @@ def on_message(data: P2ImMessageReceiveV1) -> None:

# Dashboard 不可达 → 整个系统未启动,通知用户
if not dashboard_ok:
feishu_reply(msg.message_id, "⚠️ Agent 系统当前未启动,消息无法入队。请打开 Claude Code 项目后重新发送。")
feishu_reply(
msg.message_id,
"⚠️ Agent 系统当前未启动,消息无法入队。请打开 Claude Code 项目后重新发送。",
)
return

# Dashboard 可达但 Claude 未连接 → 消息已入队,发送等待提示
claude_status = check_claude_online()
if claude_status not in ("connected", "working", "idle"):
log.info("消息已入队,Claude 当前状态: %s", claude_status)
feishu_reply(msg.message_id, "📥 消息已记录,Claude 当前不在线,上线后会自动处理。")
feishu_reply(
msg.message_id, "📥 消息已记录,Claude 当前不在线,上线后会自动处理。"
)

except Exception as e:
log.error("消息处理失败: %s", e, exc_info=True)
Expand Down
45 changes: 28 additions & 17 deletions skeleton/scripts/dashboard-poll.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/bash
# Dashboard 消息轮询后台脚本
# 正常模式(DAEMON_MODE=0):每 3 秒检查消息队列,发现消息后合并输出并退出唤醒 Claude
# 守护模式(DAEMON_MODE=1):跳过消息轮询(不消费队列),只维持心跳 + Worker 健康检查
# 每 60 秒检查 Team Worker 心跳,缺失时注入 ping_worker 消息
# 正常模式(DAEMON_MODE=0):每 3 秒 peek 消息队列(不消费),发现消息后写入本地文件并退出唤醒 Claude
# 守护模式(DAEMON_MODE=1):跳过消息轮询,只维持心跳 + Worker 健康检查
# Claude 处理完消息后调用 POST /api/messages/ack 确认清除

PROJECT_DIR="${CLAUDE_PROJECT_DIR:-$(cd "$(dirname "$0")/.." && pwd)}"
PROJECT_NAME=$(basename "$PROJECT_DIR")
POLL_PID_FILE="/tmp/claude-${PROJECT_NAME}-dashboard-poll.pid"
PENDING_FILE="$PROJECT_DIR/data/pending-messages.json"

# 加载项目 .env(确保读到正确的 DASHBOARD_PORT)
[ -f "$PROJECT_DIR/.env" ] && { set -a; source "$PROJECT_DIR/.env"; set +a; }
Expand All @@ -24,29 +25,39 @@ LAST_HEALTH_CHECK=$(date +%s)
LAST_PING_TIME=0
PING_COOLDOWN=3600 # 1 小时内不重复 ping 同一个 worker

# ─── 启动时检查本地缓存的未处理消息 ───
if [ "$DAEMON_MODE" != "1" ] && [ -f "$PENDING_FILE" ] && [ -s "$PENDING_FILE" ]; then
pending=$(cat "$PENDING_FILE")
count=$(echo "$pending" | jq 'length' 2>/dev/null)
if [ "$count" -gt "0" ] 2>/dev/null; then
echo "=== 本地缓存的未处理消息 (共 ${count} 条) ==="
echo "$pending" | jq '.[]'
rm -f "$POLL_PID_FILE"
exit 0
fi
fi

while true; do

# 仅正常模式轮询消息(DAEMON_MODE=1 时跳过,避免消费队列但无法唤醒 Claude
# 仅正常模式轮询消息(DAEMON_MODE=1 时跳过)
if [ "$DAEMON_MODE" != "1" ]; then
result=$(curl -sf "$BASE_URL/api/messages" 2>/dev/null)
count=$(echo "$result" | jq '.messages | length' 2>/dev/null)

if [ "$count" -gt "0" ] 2>/dev/null; then
new_msgs=$(echo "$result" | jq '.messages')
ALL_MESSAGES=$(echo "$ALL_MESSAGES $new_msgs" | jq -s 'add')

# 合并窗口:再等 3 秒看有没有更多消息
sleep 3
result2=$(curl -sf "$BASE_URL/api/messages" 2>/dev/null)
count2=$(echo "$result2" | jq '.messages | length' 2>/dev/null)
if [ "$count2" -gt "0" ] 2>/dev/null; then
new_msgs2=$(echo "$result2" | jq '.messages')
ALL_MESSAGES=$(echo "$ALL_MESSAGES $new_msgs2" | jq -s 'add')
fi
# 短暂合并窗口:等 2 秒让更多消息到达,再 peek 一次拿到完整集合
sleep 2
result=$(curl -sf "$BASE_URL/api/messages" 2>/dev/null)

messages=$(echo "$result" | jq '.messages')
total=$(echo "$messages" | jq 'length')

# 写入本地文件作为兜底(Claude 崩溃时不丢消息)
mkdir -p "$PROJECT_DIR/data"
echo "$messages" > "$PENDING_FILE"

total=$(echo "$ALL_MESSAGES" | jq 'length')
echo "=== Dashboard 新消息 (共 ${total} 条) ==="
echo "$ALL_MESSAGES" | jq '.[]'
echo "$messages" | jq '.[]'

# 退出前自启 DAEMON_MODE 副本保活(只做心跳,不消费消息队列)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
Expand Down
Loading