diff --git a/codec_agent_plan.py b/codec_agent_plan.py index ad31235..a0fa316 100644 --- a/codec_agent_plan.py +++ b/codec_agent_plan.py @@ -290,6 +290,21 @@ def set_grants_hash(agent_id: str) -> str: return h +def grants_lock(agent_id: str): + """C5 (Fix #5): cross-process flock around the per-agent grants.json + read-modify-write (mirrors _status_lock for the manifest CAS). The /grant + endpoint holds this across load_grants -> modify -> save_grants -> + set_grants_hash so two concurrent grants can't clobber each other. Falls + back to a no-op context if codec_jsonstore is unavailable (headless/CI) — + never breaks grant_permission.""" + try: + import codec_jsonstore + return codec_jsonstore.file_lock(_agent_dir(agent_id) / "grants.json") + except Exception: + import contextlib + return contextlib.nullcontext() + + # ── Skill-registry validation ───────────────────────────────────────────────── def validate_plan_skills(plan: Plan, registry=None) -> Tuple[bool, List[str]]: """Walk every checkpoint's skills_needed; return (ok, missing_skills). diff --git a/codec_agents.py b/codec_agents.py index ee799e7..77c517b 100644 --- a/codec_agents.py +++ b/codec_agents.py @@ -180,7 +180,26 @@ def _web_search(query: str) -> str: def _web_fetch(url: str) -> str: try: - r = _sync_http.get(url.strip()) + # Fix #7 (H1) + re-audit N3: SSRF guard BEFORE the request AND on every + # redirect hop. The fetched text is returned to the agent/LLM, so a read + # of an internal/metadata host is an exfil path; _sync_http defaults to + # follow_redirects=True, which would reach an internal target via a 302 + # the guard never saw — so we follow redirects manually here. + import codec_ssrf + from urllib.parse import urljoin + cur = url.strip() + try: + for _ in range(6): # initial request + up to 5 redirects + codec_ssrf.validate_url(cur) + r = _sync_http.get(cur, follow_redirects=False) + if r.is_redirect and r.headers.get("location"): + cur = urljoin(cur, r.headers["location"]) + continue + break + else: + return "Fetch error: blocked URL (too many redirects)" + except codec_ssrf.SSRFError as e: + return f"Fetch error: blocked URL ({e})" if r.status_code in (401, 403): return f"Blocked by site (HTTP {r.status_code}). Site requires JavaScript or blocks automated access." if r.status_code >= 400: diff --git a/codec_ask_user.py b/codec_ask_user.py index edf38cf..5e043bd 100644 --- a/codec_ask_user.py +++ b/codec_ask_user.py @@ -201,7 +201,12 @@ def _write_question_notification(record: dict) -> None: surface; failures here log + continue. """ try: - with _FILE_LOCK: + # C5 (Fix #5): hold the cross-process file_lock across the whole + # read-modify-write so concurrent daemons (dashboard / voice / + # agent-runner) can't clobber each other's append. _FILE_LOCK is the + # in-process guard; codec_jsonstore.file_lock is the cross-process one — + # same pairing the PENDING_QUESTIONS read-modify-write already uses. + with _FILE_LOCK, codec_jsonstore.file_lock(NOTIFICATIONS_PATH): try: with open(NOTIFICATIONS_PATH) as f: notifs = json.load(f) @@ -648,7 +653,10 @@ def submit_answer(qid: str, answer: str, *, answered_via: str = "pwa") -> dict: # Mark the matching notification as read. try: - with _FILE_LOCK: + # C5 (Fix #5): same cross-process file_lock as the question-write path + # above, so a concurrent _write_question_notification and this + # mark-read can't clobber each other on notifications.json. + with _FILE_LOCK, codec_jsonstore.file_lock(NOTIFICATIONS_PATH): try: with open(NOTIFICATIONS_PATH) as f: notifs = json.load(f) diff --git a/codec_concurrency.py b/codec_concurrency.py new file mode 100644 index 0000000..ffdbcce --- /dev/null +++ b/codec_concurrency.py @@ -0,0 +1,72 @@ +"""codec_concurrency — small, dependency-free concurrency helpers. + +`run_with_timeout` runs a callable in a daemon thread with a hard wall-clock +timeout that ACTUALLY bounds wall-clock time. + +Motivation (audit C4): the common idiom + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: + fut = ex.submit(fn) + return fut.result(timeout=T) + +defeats its own timeout. When `fut.result(timeout=T)` raises TimeoutError, +the `with` block's __exit__ calls `executor.shutdown(wait=True)`, which BLOCKS +until the runaway task finishes. So a 50ms "timeout" on a 5s task takes ~5s — +the MCP tool dispatch (codec_mcp) and the observer OCR call (codec_observer) +could hang on a slow skill / screencapture popup. + +This helper uses a daemon thread + `join(timeout=...)` and never calls +shutdown(wait=True): on timeout it abandons the still-running worker and +raises promptly. daemon=True means an abandoned worker never blocks process +shutdown. Same shape as the proven `codec_hooks._run_hook_with_timeout`. +""" +import queue +import threading +from typing import Any, Callable + +__all__ = ["run_with_timeout"] + + +def run_with_timeout( + fn: Callable[..., Any], + timeout: float, + *args: Any, + **kwargs: Any, +) -> Any: + """Run ``fn(*args, **kwargs)`` in a daemon thread, bounded by ``timeout`` seconds. + + Returns ``fn``'s return value on success. Re-raises, in the calling thread, + any exception ``fn`` raised (original type, message, and instance preserved). + Raises ``TimeoutError`` if ``fn`` does not complete within ``timeout`` — + promptly, without waiting for the (abandoned, still-running) worker to finish. + + On Python 3.11+ ``concurrent.futures.TimeoutError`` is an alias of the + builtin ``TimeoutError``, so call sites catching either are satisfied. + """ + result_q: "queue.Queue[Any]" = queue.Queue(maxsize=1) + exc_q: "queue.Queue[BaseException]" = queue.Queue(maxsize=1) + + def _runner() -> None: + try: + result_q.put(fn(*args, **kwargs)) + except BaseException as e: # noqa: BLE001 — propagate ANY error to the caller + try: + exc_q.put(e) + except Exception: + pass + + t = threading.Thread(target=_runner, daemon=True, name="codec-run-with-timeout") + t.start() + t.join(timeout=timeout) + + if t.is_alive(): + # Abandon the worker — daemon=True so it never blocks shutdown. No + # shutdown(wait=True), so we return control to the caller immediately. + raise TimeoutError(f"operation exceeded {timeout}s timeout") + if not exc_q.empty(): + raise exc_q.get_nowait() + if not result_q.empty(): + return result_q.get_nowait() + # Thread finished without putting a result or exception — only reachable if + # the result put itself failed. Treat as a None return. + return None diff --git a/codec_config.py b/codec_config.py index 6e0ef4b..30fcac8 100644 --- a/codec_config.py +++ b/codec_config.py @@ -714,6 +714,10 @@ def is_dangerous(cmd): _SKILL_DANGEROUS_MODULES = { "os", "subprocess", "ctypes", "shutil", "importlib", "signal", "pty", "socket", + # re-audit N19: `import builtins; builtins.exec(src)` bypassed the gate — + # builtins.exec/eval are ast.Attribute calls, not the bare-name Calls the + # _SKILL_DANGEROUS_CALLS check catches. Blocking the import closes it. + "builtins", } _SKILL_SAFE_MODULES = { "json", "re", "math", "datetime", "collections", "itertools", "functools", diff --git a/codec_consent.py b/codec_consent.py new file mode 100644 index 0000000..556281f --- /dev/null +++ b/codec_consent.py @@ -0,0 +1,102 @@ +"""codec_consent — strict-consent gate for the chat + MCP skill paths. + +Re-audit (red-team CHAIN-001/002/006): the Step-3 consent gate +(docs/PHASE1-STEP3-DESIGN.md §1.7) was wired ONLY into codec_agent_runner. The +chat ([SKILL:] tag + pre-LLM hijack → codec_dispatch.run_skill) and MCP +(codec_mcp.tool_fn) paths could reach high-power skills with only the +`is_dangerous` heuristic / path blocklists. This module is the shared +classifier + per-transport policy: + + - MCP → hard-refuse destructive skills (claude.ai can't consent at the + operator tier; consistent with the _HTTP_BLOCKED principle). + - chat → require explicit confirmation (the handler returns consent_required; + the user confirms; re-dispatch carries a token). + - voice/agent → existing ask_user announce-and-listen (unchanged). + +A skill is "destructive" if it declares `SKILL_DESTRUCTIVE = True` +(registry-AST-extracted — the extensible per-skill path, Decision C), OR is in +`codec_config._HTTP_BLOCKED`, OR is one of the known high-power built-ins below +(so coverage doesn't depend on regenerating the hash-pinned skill manifest). + +Kill switch: `CONSENT_GATE_ENABLED=false`. +""" +import os + +__all__ = ["gate_enabled", "is_destructive_skill", "chat_consent_ok", "mcp_refuse_message"] + +# Known high-power built-ins that are destructive but NOT in _HTTP_BLOCKED. +# (terminal / python_exec / process_manager / pm2_control / ax_control are +# already covered by the _HTTP_BLOCKED backstop.) +_DESTRUCTIVE_BUILTINS = frozenset({ + "file_ops", # write/append/delete to the filesystem + "file_write", # writes files + "imessage_send", # sends messages as the user + "pilot", # drives a real browser session + "skill_forge", # writes a skill to disk (no review gate) +}) + + +def gate_enabled() -> bool: + """Consent gate on by default; CONSENT_GATE_ENABLED=false disables it.""" + return os.environ.get("CONSENT_GATE_ENABLED", "true").lower() != "false" + + +def is_destructive_skill(tool_name, registry=None) -> bool: + """True if `tool_name` is a high-power/destructive skill needing consent + (chat) or refusal (MCP). Never raises.""" + if not tool_name: + return False + # 1) per-skill SKILL_DESTRUCTIVE flag (extensible — user skills opt in) + try: + reg = registry + if reg is None: + from codec_dispatch import registry as reg # the singleton + if reg is not None and reg.get_destructive(tool_name): + return True + except Exception: + pass + # 2) _HTTP_BLOCKED backstop (terminal, python_exec, process_manager, …) + try: + from codec_config import _HTTP_BLOCKED + if tool_name in _HTTP_BLOCKED: + return True + except Exception: + pass + # 3) known high-power built-ins + return tool_name in _DESTRUCTIVE_BUILTINS + + +def chat_consent_ok(tool_name, query, *, registry=None) -> bool: + """Chat path (A2): a destructive skill requires explicit consent via the + existing AskUserQuestion PWA panel (Phase 1 Step 3 §1.7 — literal verb-match; + generic yes/ok rejected). Returns True if the skill may run (non-destructive, + gate disabled, or consent granted); False if blocked (declined / timeout / + ask_user unavailable). BLOCKS the worker thread on ask_user until answered — + the chat handler invokes this via asyncio.to_thread, so the event loop isn't + blocked. Fail-closed: any error → False (a destructive skill never + auto-runs).""" + if not gate_enabled() or not is_destructive_skill(tool_name, registry=registry): + return True + try: + import codec_ask_user + answer = codec_ask_user.ask( + f"CODEC wants to run the '{tool_name}' skill — a destructive / " + f"high-power operation — for: {(query or '')[:200]}", + destructive=True, + asked_from="chat", + tool_name=tool_name, + ) + return answer not in ( + codec_ask_user.TIMEOUT_SENTINEL, + codec_ask_user.DISABLED_SENTINEL, + ) + except Exception: + return False + + +def mcp_refuse_message(tool_name) -> str: + return ( + f"Skill '{tool_name}' is a destructive/high-power operation and is not " + "permitted over MCP. Run it locally (chat or voice), where the operator " + "can confirm it." + ) diff --git a/codec_dashboard.py b/codec_dashboard.py index 978d55b..6136ee9 100644 --- a/codec_dashboard.py +++ b/codec_dashboard.py @@ -27,7 +27,7 @@ _append_schedule_run_log, AUTH_ENABLED, AUTH_SESSION_HOURS, AUTH_COOKIE_NAME, _auth_sessions, _auth_lock, _e2e_keys, - _auth_available, _verify_biometric_session, + _auth_available, _verify_biometric_session, _session_token_valid, _save_sessions, _save_e2e_keys, get_db, _pending_approvals, _approval_lock, _evict_expired_approvals, @@ -164,14 +164,14 @@ async def dispatch(self, request, call_next): if AUTH_ENABLED and _auth_available(): if _verify_biometric_session(request): return await call_next(request) - # Fallback: accept session token as ?s= query param (for img/stream URLs on mobile) + # Fallback: accept session token as ?s= query param (for img/stream + # URLs on mobile). re-audit N5: route through _session_token_valid so + # the TOTP-verified gate is enforced here too — previously this path + # checked only token existence + age, letting a pre-TOTP token skip + # 2FA via ?s= on any GET /api endpoint. qs_token = request.query_params.get("s", "") - if qs_token and request.method == "GET": - with _auth_lock: - if qs_token in _auth_sessions: - session = _auth_sessions[qs_token] - if datetime.now() - session["created"] <= timedelta(hours=AUTH_SESSION_HOURS): - return await call_next(request) + if qs_token and request.method == "GET" and _session_token_valid(qs_token): + return await call_next(request) # Biometric failed — reject cookie_val = request.cookies.get(AUTH_COOKIE_NAME, "") log.warning("AUTH REJECTED: path=%s method=%s ip=%s cookie=%s...", @@ -2239,7 +2239,11 @@ async def web_search_endpoint(request: Request): # Self-improvement & meta "ai_news_digest", "scheduler", # Skill creation & delegation - "create_skill", "skill_forge", "ask_codec_to_build", "delegate", + # re-audit (CHAIN-002): skill_forge writes forged code to disk WITHOUT the + # review gate, so it must not be auto-firable from a chat [SKILL:...] tag — + # skill creation goes through create_skill's review-and-approve flow only + # (PR-1B). ask_codec_to_build had no backing skill file (stale entry). + "create_skill", "delegate", # Phase 2 Step 7 — end-of-day shift report (read-only, no destructive side effects) "shift_report", # Phase 2 Step 6 — first declarative trigger (clipboard URL → web_fetch). @@ -2448,6 +2452,14 @@ def _try_skill(user_text: str): from codec_dispatch import check_skill, run_skill skill = check_skill(user_text) if skill and skill.get("name") in CHAT_SKILL_ALLOWLIST: + # re-audit A2: destructive skills need explicit consent (reuses the + # AskUserQuestion PWA panel; blocks this worker thread until answered). + import codec_consent + if not codec_consent.chat_consent_ok(skill["name"], user_text): + return skill["name"], ( + f"⚠ '{skill['name']}' is a destructive operation and wasn't " + "confirmed — skipped." + ) result = run_skill(skill, user_text, app="CODEC Chat") if result is not None: return skill["name"], str(result) @@ -2468,6 +2480,14 @@ def _try_skill_by_name(name: str, query: str): try: from codec_dispatch import run_skill skill = {"name": name} + # re-audit A2: a destructive skill emitted via a post-LLM [SKILL:...] tag + # (the prompt-injection vector) needs explicit consent before it runs — + # reuses the AskUserQuestion PWA panel. Blocks until answered. + import codec_consent + if not codec_consent.chat_consent_ok(name, query): + return name, ( + f"⚠ '{name}' is a destructive operation and wasn't confirmed — skipped." + ) result = run_skill(skill, query, app="CODEC Chat (LLM-routed)") if result is not None: return name, str(result) @@ -2646,6 +2666,131 @@ def _should_escalate_to_project(user_text: str, session_id: str) -> dict: } +def _chat_vision_response(body: dict, messages: list): + """If the request carries images, route to the vision model and return the + response dict; else return None. Fix #8 (intra-file CC reduction): + extracted verbatim from chat_completion, behavior-preserving. The inline + vision POST is an A-11-pending site and stays in codec_dashboard.""" + images = body.get("images", []) + if not images: + return None + import requests as rq2 + config2 = {} + try: + with open(CONFIG_PATH) as f: + config2 = json.load(f) + except (OSError, json.JSONDecodeError) as e: + log.warning(f"Config read failed; proceeding without overrides: {e}") + vision_url = config2.get("vision_base_url", "http://localhost:8083/v1") + vision_model = config2.get("vision_model", "mlx-community/Qwen2.5-VL-7B-Instruct-4bit") + # Build multimodal message: last user text + all images + last_text = "" + for m in reversed(messages): + if m.get("role") == "user" and isinstance(m.get("content"), str): + last_text = m["content"] + break + if not last_text: + last_text = "Describe and analyze this image in detail." + mm_content = [] + for img_b64 in images: + mm_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}) + mm_content.append({"type": "text", "text": last_text}) + v_payload = { + "model": vision_model, + "messages": [{"role": "user", "content": mm_content}], + "max_tokens": 4000, + "temperature": 0.7 + } + # re-audit N7: guard the vision-backend call + parse. This helper runs + # OUTSIDE chat_completion's try/except, so a non-200 / malformed response + # (model not loaded, OOM, timeout) previously surfaced as a raw 500 with no + # JSON body. Return a graceful 502 instead. + try: + vr = rq2.post(f"{vision_url}/chat/completions", json=v_payload, + headers={"Content-Type": "application/json"}, timeout=120) + vr.raise_for_status() + vdata = vr.json() + vanswer = vdata["choices"][0]["message"]["content"].strip() + except Exception as e: + log.warning(f"[chat] vision backend call failed: {e}") + return JSONResponse( + {"error": f"Vision model unavailable: {type(e).__name__}"}, + status_code=502, + ) + import re as re2 + vanswer = re2.sub(r'[\s\S]*?', '', vanswer).strip() + return {"response": vanswer, "model": vision_model} + + +def _build_chat_system_prompt(config: dict, budget, has_attachment: bool, + last_user_text: str) -> str: + """Build the chat system prompt: override-aware base + per-turn step-budget + warnings + attachment / content-rewrite / observer-injection suffixes. + + Fix #8 (intra-file CC reduction): extracted verbatim from chat_completion; + behavior-preserving. `budget` is mutated exactly as before — warn_now() and + consume('llm_call') happen here, once, where they ran inline. + """ + from datetime import datetime as _dt + _overrides = _load_prompt_overrides() + _chat_prompt = _overrides.get("chat", CHAT_SYSTEM_PROMPT) + sys_prompt = _chat_prompt.format(date=_dt.now().strftime("%A, %B %d, %Y")) + if budget.warn_now(): + sys_prompt += ( + "\n\n⚠ 1 step remaining in this turn. Wrap up — do NOT " + "emit additional [SKILL:...] tags." + ) + budget.consume("llm_call") + if budget.at_limit(): + sys_prompt += ( + "\n\n## Step Budget Exhausted\n" + "You've hit the per-turn step budget. Summarize what you " + "accomplished and any blockers in one short paragraph. " + "DO NOT emit [SKILL:...] tags or call additional tools." + ) + if has_attachment: + sys_prompt += ( + "\n\n## This Turn\n" + "The user has attached a file or image and its content is already " + "embedded in their message between [IMAGE ANALYSIS]/[DOCUMENT] markers. " + "Respond conversationally about the attached content. " + "DO NOT emit [SKILL:...] tool-calling tags in this response." + ) + _u_text_lower = (last_user_text or "").lower() + _content_rewrite_intent = any( + kw in _u_text_lower for kw in ( + "format my email", "format this email", "format my message", + "reformat", "rewrite", "reword", "redraft", "polish", + "proofread", "edit my email", "fix my email", "fix the grammar", + "make this sound", "translate this", "translate the following", + "draft a reply", "draft an email", "draft this", + ) + ) + if _content_rewrite_intent: + sys_prompt += ( + "\n\n## This Turn\n" + "The user is asking you to generate or rewrite text directly " + "(format/edit/draft/translate/polish their email or message). " + "Respond with the rewritten content as plain prose. " + "DO NOT emit [SKILL:...] tool-calling tags in this response — " + "the answer IS the rewritten text, no tools needed." + ) + try: + from codec_observer import maybe_inject_observation_summary + _obs_transport = "local" if "localhost" in (config.get("llm_base_url") or "") else "chat" + _obs_summary, _obs_reason = maybe_inject_observation_summary( + user_prompt=last_user_text or "", + transport=_obs_transport, + skill_name=None, # post-LLM tag path, no skill resolved yet + skill_module=None, + ) + if _obs_summary: + sys_prompt += f"\n\n{_obs_summary}" + except Exception as _e: + log.debug(f"[observer] injection failed (non-fatal): {_e}") + return sys_prompt + + @app.post("/api/chat") async def chat_completion(request: Request): """Direct LLM chat with full context window + tool calling""" @@ -2730,41 +2875,10 @@ async def _skill_stream(): else: return {"response": f"**⚡ {skill_name}**: {skill_result}", "skill": skill_name} - # Check for images — route to vision model - images = body.get("images", []) - if images: - import requests as rq2 - config2 = {} - try: - with open(CONFIG_PATH) as f: config2 = json.load(f) - except (OSError, json.JSONDecodeError) as e: - log.warning(f"Config read failed; proceeding without overrides: {e}") - vision_url = config2.get("vision_base_url", "http://localhost:8083/v1") - vision_model = config2.get("vision_model", "mlx-community/Qwen2.5-VL-7B-Instruct-4bit") - # Build multimodal message: last user text + all images - last_text = "" - for m in reversed(messages): - if m.get("role") == "user" and isinstance(m.get("content"), str): - last_text = m["content"] - break - if not last_text: - last_text = "Describe and analyze this image in detail." - mm_content = [] - for img_b64 in images: - mm_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}) - mm_content.append({"type": "text", "text": last_text}) - v_payload = { - "model": vision_model, - "messages": [{"role": "user", "content": mm_content}], - "max_tokens": 4000, - "temperature": 0.7 - } - vr = rq2.post(f"{vision_url}/chat/completions", json=v_payload, headers={"Content-Type": "application/json"}, timeout=120) - vdata = vr.json() - vanswer = vdata["choices"][0]["message"]["content"].strip() - import re as re2 - vanswer = re2.sub(r'[\s\S]*?', '', vanswer).strip() - return {"response": vanswer, "model": vision_model} + # Check for images — route to vision model (extracted, Fix #8) + vision_resp = _chat_vision_response(body, messages) + if vision_resp is not None: + return vision_resp try: config = {} @@ -2783,84 +2897,12 @@ async def _skill_stream(): force_search = body.get("force_search", False) messages = _enrich_messages(messages, config, force_search=bool(force_search)) - # Inject CODEC system prompt (use override if user edited it) - from datetime import datetime as _dt - _overrides = _load_prompt_overrides() - _chat_prompt = _overrides.get("chat", CHAT_SYSTEM_PROMPT) - sys_prompt = _chat_prompt.format(date=_dt.now().strftime("%A, %B %d, %Y")) - # Phase 1 Step 3 §3 — consume one step for the LLM call itself. - # If we're now at limit-1, append the "1 step remaining" warning - # to the system prompt so the LLM wraps up. If we're already - # exhausted, switch to forced-summary mode. - if _budget.warn_now(): - sys_prompt += ( - "\n\n⚠ 1 step remaining in this turn. Wrap up — do NOT " - "emit additional [SKILL:...] tags." - ) - _budget.consume("llm_call") - if _budget.at_limit(): - sys_prompt += ( - "\n\n## Step Budget Exhausted\n" - "You've hit the per-turn step budget. Summarize what you " - "accomplished and any blockers in one short paragraph. " - "DO NOT emit [SKILL:...] tags or call additional tools." - ) - # Bugfix 2026-04-16: when the user attaches a file/image, the default - # system prompt still teaches the LLM to emit [SKILL:...] tags, which - # then leak through the streaming path. Force conversational mode for - # this turn so the LLM analyses the attached content directly. - if has_attachment: - sys_prompt += ( - "\n\n## This Turn\n" - "The user has attached a file or image and its content is already " - "embedded in their message between [IMAGE ANALYSIS]/[DOCUMENT] markers. " - "Respond conversationally about the attached content. " - "DO NOT emit [SKILL:...] tool-calling tags in this response." - ) - - # Bugfix 2026-04-27: detect content-rewriting intents (format/draft/ - # rewrite/reword/proofread an email/message/text) — these are pure-text - # generation tasks, NOT skill calls. Past failure: LLM emitted - # [SKILL:translate:] for "format my email", which ran the - # translate skill on the email and returned "Translation failed." - _u_text_lower = (last_user_text or "").lower() - _content_rewrite_intent = any( - kw in _u_text_lower for kw in ( - "format my email", "format this email", "format my message", - "reformat", "rewrite", "reword", "redraft", "polish", - "proofread", "edit my email", "fix my email", "fix the grammar", - "make this sound", "translate this", "translate the following", - "draft a reply", "draft an email", "draft this", - ) + # Build the system prompt (override + step-budget + attachment / + # content-rewrite / observer suffixes) — extracted to a helper for + # readability (Fix #8). Consumes the llm_call step budget internally. + sys_prompt = _build_chat_system_prompt( + config, _budget, has_attachment, last_user_text ) - if _content_rewrite_intent: - sys_prompt += ( - "\n\n## This Turn\n" - "The user is asking you to generate or rewrite text directly " - "(format/edit/draft/translate/polish their email or message). " - "Respond with the rewritten content as plain prose. " - "DO NOT emit [SKILL:...] tool-calling tags in this response — " - "the answer IS the rewritten text, no tools needed." - ) - # Phase 2 Step 5 — Observer summary injection (gated per §X). - # Local Qwen always injects; cloud transports (this chat path uses - # local-by-default but may be cloud-routed by the user — pass the - # detected transport tag) gate on possessive / continuation / - # skill-flag patterns. Returns (summary_or_None, reason); audit - # emit fires inside the helper ONLY when summary non-None. - try: - from codec_observer import maybe_inject_observation_summary - _obs_transport = "local" if "localhost" in (config.get("llm_base_url") or "") else "chat" - _obs_summary, _obs_reason = maybe_inject_observation_summary( - user_prompt=last_user_text or "", - transport=_obs_transport, - skill_name=None, # post-LLM tag path, no skill resolved yet - skill_module=None, - ) - if _obs_summary: - sys_prompt += f"\n\n{_obs_summary}" - except Exception as _e: - log.debug(f"[observer] injection failed (non-fatal): {_e}") # Prepend system message (or replace existing one) if messages and messages[0].get("role") == "system": @@ -3060,8 +3102,10 @@ async def update_schedule(sched_id: str, request: Request): for s in schedules: if s.get("id") == sched_id: s.update({k: v for k, v in body.items() if k != "id"}) - with open(sched_path, "w") as f: - json.dump(schedules, f, indent=2) + # re-audit medium: atomic write (was truncate-then-write, racing + # the scheduler's concurrent read of schedules.json). + import codec_jsonstore + codec_jsonstore.atomic_write_json(sched_path, schedules) return {"schedule": s} return JSONResponse({"error": "Not found"}, status_code=404) diff --git a/codec_jsonstore.py b/codec_jsonstore.py index 0b77f80..8de103b 100644 --- a/codec_jsonstore.py +++ b/codec_jsonstore.py @@ -24,15 +24,27 @@ from typing import Any, Iterator -def atomic_write_json(path: Any, data: Any) -> None: - """Atomically write `data` as JSON to `path` (tmp + fsync + replace, 0600).""" +def atomic_write_json( + path: Any, + data: Any, + *, + default: Any = None, + sort_keys: bool = False, +) -> None: + """Atomically write `data` as JSON to `path` (tmp + fsync + replace, 0600). + + `default` is the json.dump fallback serializer (pass `str` for datetime/Path + values — Fix #9 Phase 0, so this primitive subsumes the hand-rolled + `default=str` helpers). `sort_keys` forces deterministic key ordering for + callers that need stable diffs/hashes. Both default to the prior behavior. + """ path = str(path) directory = os.path.dirname(path) or "." os.makedirs(directory, exist_ok=True) fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp") try: with os.fdopen(fd, "w") as f: - json.dump(data, f, indent=2) + json.dump(data, f, indent=2, default=default, sort_keys=sort_keys) f.flush() os.fsync(f.fileno()) os.replace(tmp, path) @@ -66,3 +78,32 @@ def file_lock(path: Any) -> Iterator[None]: except Exception: pass lock_file.close() + + +def read_modify_write( + path: Any, + mutate_fn, + *, + default_factory=dict, + default: Any = None, + sort_keys: bool = False, +) -> Any: + """Lock + read + mutate + atomic-write `path` as one cross-process-safe unit. + + Holds `file_lock(path)` across the whole read-modify-write so concurrent + daemons can't lose each other's update (Fix #9 — standardizes the pattern so + a future RMW site can't forget the lock). `mutate_fn(data)` receives the + current parsed JSON (or `default_factory()` if the file is missing/corrupt) + and returns the new value to persist. Returns the persisted value. + `default`/`sort_keys` are forwarded to `atomic_write_json`. + """ + path = str(path) + with file_lock(path): + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + except (FileNotFoundError, json.JSONDecodeError, OSError): + data = default_factory() + new_data = mutate_fn(data) + atomic_write_json(path, new_data, default=default, sort_keys=sort_keys) + return new_data diff --git a/codec_mcp.py b/codec_mcp.py index 738ed10..b641f5b 100644 --- a/codec_mcp.py +++ b/codec_mcp.py @@ -18,6 +18,7 @@ SKILL_TIMEOUT_SEC = int(os.environ.get("CODEC_SKILL_TIMEOUT", "30")) from codec_audit import audit as _audit +from codec_concurrency import run_with_timeout from codec_hooks import HookVeto, run_with_hooks @@ -205,6 +206,23 @@ def tool_fn(task: str, context: str = "") -> str: correlation_id=cid) return err + # re-audit B1: hard-refuse destructive / high-power skills over + # MCP — claude.ai (the caller) can't consent at the operator + # tier. Most are already in _HTTP_BLOCKED (refused earlier); this + # covers the full destructive set (file_ops, file_write, + # imessage_send, pilot, …) + any skill declaring SKILL_DESTRUCTIVE. + try: + import codec_consent + if codec_consent.gate_enabled() and codec_consent.is_destructive_skill(rkey, registry=registry): + _audit(sname, event="denied", + task_len=tlen, context_len=clen, + duration_ms=(time.time()-t0)*1000, + outcome="denied", error_type="DestructiveBlockedMCP", + correlation_id=cid) + return codec_consent.mcp_refuse_message(rkey) + except ImportError: + pass # consent module unavailable — fall through (other guards remain) + # Phase 1 Step 2: refactor per design §3.3 path 5. The # threadpool/timeout/result block becomes the `invoke` closure # passed to run_with_hooks. The invoke closure RAISES on skill @@ -213,15 +231,17 @@ def tool_fn(task: str, context: str = "") -> str: _transport = os.environ.get("CODEC_MCP_TRANSPORT", "stdio") def _invoke(t: str, c: str) -> str: - import concurrent.futures def _run_inner(): mod = registry.load(rkey) if mod is None or not hasattr(mod, "run"): raise _SkillLoadError("load_failed") return mod.run(t, c) - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: - fut = ex.submit(_run_inner) - return fut.result(timeout=SKILL_TIMEOUT_SEC) + # C4 fix: run_with_timeout actually bounds wall-clock time. + # The old `with ThreadPoolExecutor() as ex` exit blocked on + # shutdown(wait=True), so a hung skill defeated the timeout. + # It raises TimeoutError (== concurrent.futures.TimeoutError + # on py3.11+), caught by the except clause below. + return run_with_timeout(_run_inner, SKILL_TIMEOUT_SEC) import concurrent.futures try: diff --git a/codec_memory.py b/codec_memory.py index 6e0e6f6..3d400dd 100644 --- a/codec_memory.py +++ b/codec_memory.py @@ -3,6 +3,7 @@ import os import re import sqlite3 +import threading from datetime import datetime, timedelta log = logging.getLogger(__name__) @@ -34,6 +35,13 @@ class CodecMemory: def __init__(self, db_path: str = DB_PATH): self.db_path = db_path self._conn = None + # C5 (Fix #5): serialize all runtime use of the shared sqlite3 + # connection (check_same_thread=False) across threads — concurrent + # access to one connection can corrupt cursor state or segfault. + # RLock so a locked method calling another (get_context->search, + # cleanup->close) doesn't self-deadlock. WAL + busy_timeout remain the + # cross-PROCESS layer; this lock is the in-process one. + self._lock = threading.RLock() self._init_fts() # ── Connection ──────────────────────────────────────────────────────────── @@ -48,12 +56,13 @@ def _get_conn(self) -> sqlite3.Connection: def close(self): """Close the persistent connection. Safe to call multiple times.""" - if self._conn is not None: - try: - self._conn.close() - except Exception as e: - log.debug("Memory DB connection close failed: %s", e) - self._conn = None + with self._lock: + if self._conn is not None: + try: + self._conn.close() + except Exception as e: + log.debug("Memory DB connection close failed: %s", e) + self._conn = None # ── Init ───────────────────────────────────────────────────────────────── @@ -181,13 +190,14 @@ def _migrate(self, conn, from_version): def save(self, session_id: str, role: str, content: str, user_id: str = "default") -> int: """Insert one message. Triggers keep FTS in sync automatically.""" - conn = self._get_conn() - cur = conn.execute( - "INSERT INTO conversations (session_id, timestamp, role, content, user_id) VALUES (?,?,?,?,?)", - (session_id, datetime.now().isoformat(), role, content[:4000], user_id), - ) - conn.commit() - return cur.lastrowid + with self._lock: + conn = self._get_conn() + cur = conn.execute( + "INSERT INTO conversations (session_id, timestamp, role, content, user_id) VALUES (?,?,?,?,?)", + (session_id, datetime.now().isoformat(), role, content[:4000], user_id), + ) + conn.commit() + return cur.lastrowid # ── Search ─────────────────────────────────────────────────────────────── @@ -197,11 +207,12 @@ def search(self, query: str, limit: int = 10, user_id: str = None) -> list[dict] sanitized = _sanitize_fts_query(query) if not sanitized: return [] - conn = self._get_conn() - try: - return self._fts_query(conn, sanitized, limit, user_id=user_id) - except sqlite3.OperationalError: - return [] + with self._lock: + conn = self._get_conn() + try: + return self._fts_query(conn, sanitized, limit, user_id=user_id) + except sqlite3.OperationalError: + return [] def _fts_query(self, conn, query: str, limit: int, user_id: str = None) -> list[dict]: if user_id is not None: @@ -232,28 +243,29 @@ def search_recent(self, days: int = 7, limit: int = 50, user_id: str = None) -> """Return recent conversations from the past N days. If user_id is provided, only return results for that user.""" since = (datetime.now() - timedelta(days=days)).isoformat() - conn = self._get_conn() - if user_id is not None: - rows = conn.execute(""" - SELECT id, session_id, timestamp, role, content - FROM conversations - WHERE timestamp >= ? AND user_id = ? - ORDER BY id DESC - LIMIT ? - """, (since, user_id, limit)).fetchall() - else: - rows = conn.execute(""" - SELECT id, session_id, timestamp, role, content - FROM conversations - WHERE timestamp >= ? - ORDER BY id DESC - LIMIT ? - """, (since, limit)).fetchall() - return [ - {"id": r[0], "session_id": r[1], "timestamp": r[2], - "role": r[3], "content": r[4]} - for r in rows - ] + with self._lock: + conn = self._get_conn() + if user_id is not None: + rows = conn.execute(""" + SELECT id, session_id, timestamp, role, content + FROM conversations + WHERE timestamp >= ? AND user_id = ? + ORDER BY id DESC + LIMIT ? + """, (since, user_id, limit)).fetchall() + else: + rows = conn.execute(""" + SELECT id, session_id, timestamp, role, content + FROM conversations + WHERE timestamp >= ? + ORDER BY id DESC + LIMIT ? + """, (since, limit)).fetchall() + return [ + {"id": r[0], "session_id": r[1], "timestamp": r[2], + "role": r[3], "content": r[4]} + for r in rows + ] def get_context(self, query: str, n: int = 5, user_id: str = None) -> str: """Return a formatted string of top-N matching snippets for LLM injection.""" @@ -270,68 +282,71 @@ def get_context(self, query: str, n: int = 5, user_id: str = None) -> str: def get_sessions(self, limit: int = 20, user_id: str = None) -> list[dict]: """Return distinct sessions with message count and last timestamp. If user_id is provided, only return sessions for that user.""" - conn = self._get_conn() - if user_id is not None: - rows = conn.execute(""" - SELECT session_id, - COUNT(*) AS msg_count, - MIN(timestamp) AS started, - MAX(timestamp) AS last_msg, - MAX(CASE WHEN role='user' THEN content ELSE '' END) AS last_user_msg - FROM conversations - WHERE user_id = ? - GROUP BY session_id - ORDER BY last_msg DESC - LIMIT ? - """, (user_id, limit)).fetchall() - else: - rows = conn.execute(""" - SELECT session_id, - COUNT(*) AS msg_count, - MIN(timestamp) AS started, - MAX(timestamp) AS last_msg, - MAX(CASE WHEN role='user' THEN content ELSE '' END) AS last_user_msg - FROM conversations - GROUP BY session_id - ORDER BY last_msg DESC - LIMIT ? - """, (limit,)).fetchall() - return [ - {"session_id": r[0], "msg_count": r[1], - "started": r[2], "last_msg": r[3], - "preview": (r[4] or "")[:100]} - for r in rows - ] + with self._lock: + conn = self._get_conn() + if user_id is not None: + rows = conn.execute(""" + SELECT session_id, + COUNT(*) AS msg_count, + MIN(timestamp) AS started, + MAX(timestamp) AS last_msg, + MAX(CASE WHEN role='user' THEN content ELSE '' END) AS last_user_msg + FROM conversations + WHERE user_id = ? + GROUP BY session_id + ORDER BY last_msg DESC + LIMIT ? + """, (user_id, limit)).fetchall() + else: + rows = conn.execute(""" + SELECT session_id, + COUNT(*) AS msg_count, + MIN(timestamp) AS started, + MAX(timestamp) AS last_msg, + MAX(CASE WHEN role='user' THEN content ELSE '' END) AS last_user_msg + FROM conversations + GROUP BY session_id + ORDER BY last_msg DESC + LIMIT ? + """, (limit,)).fetchall() + return [ + {"session_id": r[0], "msg_count": r[1], + "started": r[2], "last_msg": r[3], + "preview": (r[4] or "")[:100]} + for r in rows + ] def cleanup(self, retention_days: int = 90) -> dict: """Delete conversations older than retention_days and VACUUM the database. Returns dict with deleted count and final size.""" cutoff = (datetime.now() - timedelta(days=retention_days)).isoformat() - conn = self._get_conn() - before = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0] - conn.execute("DELETE FROM conversations WHERE timestamp < ?", (cutoff,)) - conn.commit() - after = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0] - deleted = before - after - # Rebuild FTS after bulk delete - if deleted > 0: - conn.execute("INSERT INTO conversations_fts(conversations_fts) VALUES('rebuild')") + with self._lock: + conn = self._get_conn() + before = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0] + conn.execute("DELETE FROM conversations WHERE timestamp < ?", (cutoff,)) conn.commit() - # VACUUM requires closing and reopening (cannot run inside a transaction on reused conn) - self.close() - tmp = sqlite3.connect(self.db_path) - tmp.execute("VACUUM") - tmp.close() - size = os.path.getsize(self.db_path) - return {"deleted": deleted, "remaining": after, "size_bytes": size} + after = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0] + deleted = before - after + # Rebuild FTS after bulk delete + if deleted > 0: + conn.execute("INSERT INTO conversations_fts(conversations_fts) VALUES('rebuild')") + conn.commit() + # VACUUM requires closing and reopening (cannot run inside a transaction on reused conn) + self.close() # RLock is reentrant, so this nested acquire is safe + tmp = sqlite3.connect(self.db_path) + tmp.execute("VACUUM") + tmp.close() + size = os.path.getsize(self.db_path) + return {"deleted": deleted, "remaining": after, "size_bytes": size} def rebuild_fts(self) -> int: """Full FTS rebuild — use after bulk imports. Returns row count.""" - conn = self._get_conn() - conn.execute("INSERT INTO conversations_fts(conversations_fts) VALUES('rebuild')") - conn.commit() - count = conn.execute("SELECT COUNT(*) FROM conversations_fts").fetchone()[0] - return count + with self._lock: + conn = self._get_conn() + conn.execute("INSERT INTO conversations_fts(conversations_fts) VALUES('rebuild')") + conn.commit() + count = conn.execute("SELECT COUNT(*) FROM conversations_fts").fetchone()[0] + return count # ── CLI ────────────────────────────────────────────────────────────────────── diff --git a/codec_observer.py b/codec_observer.py index ea6d055..6aca9ee 100644 --- a/codec_observer.py +++ b/codec_observer.py @@ -79,11 +79,13 @@ import threading import time from collections import deque -from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError +from concurrent.futures import TimeoutError as FuturesTimeoutError from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple +from codec_concurrency import run_with_timeout + # ── Audit emit ──────────────────────────────────────────────────────────────── # Lazy-import so module import doesn't pull in codec_audit at startup time. # We DO want to fail loudly if audit is unavailable when we try to emit, but @@ -265,17 +267,16 @@ def _get_screenshot_ocr(timeout_ms: int, retry_timeout_ms: int) -> Tuple[str, bo On non-mac or vision-unavailable env: returns ("", True).""" def _ocr_call(timeout_s: float) -> Optional[str]: try: - # Lazy-import skills.screenshot_text — it pulls in vision libs. - # Fail soft if unavailable. - sys.path.insert(0, os.path.expanduser("~/.codec/skills")) - sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/skills") - # Note: we don't actually use a Python import here because the - # skill triggers heavy imports. Instead, use the same primitive - # the skill uses (screencapture + Vision via osascript), but - # subject to a hard timeout. - with ThreadPoolExecutor(max_workers=1) as ex: - future = ex.submit(_screencapture_and_ocr_blocking) - return future.result(timeout=timeout_s) + # re-audit N8: we do NOT import the screenshot_text skill here (it + # triggers heavy vision libs). We use the same primitive it uses + # (screencapture + Vision via osascript), subject to a hard timeout. + # The old `sys.path.insert(...)` lines were dead AND leaked: they ran + # on every poll (up to 2x), growing sys.path unboundedly for the life + # of the codec-observer process. Removed. + # C4 fix: run_with_timeout actually bounds wall-clock time. The old + # `with ThreadPoolExecutor() as ex` exit blocked on shutdown(wait=True), + # so a stuck screencapture popup hung each poll for ~5s. + return run_with_timeout(_screencapture_and_ocr_blocking, timeout_s) except (FuturesTimeoutError, TimeoutError): return None except Exception as e: diff --git a/codec_skill_registry.py b/codec_skill_registry.py index a33f656..b4b1e72 100644 --- a/codec_skill_registry.py +++ b/codec_skill_registry.py @@ -76,6 +76,9 @@ def _extract_metadata(filepath: str) -> Optional[Dict[str, Any]]: "SKILL_OBSERVATION_TRIGGER", # Phase 2 Step 5 §X — skill-flag injection override. "SKILL_NEEDS_OBSERVATION", + # re-audit — marks a high-power skill that requires consent + # before firing from chat / refusal over MCP (codec_consent). + "SKILL_DESTRUCTIVE", ): try: meta[target.id] = ast.literal_eval(node.value) @@ -231,6 +234,12 @@ def get_mcp_expose(self, name: str) -> Optional[bool]: meta = self._meta.get(name, {}) return meta.get("SKILL_MCP_EXPOSE", None) + def get_destructive(self, name: str) -> bool: + """re-audit — True if the skill declares SKILL_DESTRUCTIVE=True (a + high-power op that needs consent on chat / refusal over MCP). Defaults + False when undeclared. Read by codec_consent.is_destructive_skill.""" + return bool(self._meta.get(name, {}).get("SKILL_DESTRUCTIVE", False)) + def get_observation_trigger(self, name: str) -> Optional[Dict[str, Any]]: """Phase 2 Step 6 — return the SKILL_OBSERVATION_TRIGGER dict for a skill, or None if not declared. Validation happens in diff --git a/codec_ssrf.py b/codec_ssrf.py new file mode 100644 index 0000000..f46cbef --- /dev/null +++ b/codec_ssrf.py @@ -0,0 +1,83 @@ +"""codec_ssrf — SSRF guard for outbound URL fetches (Fix #7 / H1·H2·H6). + +`validate_url(url)` raises `SSRFError` when a URL is unsafe to fetch, and +returns the URL unchanged when it is safe. It is the shared chokepoint for +every place CODEC fetches an attacker-influenceable URL (the `web_fetch` +skill — which `clipboard_url_fetch` delegates to — and the `_web_fetch` +crew tool). + +Rejections: +- scheme not in {http, https} (blocks file://, ftp://, gopher://, …) +- missing host +- the host resolves to ANY non-public address: loopback (127.0.0.1, ::1), + private (10/8, 172.16/12, 192.168/16, fc00::/7), link-local (incl. + 169.254.169.254 cloud-metadata), multicast, reserved, or unspecified + (0.0.0.0). Every resolved address is checked, so a dual-record / + dual-stack host that mixes a public and an internal IP is still rejected. + +Limitation (documented): there is a TOCTOU gap between this DNS resolution +and the actual connect() inside requests/httpx — a determined DNS-rebinding +attacker controlling an authoritative server could return a public IP here +and an internal IP at connect time. Closing that fully needs IP-pinned +connections (custom adapter); this guard covers the static-URL and +naive-rebind cases the audit flagged. Keep call sites' own timeouts/size +caps as defence in depth. +""" +import ipaddress +import socket +from urllib.parse import urlparse + +__all__ = ["SSRFError", "validate_url"] + +_ALLOWED_SCHEMES = {"http", "https"} + + +class SSRFError(Exception): + """Raised when a URL is rejected by the SSRF guard.""" + + +def _is_blocked_ip(ip_str: str) -> bool: + try: + ip = ipaddress.ip_address(ip_str) + except ValueError: + return True # unparseable address → block (fail closed) + # IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1) — unwrap and re-check. + if getattr(ip, "ipv4_mapped", None) is not None: + ip = ip.ipv4_mapped + return ( + ip.is_private + or ip.is_loopback + or ip.is_link_local + or ip.is_multicast + or ip.is_reserved + or ip.is_unspecified + ) + + +def validate_url(url: str) -> str: + """Return `url` if safe to fetch; raise `SSRFError` otherwise.""" + if not url or not isinstance(url, str): + raise SSRFError("empty or non-string URL") + + parsed = urlparse(url.strip()) + scheme = (parsed.scheme or "").lower() + if scheme not in _ALLOWED_SCHEMES: + raise SSRFError(f"scheme '{parsed.scheme}' not allowed (only http/https)") + + host = parsed.hostname + if not host: + raise SSRFError("URL has no host") + + port = parsed.port or (443 if scheme == "https" else 80) + try: + infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP) + except socket.gaierror as e: + raise SSRFError(f"DNS resolution failed for {host}: {e}") from e + + addrs = {info[4][0] for info in infos} + if not addrs: + raise SSRFError(f"no addresses resolved for {host}") + for addr in addrs: + if _is_blocked_ip(addr): + raise SSRFError(f"host {host!r} resolves to blocked address {addr}") + return url diff --git a/docs/FIX8-ROUTES-CHAT-EXTRACTION-DESIGN.md b/docs/FIX8-ROUTES-CHAT-EXTRACTION-DESIGN.md new file mode 100644 index 0000000..844db7e --- /dev/null +++ b/docs/FIX8-ROUTES-CHAT-EXTRACTION-DESIGN.md @@ -0,0 +1,132 @@ +# FIX8 — `routes/chat.py` extraction (first slice of the dashboard god-module) + +> Follow-on design note required by `docs/SECURITY-REMEDIATION-DESIGN.md` Fix #8 +> and CLAUDE.md §11 (>1-module structural change). **No code is written until +> this note is approved.** Closes the start of audit finding C9. + +**Author:** audit follow-up · **Status:** AWAITING APPROVAL + +--- + +## 1. What & why + +`codec_dashboard.py` is **3,861 LOC** — the audit's C9 "god-module". The single +most complex unit in it is the chat handler `chat_completion` +(`codec_dashboard.py:2650-3005`, ~356 lines, cyclomatic complexity ~48 per the +audit). High complexity in the request path that runs every chat turn is a +maintenance + correctness risk (the streaming/skill-tag/step-budget interplay +is hard to reason about and easy to regress). + +**Goal:** a *behavior-preserving* extraction of the chat-handler cluster into a +new `routes/chat.py` `APIRouter`, mounted via `app.include_router(...)` exactly +like the existing `routes/agents.py`, `routes/skills.py`, `routes/auth.py`. +Pure move + seam. **Zero behavior change.** Complexity of the handler itself is +reduced by splitting its phases into named helpers as part of the move. + +This is explicitly the *first* slice — it does NOT attempt to dismantle the +whole god-module. One coherent, low-risk extraction with the existing chat/ +stream tests as the safety net. + +## 2. The cluster to move (verified against current HEAD) + +| Symbol | Lines | Role | +|---|---|---| +| `CHAT_SKILL_ALLOWLIST` | 2207 | set gating pre-LLM hijack + post-LLM tag | +| `_StepBudget` | 2369 | per-turn step cap (Phase 1 Step 3) | +| `_try_skill` / `_try_skill_by_name` | 2442 / 2459 | pre-LLM skill hijack | +| `_classify_chat_message` | 2570 | Qwen project-escalation classifier (Step 10) | +| `_should_escalate_to_project` | 2616 | 2-signal escalation gate (Step 10) | +| `chat_completion` | 2650-3005 | the `@app.post("/api/chat")` handler | +| `_stream_gen` (nested) | 2889 | SSE streaming generator inside the handler | + +Contiguous span ≈ **lines 2207-3005 (~800 LOC)**. + +## 3. Collaborators (the seam — what crosses the new module boundary) + +`chat_completion` references, and the extraction must thread (import or pass): + +- **Cross-module (already importable, no change):** `codec_chat_stream` + (`SkillTagBuffer`, `SKILL_TAG_RE` — already imported at dashboard:39), + `codec_llm` (`stream(keepalive=True)`, `call(raise_on_error=True)`), + `codec_observer` (system-prompt injection), `codec_memory` (context), + `codec_dispatch.run_skill`, `codec_slash_commands.parse_slash`, + `codec_identity` (system prompt), `codec_audit`. +- **Dashboard-local state that must be SHARED, not duplicated:** + `_autoescalate_silence_set` + `_AUTOESCALATE_SILENCE_LOCK` (Step 10 in-memory + per-session silence — CLAUDE.md "don't-touch from outside"), + `AGENT_AUTO_ESCALATE_ENABLED`, `ESCALATE_CHECKPOINTS_THRESHOLD`, the memory + singleton, any LLM config getters. + +**Decision point (see §7, Decision A):** how to share that module-level state +between `codec_dashboard.py` and the new `routes/chat.py` without creating a +circular import. + +## 4. Proposed approach + +1. **Create `routes/chat.py`** with `router = APIRouter()` (mirrors the 4 + existing route modules). +2. **Move the cluster** (§2) verbatim into it. Convert `@app.post("/api/chat")` + → `@router.post("/api/chat")`. +3. **Resolve shared state via a small `routes/_chat_state.py` (or extend + `routes/_shared.py`)** that owns `_autoescalate_silence_set`, + `_AUTOESCALATE_SILENCE_LOCK`, and the escalation flags/consts. Both + `codec_dashboard` (if it still references them) and `routes/chat` import from + there. This breaks the would-be circular import (`dashboard → chat → dashboard`). +4. **Reduce `chat_completion` CC** by extracting its phases into named helpers + *during* the move (each is a straight cut of an existing block, no logic + change): `_resolve_slash_or_skill(...)`, `_build_chat_messages(...)` + (memory + observer injection), `_stream_chat_response(...)` + (the `_stream_gen` body), `_finalize_nonstream(...)`. Target handler CC ≈ 15. +5. **Mount** in `codec_dashboard.py`: `from routes.chat import router as chat_router; app.include_router(chat_router)` — same pattern already used for agents/skills/auth. +6. Delete the now-moved definitions from `codec_dashboard.py`. + +## 5. Migration / compat + +- **No API change.** Path stays `/api/chat`; request/response/SSE shape + identical. The PWA is untouched. +- **No new dependency.** Pure restructure. +- Auth middleware, step budget, observer injection, skill hijack, post-LLM tag + resolution, auto-escalation all behave identically — they're moved, not + altered. +- Import-order: `app.include_router` must run after `app` is created; place the + mount next to the existing `include_router` calls. + +## 6. Test plan + +- The existing chat + streaming tests are the behavior-preserving net: + `tests/test_chat_stream*.py`, `tests/test_skill_tag*.py`, + `tests/test_dashboard*.py`, `tests/test_step_budget*.py`, + `tests/test_*escalat*` (exact set enumerated at implementation start). **All + must pass unchanged** — no test edits beyond import paths. +- Add `tests/test_routes_chat_smoke.py`: assert `routes.chat.router` exposes + `/api/chat` and that `chat_completion` is importable from the new module. +- Full suite green before/after (diff = same pass count). + +## 7. Open decisions (need sign-off) + +**Decision A — shared module-level state:** +- **A1 (Recommended):** move `_autoescalate_silence_set` + lock + escalation + consts into `routes/_shared.py` (or a new `routes/_chat_state.py`); both + modules import them. Cleanest break of the circular import. +- A2: keep them in `codec_dashboard.py` and have `routes/chat.py` import the + dashboard module lazily inside the handler. Smaller diff, but re-introduces a + dashboard↔chat coupling the extraction was meant to reduce. + +**Decision B — scope of this slice:** +- **B1 (Recommended):** chat cluster only (§2). Leave the other ~3,000 LOC for + later slices. Smallest reviewable unit. +- B2: also pull the schedules/notifications routes in the same PR. Bigger blast + radius; rejected for a "first slice". + +## 8. Rollback + +Single revert of the extraction commit restores the inlined handler. No +persisted state, no schema, no API change — rollback is clean. + +## 9. Risk + +Medium. The handler is hot (every chat turn) and the streaming path is subtle. +Mitigation: behavior-preserving move only, existing stream/tag/budget tests as +the gate, helper extraction is mechanical block-cutting. Recommend implementing +on the existing `security-remediation` branch as its own commit so it can be +reverted independently of the landed fixes. diff --git a/docs/FIX9-JSONSTORE-STATE-REGISTRY-DESIGN.md b/docs/FIX9-JSONSTORE-STATE-REGISTRY-DESIGN.md new file mode 100644 index 0000000..d5ec08c --- /dev/null +++ b/docs/FIX9-JSONSTORE-STATE-REGISTRY-DESIGN.md @@ -0,0 +1,137 @@ +# FIX9 — Promote `codec_jsonstore` to the mandatory state registry + +> Follow-on design note required by `docs/SECURITY-REMEDIATION-DESIGN.md` Fix #9 +> and CLAUDE.md §11 (large multi-module surface). **No code is written until +> this note is approved.** Closes audit finding C8 (~25 ad-hoc `~/.codec/*.json` +> writers). + +**Author:** audit follow-up · **Status:** AWAITING APPROVAL + +--- + +## 1. What & why + +`codec_jsonstore.py` already provides the two correct primitives: +`atomic_write_json(path, data)` (unique tmp + flush + **fsync** + atomic +replace + 0600) and `file_lock(path)` (cross-process flock for a +read-modify-write). Per PR-4C they were meant to be the single chokepoint for +all `~/.codec/*.json` persistence. They are **not yet universal** — a survey of +HEAD finds ~30 writer sites still doing raw `json.dump(f)` (no fsync, often no +atomicity, sometimes a read-modify-write with no lock) plus 2 modules that +hand-roll their own atomic helper. + +Consequences (all observed by the audit): partial-write corruption on crash, +lost-update races between the 11 PM2 daemons, and inconsistent perms. + +**Goal:** migrate every ad-hoc writer onto the shared primitives, converge the +duplicate helpers, add a `docs/STATE-FILES.md` registry, and add a CI guard +(same shape as the Fix #10 A-12 guard) that fails when a NEW raw `json.dump` +to a `~/.codec` path is introduced. + +## 2. Inventory (from HEAD survey — implementation step 1 produces the exact line list) + +**ALREADY-SAFE** (`codec_jsonstore`): `codec_jsonstore.py` itself; the Fix #5 +sites (grants.json via `grants_lock`, both notifications.json read-modify-writes +in `codec_ask_user` now under `file_lock`); `codec_oauth_provider` fallback +(migrated in Fix #1b). + +**HAS-OWN-ATOMIC** (hand-rolled tmp+fsync+replace — durable but duplicate code, +candidates to converge): +- `codec_ask_user._atomic_write_text` (uses `json.dumps(..., default=str)`) +- `codec_agent_plan._atomic_write_json` (0600 + 0700 dir, `sort_keys=False`) + +**AD-HOC-UNSAFE** (raw `json.dump`, ~30 sites) — representative grouping: +- **Full-overwrite, low risk:** `codec_alerts.py:48`, `codec_marketplace.py:51,74`, + `codec_memory_upgrade.py:235`, `codec_proactive.py:209`, `codec_agent.py:62`, + `skills/pomodoro.py:32`, `routes/skills.py:235` (custom_triggers.json), + `routes/agents.py:180` (custom agent save). +- **Read-modify-write (NEED `file_lock`):** `routes/_shared.py:202,241` + (the shared notifications read/write — used by scheduler, heartbeat, + autopilot), `codec_heartbeat.py:404` (notifs), `codec_scheduler.py:127` + (notifications), `codec_heartbeat.py:258` (executed history). +- **Daemon state files:** `codec_imessage.py:101`, `codec_heartbeat.py:371`, + `codec_scheduler.py:40`, `codec_voice.py:161` (voice_session.json), + `codec_agent_messaging.py:98`. +- **Don't-touch / sensitive (migrate LAST, per-file sign-off):** + `codec_config.py:68,120,250` (config.json), `routes/auth.py:239,314,349` + (auth/TOTP config writes), `codec_google_auth.py:92` (Google OAuth token), + `codec_dashboard.py:652,693,3064,3375,3410` (config/schedules). + +(`codec_sandbox.py:202,205` are inside a generated wrapper string, not a live +state write — excluded. `codec_core.py` likewise emits writes as script text.) + +## 3. Migration hazards (must be handled, NOT mechanical) + +1. **`default=str`.** `atomic_write_json` does `json.dump(data, f, indent=2)` + with **no `default=str`**. Any writer currently persisting non-JSON-native + values (e.g. `datetime`, `Path`) relies on `default=str` and will raise under + a naive swap. `codec_ask_user._atomic_write_text` does this deliberately. + → **Decision A:** add an optional `default` param to `atomic_write_json`, or + require callers to pre-serialize. (Recommended: add `default=None` passthrough + so the primitive subsumes the `_atomic_write_text` use case cleanly.) +2. **`sort_keys`.** Some writers use `sort_keys=True` (memory_upgrade) for + stable diffs; the primitive uses insertion order. Where stability matters + (anything hashed/compared), preserve it. → `atomic_write_json` may need a + `sort_keys` passthrough. +3. **Read-modify-write vs overwrite.** RMW sites need `file_lock` wrapping the + load→modify→write (like Fix #5), not just `atomic_write_json`. Misclassifying + an RMW as an overwrite re-opens the lost-update race. +4. **Don't-touch zones** (config.json, oauth/Google tokens, auth) — these are + in the CLAUDE.md protected set. Migrate LAST, one file per commit, surfaced + to the operator (same protocol as Fix #1b's oauth surfacing). + +## 4. Proposed phased plan (each phase its own commit, suite green between) + +- **Phase 0 — primitive hardening:** add `default`/`sort_keys` passthrough to + `atomic_write_json` (TDD); add `file_lock`-aware `read_modify_write(path, fn)` + convenience if it reduces churn. No call-site change yet. +- **Phase 1 — full-overwrite low-risk sites** → `atomic_write_json`. Mechanical, + one commit, regression by existing per-module tests. +- **Phase 2 — read-modify-write sites** → `file_lock` + `atomic_write_json` + (notably `routes/_shared.py` notifications, heartbeat, scheduler). Add + concurrent-write no-clobber tests (Fix #5 pattern). +- **Phase 3 — converge duplicate helpers:** replace `codec_ask_user. + _atomic_write_text` and `codec_agent_plan._atomic_write_json` bodies with calls + to the (now `default=`-aware) `codec_jsonstore` primitive; keep the thin + wrappers as named shims so call sites don't churn. +- **Phase 4 — don't-touch zones**, one file per commit, operator sign-off each + (config.json, Google token, auth writes). +- **Phase 5 — guardrails:** `docs/STATE-FILES.md` registry (path → writer → + reader → lock policy); `tests/test_jsonstore_invariant.py` CI guard that fails + on a NEW raw `json.dump`/`write_text(json.dumps(...))` targeting a `~/.codec` + path outside an allowlist (mirrors the Fix #10 A-12 guard). + +## 5. Test plan + +- Phase 0: unit tests for `atomic_write_json` `default`/`sort_keys` + fsync + (extend existing jsonstore tests). +- Phases 1-4: each migrated file keeps its existing tests green; RMW sites add a + concurrent-writer no-clobber test. +- Phase 5: the invariant guard test + a synthetic-violation test (proves it + catches a new raw writer), exactly like `tests/test_a12_invariant.py`. + +## 6. Rollback + +Per-phase, per-file reverts. No schema changes (same JSON shapes, same paths, +same-or-better perms). The only behavior change is *durability + atomicity*, +which is strictly safer. + +## 7. Open decisions (need sign-off) + +- **Decision A — primitive signature:** add `default=` + `sort_keys=` + passthrough to `atomic_write_json` (Recommended), vs. require callers to + pre-serialize to text. The former lets Phase 3 fully retire the duplicate + helpers. +- **Decision B — scope of this fix:** Phases 0-3 + 5 now, Phase 4 (don't-touch + zones) deferred to a separate sign-off (Recommended), vs. all phases in one + PR. Phase 4 touches protected files and should not be bundled. +- **Decision C — `read_modify_write` helper:** add a `codec_jsonstore. + read_modify_write(path, mutate_fn)` convenience (lock + load + mutate + atomic + write) to standardize RMW sites, vs. inline `file_lock` at each (as Fix #5 + did). A helper reduces the chance a future RMW forgets the lock. + +## 8. Risk + +Medium-high by surface area, low per-site. The phasing keeps each commit small +and independently revertible; the don't-touch zones are quarantined to Phase 4 +with explicit sign-off. The Phase 5 guard prevents regression once migrated. diff --git a/docs/REAUDIT-CONSENT-GATE-DESIGN.md b/docs/REAUDIT-CONSENT-GATE-DESIGN.md new file mode 100644 index 0000000..77cabf9 --- /dev/null +++ b/docs/REAUDIT-CONSENT-GATE-DESIGN.md @@ -0,0 +1,123 @@ +# RE-AUDIT — strict-consent gate on the chat / MCP skill paths + +> Design note for the re-audit's cross-cutting MEDIUM (red-team CHAIN-001/002/006). +> CLAUDE.md §11 gate: **no code written until this is approved** — it's a +> >1-module change that alters chat/MCP UX. Design-only. + +**Author:** re-audit follow-up · **Status:** AWAITING APPROVAL + +--- + +## 1. Problem + +The Step-3 strict-consent gate (literal-verb confirmation for destructive ops, +`docs/PHASE1-STEP3-DESIGN.md §1.7`) is wired **only into `codec_agent_runner`**. +The three *other* execution paths that can reach the same high-power skills go +through chokepoints with **no consent gate**: + +| Path | Chokepoint | Current guard | +|---|---|---| +| Chat pre-LLM hijack | `codec_dashboard._try_skill` → `codec_dispatch.run_skill` (codec_dashboard.py:2452-2455) | allowlist + `is_dangerous` (terminal only) | +| Chat post-LLM `[SKILL:]` tag | `_try_skill_by_name` → `codec_dispatch.run_skill` (2473-2475) | allowlist + `is_dangerous` | +| MCP tool call | `codec_mcp.tool_fn` invoke closure → `registry.load().run()` (codec_mcp.py:218-221) | `_HTTP_BLOCKED` only | + +So a **prompt-injected** chat message / MCP tool call (from web_fetch'd content, +an email summary, a malicious doc claude.ai reads) can fire `terminal`, +`file_ops`, `pilot`, `imessage_send`, etc. with only `is_dangerous` (which +CLAUDE.md explicitly calls "a heuristic / typo-catcher, NOT a complete security +boundary") or a path blocklist in the way. (`skill_forge`/`python_exec` are +already off the chat allowlist; this is about the rest.) + +## 2. Chokepoints to gate + +- **`codec_dispatch.run_skill(skill, task, app)`** — covers chat (both paths), + voice, wake-word, session, triggers. ONE function. +- **`codec_mcp.tool_fn`** — covers MCP stdio + HTTP. Separate. + +A shared `codec_consent.gate(tool_name, task, *, transport) -> ConsentDecision` +called at the top of both, so the policy lives in one module. + +## 3. The hard part — consent UX differs by transport + +`codec_ask_user.ask(destructive=True, …)` **blocks the worker thread** on a +`threading.Event` until the user answers via PWA/voice. That's fine for the +agent-runner (background) and voice (announce-and-listen), but: + +- **Chat** is a *synchronous HTTP request*. Blocking it for consent hangs the + chat turn for up to the timeout — poor UX, and the user may not be looking at + the PWA consent panel. +- **MCP** — claude.ai (the caller) can't perform operator-grade (Touch ID/PIN) + consent, and blocking the tool call stalls the connector. + +So the gate must be **per-transport**, not one-size. + +## 4. Proposed policy (per transport) + +- **MCP (`tool_fn`)** → **hard-refuse** destructive skills (return a clear + "not permitted over MCP" string), NOT a prompt. Most are already in + `_HTTP_BLOCKED`; this extends the *principle* to the full destructive set + (file_ops-write, file_write, imessage_send, pilot, ax_control). claude.ai + brings its own context and cannot consent at the operator tier. +- **Chat** → consent required before a destructive skill fires. Two UX options + (Decision A). +- **Voice** → reuse `ask_user` announce-and-listen (already works on the agent + path); the voice WS is now authenticated (re-audit N1). + +## 5. Defining "destructive" + +Today `codec_ask_user._is_destructive_tool` = membership in `_HTTP_BLOCKED` +(`python_exec, terminal, process_manager, pm2_control, ax_control`). That misses +`file_ops`(write/delete), `file_write`, `imessage_send`, `pilot`, `skill_forge`. +→ **Decision C**: a per-skill `SKILL_DESTRUCTIVE = True` module flag +(AST-extracted by `codec_skill_registry`, like `SKILL_MCP_EXPOSE`) — extensible, +self-documenting — vs a central `DESTRUCTIVE_SKILLS` set in `codec_config`. + +## 6. Open decisions (need sign-off) + +**Decision A — chat consent UX:** +- **A1 (Recommended):** *return a `consent_required` response* (don't block) — + the chat UI renders a confirm affordance; on confirm the client re-dispatches + with a short-lived consent token. Clean for synchronous chat; needs a small + `codec_dashboard.html` + chat-handler change. +- A2: *block-and-prompt* — call `ask_user.ask` from the chat handler (PWA + AskUserQuestion panel already polls every 8s), worker thread waits with a + short timeout. Smaller code change; worse UX (hangs the turn). + +**Decision B — MCP destructive policy:** +- **B1 (Recommended):** hard-refuse the full destructive set over MCP (extend + the `_HTTP_BLOCKED` principle to a `DESTRUCTIVE`-aware refusal in `tool_fn`). +- B2: out-of-band PWA consent (the MCP call returns "pending", the operator + approves in the PWA, claude.ai retries) — much more machinery; likely overkill. + +**Decision C — destructive classification:** per-skill `SKILL_DESTRUCTIVE` flag +(Recommended) vs central set. + +**Decision D — `is_dangerous` interplay:** keep `is_dangerous` as the +command-content heuristic for `terminal` AND layer the skill-level consent gate +on top (defense in depth) — Recommended. Don't remove `is_dangerous`. + +**Decision E — scope:** gate `run_skill` (chat) + `tool_fn` (MCP) now; leave +voice on the existing `ask_user` path. Or include a voice allowlist too. + +## 7. Test plan + +- destructive skill via chat post-LLM tag → consent required (A1: returns + `consent_required`, skill NOT run until token; A2: `ask_user` invoked). +- destructive skill via MCP `tool_fn` → refused (B1), audit `mcp_destructive_blocked`. +- non-destructive skill (weather, calculator) → unaffected on both paths. +- consent token round-trip re-dispatches and runs once (A1). +- kill switch env var disables the gate (parity with other Step-3 switches). +- `is_dangerous` still fires for terminal (Decision D layering). + +## 8. Rollback / risk + +Per-chokepoint revert + a `CONSENT_GATE_ENABLED` kill switch. Risk: UX +regression (over-prompting) — mitigated by the per-skill `SKILL_DESTRUCTIVE` +flag (only flagged skills prompt) and the per-transport policy (MCP refuses, +chat prompts, voice announces). Behavior for non-destructive skills is unchanged. + +## 9. What this does NOT do + +- Does not change the agent-runner consent (already correct). +- Does not alter `_HTTP_BLOCKED` membership (the gate is additive). +- Does not weaken `is_dangerous` or the skill load-time AST gate. diff --git a/docs/STATE-FILES.md b/docs/STATE-FILES.md new file mode 100644 index 0000000..0cc94da --- /dev/null +++ b/docs/STATE-FILES.md @@ -0,0 +1,66 @@ +# CODEC State-File Registry + +> Canonical map of every `~/.codec/*.json` (and related) state file: who writes +> it, who reads it, and its persistence/locking policy. Introduced by Fix #9 +> (audit C8) to make the `codec_jsonstore` convergence trackable. + +## Persistence policy levels + +- **SAFE** — writes via `codec_jsonstore.atomic_write_json` / `file_lock` / + `read_modify_write` (tmp + fsync + atomic replace + 0600; cross-process lock + for read-modify-write). This is the target for every state file. +- **OWN-ATOMIC** — a module-local hand-rolled tmp+fsync+replace helper + (durable, but duplicates the primitive; Phase 3 converges these). +- **AD-HOC** — raw `json.dump(f)` / `write_text(json.dumps(...))`: no fsync, + often no cross-process lock. Phase 1/2 migration targets. + +## Registry + +| File | Writer(s) | RMW? | Policy | Status | +|---|---|---|---|---| +| `notifications.json` | `routes/_shared._save_notification`, `_write_notifications`; `codec_ask_user` (question + mark-read); `codec_agent_messaging.post_message` | yes | **SAFE** | `_write_notifications` atomic (C-3); RMW under `file_lock` — `_save_notification` (Fix #9 P2), ask_user (Fix #5), messaging (B-11) | +| `pending_questions.json` | `codec_ask_user._save_pending_questions` | yes | OWN-ATOMIC (+`file_lock`) | RMW already under `file_lock` (C-4). Helper `_atomic_write_text` → converge in Phase 3. **Don't-touch zone.** | +| `agents//grants.json` | `codec_agent_plan.save_grants` (+ `grants_lock`) | yes | OWN-ATOMIC (+`file_lock`) | RMW under `grants_lock` (Fix #5). Helper `_atomic_write_json` → Phase 3. **Tamper-hashed.** | +| `agents//{plan,state,manifest}.json` | `codec_agent_plan.save_*` (manifest CAS under `_status_lock`) | manifest yes | OWN-ATOMIC | `_atomic_write_json` (0600/0700). Phase 3 convergence. **Don't-touch after approval.** | +| `agents//messages.jsonl` | `codec_agent_messaging.post_message` | append | n/a (append) | append-only JSONL, not a JSON doc | +| `agent_silence.json` | `codec_agent_messaging.set_silenced` | yes | review in Phase 2 | per-agent silence | +| `oauth_state.json` | `codec_oauth_provider._save` (fallback) | no | **SAFE** | `atomic_write_json` (Fix #1b). Keychain-primary. **Don't-touch zone.** | +| `audit.log` | `codec_audit` | append | bespoke (flock + HMAC) | PR-4E/PR-2E — its own contract, NOT json.dump. **Don't-touch zone.** | +| `config.json` | `codec_config`, `routes/auth`, `codec_dashboard` | yes | AD-HOC | **Phase 4** (don't-touch: holds migrated-out secrets, auth/TOTP). Per-file sign-off. | +| `custom_triggers.json` | `routes/skills.save_triggers` | yes | AD-HOC | Phase 1/2 | +| `triggers.json` / `triggers_killed.json` | `codec_triggers` | yes | review | Phase 2 | +| `schedules.json` | `codec_scheduler`, `codec_dashboard` | yes | AD-HOC | Phase 1/2 | +| `voice_session.json` | `codec_voice` | no | AD-HOC | Phase 1 | +| `.json` | `codec_google_auth` | no | AD-HOC | **Phase 4** (don't-touch: OAuth token). | +| `.marketplace.json` | `codec_marketplace` | yes | AD-HOC | Phase 1 | +| daemon state (`codec_alerts`, `codec_heartbeat`, `codec_imessage`, `codec_proactive`, `codec_agent_messaging`) | respective module | mixed | AD-HOC | Phase 1 | +| `pomodoro` state | `skills/pomodoro` | no | AD-HOC | Phase 1 | +| E2E keys / auth sessions (`routes/_shared`) | `_save_e2e_keys`, `_save_sessions` | no | AD-HOC (0600) | Phase 1 (sensitive — review) | + +## Migration backlog (Fix #9 follow-on) + +Done in the current PR: **Phase 0** (primitive hardening: `atomic_write_json` +`default=`/`sort_keys=`, new `read_modify_write`) and **Phase 2** +(`_save_notification` cross-process `file_lock`). + +Remaining, tracked for follow-on commits: +- **Phase 1** — migrate the AD-HOC full-overwrite writers above to + `atomic_write_json` (mechanical durability upgrade; one batch). +- **Phase 3** — converge the OWN-ATOMIC duplicate helpers + (`codec_ask_user._atomic_write_text`, `codec_agent_plan._atomic_write_json`) + onto the now-`default=`-aware `codec_jsonstore.atomic_write_json`, keeping the + named helpers as thin shims. NOTE: these write don't-touch files + (`pending_questions.json`, agent `grants/manifest`), so convergence is + byte-output-preserving and reviewed per-helper. +- **Phase 4** — the **don't-touch** files (`config.json`, Google/OAuth tokens, + auth writes): migrate last, one file per commit, with operator sign-off + (same protocol as Fix #1b's `oauth_state.json` surfacing). + +## Why no broad "raw json.dump" CI guard + +Considered (the Fix #10 A-12 guard is the model) but rejected: `json.dump` has +many legitimate non-state uses, so a repo-wide ratchet is low-signal / +high-false-positive. Regression prevention instead relies on (a) this registry, +(b) the existing source-level guards `test_json_write_safety.test_notifications_writer_atomic` +and `test_ask_user_uses_file_lock`, and (c) per-RMW concurrency tests added with +each migration phase. diff --git a/requirements.txt b/requirements.txt index 41baf9c..2d96956 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,18 @@ fastmcp>=3.3.1 pyotp>=2.9.0 qrcode[pil]>=8.0 +# Security floors for transitive / runtime deps (Fix #6, re-audit dependency +# pass). These pin the advisory-safe minor so a fresh `pip install -r` is +# patched. NONE is actively exploitable in CODEC's usage (fastmcp's SSRF is in +# the OpenAPIProvider, which CODEC doesn't use; pillow's CVEs are in image +# PARSING and CODEC only generates QR codes; urllib3's need the low-level +# ProxyManager / a malicious server vs the SSRF-guarded web_fetch) — this is +# hygiene, keeping the dep tree current. The deployed interpreters were found +# STALE vs these floors; refresh them (see docs note) at a maintenance window. +urllib3>=2.7.0 # CVE-2026-44431 / -44432 (transitive via requests) +pillow>=12.2.0 # CVE-2026-42311 + others (transitive via qrcode[pil]) +cryptography>=46.0.7 # CVE-2026-39892 / -34073 (routes/auth + codec_license) + # Optional: TTS (Kokoro 82M — Apple Silicon) # mlx-audio misaki num2words phonemizer-fork spacy diff --git a/routes/_shared.py b/routes/_shared.py index e55720f..ab1ee37 100644 --- a/routes/_shared.py +++ b/routes/_shared.py @@ -129,7 +129,12 @@ def _save_notification(title, body, status="success", schedule_id=None): "read": False, "schedule_id": schedule_id } - with _notif_lock: + # Fix #9: the notification writers run in separate PM2 daemons (dashboard, + # scheduler, heartbeat, autopilot) that don't share _notif_lock. Hold the + # cross-process file_lock across the load->insert->write so they can't + # clobber each other (matches the codec_ask_user notification pairing). + import codec_jsonstore + with _notif_lock, codec_jsonstore.file_lock(NOTIFICATIONS_PATH): notifications = _load_notifications() notifications.insert(0, notif) _write_notifications(notifications) @@ -268,15 +273,19 @@ def _is_totp_enabled(): return False -def _verify_biometric_session(request): - """Check if the request has a valid auth session cookie.""" - if not AUTH_ENABLED or not _auth_available(): - return True - token = request.cookies.get(AUTH_COOKIE_NAME) +def _session_token_valid(token) -> bool: + """Core session validity: the token exists, is unexpired, and is + TOTP-verified when TOTP is enabled. Shared by the cookie path + (_verify_biometric_session) AND the ?s= query-param fallback in + AuthMiddleware — re-audit N5: the ?s= path previously checked only + existence + age, skipping the TOTP-verified gate, so a pre-TOTP token could + bypass 2FA via ?s= on any GET /api endpoint.""" + if not token: + return False with _auth_lock: - if not token or token not in _auth_sessions: + session = _auth_sessions.get(token) + if not session: return False - session = _auth_sessions[token] if datetime.now() - session["created"] > timedelta(hours=AUTH_SESSION_HOURS): del _auth_sessions[token] _save_sessions() @@ -286,6 +295,13 @@ def _verify_biometric_session(request): return True +def _verify_biometric_session(request): + """Check if the request has a valid auth session cookie.""" + if not AUTH_ENABLED or not _auth_available(): + return True + return _session_token_valid(request.cookies.get(AUTH_COOKIE_NAME)) + + # ── Database helpers ── # M-5 (PR-4J): one SQLite connection PER THREAD instead of a single global shared @@ -332,6 +348,7 @@ def _close_all_db_conns(): _pending_skills: dict = {} _research_jobs: dict = {} +_research_jobs_lock = threading.Lock() # re-audit N9: guards add/del of _research_jobs _agent_jobs: dict = {} _agent_jobs_lock = threading.Lock() # guards structural add/del of _agent_jobs (H-4) _AGENT_JOB_TTL_SECONDS = 86400 # evict terminal jobs older than 24h (H-4) @@ -367,6 +384,33 @@ def _evict_stale_agent_jobs(now=None, ttl_seconds: int = _AGENT_JOB_TTL_SECONDS) return removed +def _evict_stale_research_jobs(now=None, ttl_seconds: int = _AGENT_JOB_TTL_SECONDS) -> int: + """re-audit N9: mirror _evict_stale_agent_jobs for _research_jobs, which + previously had NO lock and NO eviction → unbounded growth (memory leak → + max_memory_restart) plus a 'dict changed size during iteration' race + against the deep_research worker. Snapshot-iterates under + `_research_jobs_lock`; never evicts a `running` job; keeps entries with a + missing/unparseable `started`. Returns the number evicted. Never raises.""" + now = now if now is not None else datetime.now() + removed = 0 + try: + with _research_jobs_lock: + for jid, job in list(_research_jobs.items()): + if not isinstance(job, dict) or job.get("status") == "running": + continue + started = job.get("started", "") + try: + age = (now - datetime.fromisoformat(started)).total_seconds() + except (ValueError, TypeError): + continue # unparseable timestamp → keep + if age > ttl_seconds: + _research_jobs.pop(jid, None) + removed += 1 + except Exception as e: + log.warning("research-job eviction failed: %s", e) + return removed + + # ── Remote command approval (dashboard/phone) ── _pending_approvals: dict = {} # {approval_id: {command, action, is_dangerous, explanation, timestamp, status}} _approval_lock = threading.Lock() diff --git a/routes/agents.py b/routes/agents.py index d37cb53..abf04e2 100644 --- a/routes/agents.py +++ b/routes/agents.py @@ -13,6 +13,7 @@ from routes._shared import ( _research_jobs, _agent_jobs, _AGENTS_DIR, _agent_jobs_lock, _evict_stale_agent_jobs, + _research_jobs_lock, _evict_stale_research_jobs, ) router = APIRouter() @@ -27,17 +28,25 @@ async def deep_research_start(request: Request): return JSONResponse({"error": "Topic too short"}, status_code=400) job_id = str(uuid.uuid4())[:8] - _research_jobs[job_id] = {"status": "running", "topic": topic, "started": datetime.now().isoformat()} + # re-audit N9: evict stale jobs + guard the add/update under the lock so the + # dict can't grow unbounded and the worker write can't race the insert. + _evict_stale_research_jobs() + with _research_jobs_lock: + _research_jobs[job_id] = {"status": "running", "topic": topic, "started": datetime.now().isoformat()} async def _run_async(): try: from codec_agents import run_crew result = await run_crew("deep_research", topic=topic) - _research_jobs[job_id].update(result) + with _research_jobs_lock: + if job_id in _research_jobs: + _research_jobs[job_id].update(result) except Exception as e: import traceback; traceback.print_exc() - _research_jobs[job_id]["status"] = "error" - _research_jobs[job_id]["error"] = str(e) + with _research_jobs_lock: + if job_id in _research_jobs: + _research_jobs[job_id]["status"] = "error" + _research_jobs[job_id]["error"] = str(e) asyncio.create_task(_run_async()) return {"job_id": job_id, "status": "running", "topic": topic} @@ -176,8 +185,9 @@ async def save_custom_agent(request: Request): status_code=409, ) path = os.path.join(_AGENTS_DIR, safe_id + ".json") - with open(path, "w") as f: - json.dump({**body, "id": safe_id}, f, indent=2) + # re-audit medium: atomic write (was truncate-then-write, racing readers). + import codec_jsonstore + codec_jsonstore.atomic_write_json(path, {**body, "id": safe_id}) return {"saved": True, "id": safe_id, "path": path} except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @@ -554,13 +564,17 @@ def grant_permission(agent_id: str, body: GrantBody, request: Request = None): detail=f"refused: '{body.value}' is a blocked, traversal, or over-broad path grant", ) - grants = _cap.load_grants(agent_id) - if not grants: - raise HTTPException(status_code=409, detail="agent has no grants yet (not approved?)") - - grants[body.kind] = sorted(set(grants.get(body.kind, []) + [body.value])) - _cap.save_grants(agent_id, grants) - _cap.set_grants_hash(agent_id) # B-4: keep the tamper hash in sync with the legit grant + # C5 (Fix #5): hold the cross-process flock across the whole + # load -> modify -> save -> hash-sync so two concurrent /grant calls can't + # read the same grants, each add one value, and clobber each other's write. + with _cap.grants_lock(agent_id): + grants = _cap.load_grants(agent_id) + if not grants: + raise HTTPException(status_code=409, detail="agent has no grants yet (not approved?)") + + grants[body.kind] = sorted(set(grants.get(body.kind, []) + [body.value])) + _cap.save_grants(agent_id, grants) + _cap.set_grants_hash(agent_id) # B-4: keep the tamper hash in sync with the legit grant # If blocked, unblock if manifest.get("status") == "blocked_on_permission": diff --git a/routes/auth.py b/routes/auth.py index 12019a0..c4acafa 100644 --- a/routes/auth.py +++ b/routes/auth.py @@ -1,6 +1,7 @@ """CODEC Dashboard — Auth routes (biometric, PIN, TOTP, E2E key exchange).""" import os import json +import hmac import secrets import time import subprocess @@ -131,14 +132,14 @@ async def auth_pin(request: Request): return JSONResponse({"error": f"Too many failed attempts. Locked out for {remaining}s."}, status_code=429) try: - if pin_hash == AUTH_PIN_HASH: + if hmac.compare_digest(pin_hash, AUTH_PIN_HASH): _audit_write(f"[{datetime.now().isoformat()}] AUTH_SUCCESS: method=pin ip={client_ip}\n") else: _audit_write(f"[{datetime.now().isoformat()}] AUTH_FAILED: method=pin error=wrong_pin ip={client_ip}\n") except Exception: pass - if pin_hash == AUTH_PIN_HASH: + if hmac.compare_digest(pin_hash, AUTH_PIN_HASH): method = "pin" log_event("auth_success", "codec-auth", f"Auth success: {method}", extra={"method": method}) _pin_attempts.pop(client_ip, None) diff --git a/routes/skills.py b/routes/skills.py index 0cc4041..d7fc941 100644 --- a/routes/skills.py +++ b/routes/skills.py @@ -26,6 +26,26 @@ router = APIRouter() +def _pinned_builtin_names(): + """Basenames (e.g. 'calculator.py') of hash-pinned built-in skills, read + from the committed /skills/.manifest.json. An approved user skill must + never take one of these names: doing so shadows the trusted built-in (or, + if the write dir is the repo skills dir, overwrites the hash-pinned file). + Fix #7b / H2·H6. Returns a lowercased set on success, or **None on any read + failure** so the caller can FAIL CLOSED (re-audit N20: returning an empty + set let the approve guard block nothing during a transient manifest read + failure). Lowercased so the guard holds on case-insensitive filesystems.""" + try: + repo_skills = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "skills" + ) + with open(os.path.join(repo_skills, ".manifest.json"), encoding="utf-8") as f: + data = json.load(f) + return {str(n).lower() for n in (data.get("skills") or {}).keys()} + except Exception: + return None + + @router.post("/api/skill/review") async def skill_review(request: Request): """Stage LLM-generated skill code for human review -- does NOT write to disk.""" @@ -54,6 +74,33 @@ async def skill_approve(request: Request): filename = pending["filename"] if "SKILL_DESCRIPTION" not in code or "def run(" not in code: return JSONResponse({"error": "Invalid skill: must contain SKILL_DESCRIPTION and def run()"}, status_code=400) + # Fix #7b (H2/H6): never let an approved skill take a hash-pinned built-in's + # name — that would shadow (or overwrite) the trusted built-in. The PR-1A + # load-time hash/AST gate is the last line of defense; this refuses at the + # write point so the trusted file is never displaced in the first place. + # re-audit N20: FAIL CLOSED — _pinned_builtin_names() returns None if the + # manifest can't be read, in which case we refuse rather than allow (the old + # empty-set return blocked nothing during a transient read failure). + _pinned = _pinned_builtin_names() + if _pinned is None or filename.lower() in _pinned: + _reason = ("manifest_unreadable" if _pinned is None + else "pinned_builtin_overwrite") + try: + from codec_audit import log_event + log_event( + "skill_approve_blocked", source="codec-routes-skills", + message=f"refused approve of {filename}: {_reason}", + level="warning", outcome="denied", + extra={"filename": filename, "reason": _reason}, + ) + except Exception: + pass + _msg = ( + "Refused: cannot verify the protected built-in manifest." + if _pinned is None + else f"Refused: '{filename}' is a protected built-in skill name and cannot be overwritten" + ) + return JSONResponse({"error": _msg}, status_code=400) from codec_config import is_dangerous_skill_code dangerous, reason = is_dangerous_skill_code(code) if dangerous: @@ -192,7 +239,9 @@ async def save_triggers(request: Request): elif triggers is None: # Reset to default custom.pop(skill_name, None) - os.makedirs(os.path.dirname(CUSTOM_TRIGGERS_PATH), exist_ok=True) - with open(CUSTOM_TRIGGERS_PATH, "w") as f: - json.dump(custom, f, indent=2) + # re-audit medium: atomic write so a concurrent reader (SkillRegistry / + # _load_custom_triggers) can't catch a truncated file mid-write and silently + # fall back to {} (wiping all custom triggers for that read). + import codec_jsonstore + codec_jsonstore.atomic_write_json(CUSTOM_TRIGGERS_PATH, custom) return {"status": "saved", "custom_count": len(custom)} diff --git a/routes/websocket.py b/routes/websocket.py index 47dfe9d..9e6b8c5 100644 --- a/routes/websocket.py +++ b/routes/websocket.py @@ -1,4 +1,5 @@ """CODEC Dashboard -- WebSocket routes (voice pipeline).""" +import hmac import os from fastapi import APIRouter, WebSocket, WebSocketDisconnect @@ -9,6 +10,55 @@ router = APIRouter() +def _ws_authorized(websocket) -> bool: + """N1 (re-audit): mirror the HTTP AuthMiddleware gate for the WS handshake. + + BaseHTTPMiddleware (AuthMiddleware's base) only runs on the `http` scope, + NOT `websocket` — so /ws/voice was unauthenticated. This replicates the + HTTP Layers 0/1/2: open when nothing is configured (loopback dev posture); + else accept a matching dashboard_token (via `?token=` or an Authorization: + Bearer header) OR a valid biometric session cookie (TOTP-verified per + _verify_biometric_session). Never raises — returns False on any error. + """ + # Imported lazily so test monkeypatches of these module attrs take effect. + from routes._shared import ( + AUTH_ENABLED, + _auth_available, + _verify_biometric_session, + ) + try: + from codec_config import get_dashboard_token + token = get_dashboard_token() or "" + except Exception: + token = "" + biometric = bool(AUTH_ENABLED) and bool(_auth_available()) + + # Layer 0 — nothing configured → open (loopback-only dev; the startup + # safety gate refuses a public bind without a token or auth). + if not token and not biometric: + return True + + # Layer 1 — dashboard_token bearer, presented as ?token= or Authorization. + if token: + presented = websocket.query_params.get("token", "") or "" + if not presented: + hdr = websocket.headers.get("authorization", "") or "" + if hdr.lower().startswith("bearer "): + presented = hdr[7:] + if presented and hmac.compare_digest(presented, token): + return True + + # Layer 2 — biometric/PIN session cookie (TOTP-enforced inside the helper). + if biometric: + try: + if _verify_biometric_session(websocket): + return True + except Exception: + return False + + return False + + @router.get("/voice", response_class=HTMLResponse) async def voice_page(): """Serve the voice call UI.""" @@ -21,6 +71,12 @@ async def voice_page(): async def voice_websocket(websocket: WebSocket): """WebSocket endpoint -- one VoicePipeline per connection. Pass ?resume= to resume a dropped session.""" + # N1 (re-audit): authenticate the handshake — AuthMiddleware can't (it's + # HTTP-scope only). Reject before accept() so the voice→skill pipeline is + # never reachable unauthenticated when the dashboard is exposed. + if not _ws_authorized(websocket): + await websocket.close(code=4401) # 4401 = application "Unauthorized" + return await websocket.accept() from codec_metrics import metrics metrics.inc("codec_voice_sessions_total") diff --git a/skills/.manifest.json b/skills/.manifest.json index 58fa776..79497f0 100644 --- a/skills/.manifest.json +++ b/skills/.manifest.json @@ -29,9 +29,9 @@ "create_skill.py": "28070280abfe2179d5c9b33eee74b4db5a5dd085f76a09f02d516302ba330036", "delegate.py": "7c595d5605cd9913a8afa331ae0013861d6996f378f8a59aca0249f1b2f3a474", "fact_extract.py": "a43ed03b8c51704415f4135de46a44e097f757305af3921dd389b8dfd0540906", - "file_ops.py": "bf90094ba87b08caf8e35820bf7397883662ce06ebf3e6b3b5da4dd493afa812", + "file_ops.py": "890f581c891a2c89dbd51177cf7b8152dba224a590a7f526737e9003f9046dce", "file_search.py": "c2667fdb35e8576a48180d616f934037cc391f93e88ec5c8612815233c868147", - "file_write.py": "5ad88f9996192b558f9e121793603b3c49d3a89d345fce7eaf1317b569c90b88", + "file_write.py": "0c7a91354464c7aceb3af2b6ee8ba903093ba203b9f810f1de86edb5f1eaf7a6", "google_calendar.py": "7ef8c9a7fd02a5c2b52c7ad09094bda0c15851360345c5cb110cfd032ef9a562", "google_docs.py": "75980457cb9304e970e9dcaaa12e2acca51559344d2f022896260a6a884f8318", "google_drive.py": "29ce91f3c5bbb43f67a18268c318fddab98627c4c879f5d76d2faa519274307c", @@ -76,7 +76,7 @@ "tts_say.py": "bbc8828d73f0fa4ebb935919ffd0e64f706362bf715fe86851046e4527f62c92", "volume.py": "cbd20b36ee7d4010a4a4d13829b2d7e96e5123bd112ccd5a17fe18ab85c18429", "weather.py": "d33a479119a23a26a485df230440a7e4e1beb04bb0bb68bd2a7444e3504a6b2e", - "web_fetch.py": "e305aab7590a48b37a6f7fc170e76f471851dc69a043620725674758fed72f2a", + "web_fetch.py": "cd6e2b7565bc3f486faeabdf6d338f5fd052d6cde0e5c8d7625b3b630f987859", "web_search.py": "49023eef091cbedb7e099ad7a545e02eed68676f5c0de0e168625d51914d6462" } } diff --git a/skills/file_ops.py b/skills/file_ops.py index fd20904..347d7eb 100644 --- a/skills/file_ops.py +++ b/skills/file_ops.py @@ -91,14 +91,15 @@ def _is_safe_path(path): Emits `file_ops_blocked` on refusal.""" requested = path expanded = os.path.expanduser(path) - # The file may not exist yet (write/append) — realpath the parent and - # re-join the basename so we still resolve symlinked parents. - parent = os.path.dirname(expanded) or "." + # re-audit N10: realpath the FULL path so a symlinked FINAL component + # resolves to its (blocked) target. The old parent-only realpath let a + # symlinked basename (x.json -> ~/.codec/oauth_state.json) slip past the + # blocklist and open() then read/wrote through it. realpath handles a + # not-yet-existing file (resolves the existing prefix, appends the rest). try: - real_parent = os.path.realpath(parent) + real = os.path.realpath(expanded) except Exception: - real_parent = parent - real = os.path.join(real_parent, os.path.basename(expanded)) + real = expanded for bp in _BLOCKED_ROOTS_REAL: if real == bp or real.startswith(bp + os.sep): @@ -194,16 +195,21 @@ def run(task, app="", ctx=""): if not safe: return reason + # re-audit N4/N10: operate on the realpath-resolved target (the same path + # _is_safe_path validated) so a symlinked final component can't redirect the + # read/write between check and open. + target = os.path.realpath(os.path.expanduser(path)) + if action == "read": - if not os.path.exists(path): + if not os.path.exists(target): return f"File not found: {path}" - if not os.path.isfile(path): + if not os.path.isfile(target): return f"Not a file: {path}" try: - size = os.path.getsize(path) + size = os.path.getsize(target) if size > 1_000_000: return f"File too large ({size:,} bytes). Max 1 MB for safety." - with open(path, "r", encoding="utf-8", errors="replace") as f: + with open(target, "r", encoding="utf-8", errors="replace") as f: content = f.read(_MAX_READ) truncated = " (truncated)" if len(content) >= _MAX_READ else "" return f"File: {path} ({size:,} bytes){truncated}\n\n{content}" @@ -222,7 +228,7 @@ def run(task, app="", ctx=""): if len(content) > _MAX_WRITE: return f"Content too large ({len(content):,} chars). Max {_MAX_WRITE:,}." # Ensure parent directory exists - parent = os.path.dirname(path) + parent = os.path.dirname(target) if parent and not os.path.exists(parent): try: os.makedirs(parent, exist_ok=True) @@ -230,7 +236,7 @@ def run(task, app="", ctx=""): return f"Cannot create directory {parent}: {e}" try: mode = "a" if action == "append" else "w" - with open(path, mode, encoding="utf-8") as f: + with open(target, mode, encoding="utf-8") as f: f.write(content) verb = "Appended to" if action == "append" else "Written to" return f"{verb} {path} ({len(content)} chars)" diff --git a/skills/file_write.py b/skills/file_write.py index c710b5a..5caa569 100644 --- a/skills/file_write.py +++ b/skills/file_write.py @@ -130,14 +130,16 @@ def _is_safe_target(path: str): if not path: return False, "Empty path." expanded = os.path.expanduser(path) - # If parent exists, realpath the parent and append basename — the file - # itself may not exist yet, so we can't realpath(path) directly. - parent = os.path.dirname(expanded) or "." + # re-audit N4: realpath the FULL path so a symlinked FINAL component is + # resolved too. The old parent-only realpath (realpath(parent) + raw + # basename) let ~/Documents/x.md -> a symlink into ~/.codec / ~/.zshrc slip + # past the blocklist, and open() then followed it (write-through). realpath + # handles a not-yet-existing file: it resolves the existing path prefix and + # appends the remaining (new) components literally. try: - real_parent = os.path.realpath(parent) + real_path = os.path.realpath(expanded) except Exception: - real_parent = parent - real_path = os.path.join(real_parent, os.path.basename(expanded)) + real_path = expanded # Filename + extension checks apply globally, regardless of directory. base_lower = os.path.basename(real_path).lower() @@ -319,8 +321,14 @@ def run(task: str, context: str = "") -> str: mode_label = _extract_mode(task) fmode = "a" if mode_label == "append" else "w" + # re-audit N4: write to the realpath-resolved target (the same path + # _is_safe_target validated) so a symlinked final component can't redirect + # the write between check and open, and the write lands on the validated + # file rather than following the symlink. + target = os.path.realpath(os.path.expanduser(path)) + # Ensure parent dir exists. - parent = os.path.dirname(path) + parent = os.path.dirname(target) if parent and not os.path.exists(parent): try: os.makedirs(parent, exist_ok=True) @@ -328,7 +336,7 @@ def run(task: str, context: str = "") -> str: return f"file_write: cannot create directory {parent}: {e}" try: - with open(path, fmode, encoding="utf-8") as f: + with open(target, fmode, encoding="utf-8") as f: f.write(content) except PermissionError as e: return f"file_write: permission denied for {path}: {e}" diff --git a/skills/web_fetch.py b/skills/web_fetch.py index db0337b..8a5d5c1 100644 --- a/skills/web_fetch.py +++ b/skills/web_fetch.py @@ -5,16 +5,44 @@ SKILL_MCP_EXPOSE = True import re +from urllib.parse import urljoin + import requests + +def _get_validating_redirects(url: str, max_redirects: int = 5): + """SSRF-safe GET (Fix #7 H1 + re-audit N3). Validates the URL, then follows + redirects MANUALLY, re-validating every hop — requests' default + auto-redirect would otherwise reach an internal / loopback / cloud-metadata + target via a 302 the guard never saw. Raises codec_ssrf.SSRFError on any + blocked hop or too many redirects.""" + import codec_ssrf + for _ in range(max_redirects + 1): + codec_ssrf.validate_url(url) + resp = requests.get(url, timeout=10, allow_redirects=False) + if (resp.is_redirect or resp.is_permanent_redirect) and resp.headers.get("Location"): + url = urljoin(url, resp.headers["Location"]) + continue + return resp + raise codec_ssrf.SSRFError("too many redirects") + + def run(task: str, context: str = "") -> str: try: m = re.search(r"https?://\S+", (task or "") + " " + (context or "")) if not m: return "web_fetch failed: no http(s) URL found in task" url = m.group(0).rstrip(").,;'\"") - - response = requests.get(url, timeout=10) + + # Fix #7 (H1) + re-audit N3: SSRF guard BEFORE the request AND on every + # redirect hop. The fetched body flows back into the chat/LLM transcript, + # so a read of an internal/metadata host is an exfil path. + import codec_ssrf + try: + response = _get_validating_redirects(url) + except codec_ssrf.SSRFError as e: + return f"web_fetch failed: blocked URL ({e})" + response.raise_for_status() content_type = response.headers.get("content-type", "") diff --git a/tests/test_a12_invariant.py b/tests/test_a12_invariant.py new file mode 100644 index 0000000..d46f26e --- /dev/null +++ b/tests/test_a12_invariant.py @@ -0,0 +1,75 @@ +"""Fix #10: CI guard for the A-12 invariant. + +A-12 (see AGENTS.md §2) routed every chat/completions *text* call site onto +codec_llm. The only inline `requests.post(.../chat/completions)` calls left +are vision sites (pending A-11 cleanup) and codec_core's generated session +script. This guard fails if a NEW inline chat/completions POST appears +anywhere else — i.e. someone bypassed codec_llm. + +The detector matches the precise anti-pattern: a `.post(` whose first argument +literally contains `chat/completions`. URL-in-a-variable callers (codec_llm, +codec_vision) don't match and don't need allowlisting; that's intended — the +guard targets the literal inline-POST shape that bypasses the canonical caller. +""" +import re +from pathlib import Path + +REPO = Path(__file__).resolve().parent.parent + +# Files permitted to contain an inline `.post(...chat/completions...)`. +# Every entry is a vision site (pending A-11 migration onto codec_vision) or +# codec_core's build_session_script, which EMITS the call as a string into the +# generated session script (not a live POST in codec_core itself). +_ALLOWLIST = { + "codec_dashboard.py", # screen-vision POSTs (A-11 pending) + "codec_watcher.py", # screen-vision POST (A-11 pending) + "codec_imessage.py", # bridge vision POST (A-11 pending) + "codec_telegram.py", # bridge vision POST (A-11 pending) + "skills/screenshot_text.py", # OCR vision POST (A-11 pending) + "codec_core.py", # generated session-script string, not a live POST +} + +_INLINE_POST_RE = re.compile(r"\.post\s*\([^)]*chat/completions") +_SKIP_PREFIXES = ("tests/", ".claude/", "scripts/") + + +def _scan(root: Path) -> set: + """Return the set of repo-relative .py paths containing an inline + chat/completions POST.""" + found = set() + for p in root.rglob("*.py"): + rel = p.relative_to(root).as_posix() + if rel.startswith(_SKIP_PREFIXES) or "__pycache__" in rel: + continue + try: + text = p.read_text(encoding="utf-8") + except OSError: + continue + if _INLINE_POST_RE.search(text): + found.add(rel) + return found + + +def test_no_new_inline_llm_post_outside_codec_llm(): + found = _scan(REPO) + offenders = found - _ALLOWLIST + assert not offenders, ( + "New inline chat/completions POST(s) outside codec_llm " + f"(A-12 invariant violated): {sorted(offenders)}.\n" + "Route LLM text calls through codec_llm.call/stream/acall/astream. " + "If this is a legitimate vision site pending A-11, add it to the " + "documented _ALLOWLIST in this test with a reason." + ) + + +def test_a12_guard_actually_detects_a_violation(tmp_path): + # Proves the detector is not a no-op: a synthetic rogue inline POST is found. + (tmp_path / "rogue_skill.py").write_text( + "import requests\n" + "def run(t):\n" + ' return requests.post("http://127.0.0.1:8090/v1/chat/completions", json={}).text\n' + ) + (tmp_path / "innocent.py").write_text("x = 1\n") + found = _scan(tmp_path) + assert "rogue_skill.py" in found, "guard failed to detect an inline chat/completions POST" + assert "innocent.py" not in found, "guard false-positived on an unrelated file" diff --git a/tests/test_chat_helpers.py b/tests/test_chat_helpers.py new file mode 100644 index 0000000..e4ee200 --- /dev/null +++ b/tests/test_chat_helpers.py @@ -0,0 +1,45 @@ +"""Fix #8: unit tests for the chat-handler helpers extracted from +chat_completion. The extraction (_build_chat_system_prompt, _chat_vision_response) +made these independently testable in isolation — that testability is the point +of reducing the god-module's hottest handler. Behavior is locked here; the +broader /api/chat behavior stays covered by test_chat_stream / test_step_budget / +test_chat_escalation / test_dashboard_api. +""" +import codec_dashboard as dash + +_LOCAL_CFG = {"llm_base_url": "http://localhost:8083/v1"} + + +def _budget(): + return dash._StepBudget(route="chat", correlation_id="abcdef123456") + + +def test_vision_response_none_without_images(): + assert dash._chat_vision_response({"messages": []}, []) is None + + +def test_build_system_prompt_adds_attachment_note(): + out = dash._build_chat_system_prompt(_LOCAL_CFG, _budget(), has_attachment=True, + last_user_text="look at this") + assert "attached a file or image" in out + assert "DO NOT emit [SKILL:...]" in out + + +def test_build_system_prompt_adds_content_rewrite_note(): + out = dash._build_chat_system_prompt(_LOCAL_CFG, _budget(), has_attachment=False, + last_user_text="please rewrite this email") + assert "rewritten content as plain prose" in out + + +def test_build_system_prompt_plain_request_has_no_turn_overrides(): + out = dash._build_chat_system_prompt(_LOCAL_CFG, _budget(), has_attachment=False, + last_user_text="what's the weather like?") + assert "attached a file or image" not in out + assert "rewritten content as plain prose" not in out + + +def test_build_system_prompt_consumes_one_llm_call_step(): + b = _budget() + assert b.count == 0 + dash._build_chat_system_prompt(_LOCAL_CFG, b, has_attachment=False, last_user_text="hi") + assert b.count == 1, "the llm_call step must be consumed inside the prompt builder" diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 0000000..9d630e8 --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,80 @@ +"""Tests for codec_concurrency.run_with_timeout (Fix #4 / C4). + +The bug being fixed: `with ThreadPoolExecutor() as ex: fut.result(timeout=T)` +defeats its own timeout — the context-manager __exit__ calls +shutdown(wait=True), which blocks until the runaway task finishes. So a 50ms +"timeout" on a 5s task actually takes ~5s. + +run_with_timeout must surface the timeout PROMPTLY and never block on the +abandoned worker (daemon thread, no shutdown(wait=True)). +""" +import concurrent.futures +import threading +import time + +import pytest + +import codec_concurrency + + +def test_returns_result_for_fast_fn(): + assert codec_concurrency.run_with_timeout(lambda: 42, 1.0) == 42 + + +def test_passes_through_args_and_kwargs(): + def add(a, b, c=0): + return a + b + c + + assert codec_concurrency.run_with_timeout(add, 1.0, 2, 3, c=4) == 9 + + +def test_timeout_raises_TimeoutError(): + def slow(): + time.sleep(5) + return "should-not-return" + + with pytest.raises(TimeoutError): + codec_concurrency.run_with_timeout(slow, 0.05) + + +def test_timeout_returns_promptly_does_not_block_on_runaway(): + # THE C4 regression guard: a 5s task with a 50ms timeout must surface the + # timeout in well under the task duration. If run_with_timeout blocked on + # shutdown(wait=True) like the old ThreadPoolExecutor pattern, this would + # take ~5s and the assertion would fail. + started = threading.Event() + + def slow(): + started.set() + time.sleep(5) + + t0 = time.monotonic() + with pytest.raises(TimeoutError): + codec_concurrency.run_with_timeout(slow, 0.05) + elapsed = time.monotonic() - t0 + assert started.is_set(), "worker thread should have started" + assert elapsed < 1.0, ( + f"timeout took {elapsed:.2f}s — it blocked on the runaway task; " + "C4 (shutdown(wait=True)) not actually fixed" + ) + + +def test_reraises_fn_exception_with_type_and_message(): + def boom(): + raise ValueError("kaboom") + + with pytest.raises(ValueError, match="kaboom"): + codec_concurrency.run_with_timeout(boom, 1.0) + + +def test_timeouterror_is_caught_as_concurrent_futures_timeout(): + # The migrated call sites catch concurrent.futures.TimeoutError. On + # py3.11+ that is an alias of builtin TimeoutError, so a helper raising + # builtin TimeoutError is still caught. Guard that contract explicitly. + assert concurrent.futures.TimeoutError is TimeoutError + + def slow(): + time.sleep(5) + + with pytest.raises(concurrent.futures.TimeoutError): + codec_concurrency.run_with_timeout(slow, 0.05) diff --git a/tests/test_consent.py b/tests/test_consent.py new file mode 100644 index 0000000..c2112d4 --- /dev/null +++ b/tests/test_consent.py @@ -0,0 +1,91 @@ +"""re-audit consent gate (Decision C): a skill is destructive if it declares +SKILL_DESTRUCTIVE=True (registry flag, extensible), is in _HTTP_BLOCKED +(backstop), or is a known high-power built-in. codec_consent is the shared +classifier for the chat (codec_dispatch.run_skill) and MCP (codec_mcp.tool_fn) +gates. +""" +import codec_consent + + +class _FakeReg: + def __init__(self, destructive_names): + self._d = set(destructive_names) + + def get_destructive(self, name): + return name in self._d + + +def test_destructive_via_registry_flag(): + reg = _FakeReg({"my_user_skill"}) + assert codec_consent.is_destructive_skill("my_user_skill", registry=reg) is True + assert codec_consent.is_destructive_skill("weather", registry=reg) is False + + +def test_destructive_via_http_blocked_backstop(): + reg = _FakeReg(set()) + # python_exec / terminal are in codec_config._HTTP_BLOCKED + assert codec_consent.is_destructive_skill("python_exec", registry=reg) is True + assert codec_consent.is_destructive_skill("terminal", registry=reg) is True + + +def test_destructive_known_builtins(): + reg = _FakeReg(set()) + for s in ("file_ops", "file_write", "imessage_send", "pilot", "skill_forge"): + assert codec_consent.is_destructive_skill(s, registry=reg) is True, s + + +def test_benign_skills_not_destructive(): + reg = _FakeReg(set()) + for s in ("weather", "calculator", "web_search", "create_skill", "time"): + assert codec_consent.is_destructive_skill(s, registry=reg) is False, s + + +def test_empty_toolname_not_destructive(): + assert codec_consent.is_destructive_skill("", registry=_FakeReg(set())) is False + assert codec_consent.is_destructive_skill(None, registry=_FakeReg(set())) is False + + +def test_gate_kill_switch(monkeypatch): + monkeypatch.delenv("CONSENT_GATE_ENABLED", raising=False) + assert codec_consent.gate_enabled() is True + monkeypatch.setenv("CONSENT_GATE_ENABLED", "false") + assert codec_consent.gate_enabled() is False + + +# ── chat_consent_ok (A2: reuse the AskUserQuestion PWA panel) ──────────────── +def test_chat_consent_nondestructive_runs_without_prompt(monkeypatch): + import codec_ask_user + + called = {"n": 0} + monkeypatch.setattr(codec_ask_user, "ask", lambda *a, **k: called.__setitem__("n", called["n"] + 1)) + assert codec_consent.chat_consent_ok("weather", "x", registry=_FakeReg(set())) is True + assert called["n"] == 0, "non-destructive skills must not prompt for consent" + + +def test_chat_consent_destructive_granted(monkeypatch): + import codec_ask_user + + monkeypatch.setattr(codec_ask_user, "ask", lambda *a, **k: "delete") # verb-matched approval + assert codec_consent.chat_consent_ok("file_ops", "delete x", registry=_FakeReg(set())) is True + + +def test_chat_consent_destructive_timeout_blocks(monkeypatch): + import codec_ask_user + + monkeypatch.setattr(codec_ask_user, "ask", lambda *a, **k: codec_ask_user.TIMEOUT_SENTINEL) + assert codec_consent.chat_consent_ok("file_ops", "x", registry=_FakeReg(set())) is False + + +def test_chat_consent_gate_off_runs(monkeypatch): + monkeypatch.setenv("CONSENT_GATE_ENABLED", "false") + assert codec_consent.chat_consent_ok("file_ops", "x", registry=_FakeReg(set())) is True + + +def test_chat_consent_fails_closed_on_error(monkeypatch): + import codec_ask_user + + def _boom(*a, **k): + raise RuntimeError("ask broke") + + monkeypatch.setattr(codec_ask_user, "ask", _boom) + assert codec_consent.chat_consent_ok("file_ops", "x", registry=_FakeReg(set())) is False diff --git a/tests/test_fastmcp_compat.py b/tests/test_fastmcp_compat.py new file mode 100644 index 0000000..82d7272 --- /dev/null +++ b/tests/test_fastmcp_compat.py @@ -0,0 +1,30 @@ +"""Fix #6: guard the fastmcp floor ↔ code-import consistency. + +The deployed python3.13 was found running fastmcp 3.1.1 while requirements.txt +declares >=3.3.1 — and an in-place upgrade to 3.3.1 left a broken install state +on that interpreter. In a CLEAN env the code imports fine on 3.3.1, so the fix +is a clean reinstall (not a code port). This test runs in CI (where fastmcp is +installed from the requirements floor) and would have caught both the version +drift and any future fastmcp API move that breaks codec_mcp / codec_oauth. +""" + + +def test_fastmcp_meets_safe_floor(): + import importlib.metadata as md + + raw = md.version("fastmcp") + parts = [] + for chunk in raw.split(".")[:3]: + num = "".join(c for c in chunk if c.isdigit()) + parts.append(int(num) if num else 0) + assert tuple(parts) >= (3, 3, 1), f"fastmcp {raw} is below the CVE-safe floor 3.3.1" + + +def test_codec_mcp_modules_import_on_declared_fastmcp(): + # The exact import paths codec_mcp + codec_oauth_provider depend on. + from fastmcp import FastMCP # noqa: F401 + from fastmcp.server.auth.providers.in_memory import InMemoryOAuthProvider # noqa: F401 + + # And the real modules load against the installed fastmcp. + import codec_oauth_provider # noqa: F401 + import codec_mcp # noqa: F401 diff --git a/tests/test_file_symlink.py b/tests/test_file_symlink.py new file mode 100644 index 0000000..e7ce704 --- /dev/null +++ b/tests/test_file_symlink.py @@ -0,0 +1,68 @@ +"""N4/N10 (re-audit, High): file_write/file_ops resolved only the PARENT dir +via realpath, then re-joined the raw basename — so a symlinked FINAL component +(e.g. ~/Documents/notes.md -> ~/.zshrc or ~/.codec/oauth_state.json) slipped +past the blocklist and open() followed it (write-through / read-exfil over MCP). + +The safety check must realpath the FULL path so a symlinked basename resolves +to its (blocked) target. +""" +import importlib.util +import os +from pathlib import Path + + +def _load(modname): + p = Path(__file__).resolve().parent.parent / "skills" / modname + spec = importlib.util.spec_from_file_location(f"_under_test_{modname}", p) + m = importlib.util.module_from_spec(spec) + spec.loader.exec_module(m) + return m + + +def test_file_write_is_safe_target_rejects_symlinked_basename(tmp_path, monkeypatch): + wf = _load("file_write.py") + blocked = tmp_path / "blocked" + blocked.mkdir() + (blocked / "s.txt").write_text("ORIGINAL") + docs = tmp_path / "home" / "d" + docs.mkdir(parents=True) + link = docs / "notes.txt" + link.symlink_to(blocked / "s.txt") # final component is a symlink to a blocked target + + monkeypatch.setattr(wf, "_HOME_REAL", os.path.realpath(str(tmp_path / "home"))) + monkeypatch.setattr(wf, "_TMP_REAL", os.path.realpath("/tmp")) + monkeypatch.setattr(wf, "_BLOCKED_ROOTS_REAL", [os.path.realpath(str(blocked))]) + monkeypatch.setattr(wf, "_BLOCKED_ROOTS", wf._BLOCKED_ROOTS_REAL) + + safe, reason = wf._is_safe_target(str(link)) + assert safe is False, f"symlink-to-blocked target must be unsafe (got safe; reason={reason!r})" + + +def test_file_ops_is_safe_path_rejects_symlinked_basename(tmp_path, monkeypatch): + fo = _load("file_ops.py") + blocked = tmp_path / "blocked" + blocked.mkdir() + (blocked / "oauth_state.json").write_text("SECRET") + docs = tmp_path / "home" / "d" + docs.mkdir(parents=True) + link = docs / "x.json" + link.symlink_to(blocked / "oauth_state.json") + + monkeypatch.setattr(fo, "_BLOCKED_ROOTS_REAL", [os.path.realpath(str(blocked))]) + monkeypatch.setattr(fo, "_BLOCKED_PATHS", fo._BLOCKED_ROOTS_REAL) + + safe, reason = fo._is_safe_path(str(link)) + assert safe is False, f"symlink-to-blocked target must be unsafe (got safe; reason={reason!r})" + + +def test_file_write_still_allows_plain_new_file(tmp_path, monkeypatch): + # Regression: a normal (non-symlink) new file under home stays writable. + wf = _load("file_write.py") + home = tmp_path / "home" + (home / "d").mkdir(parents=True) + monkeypatch.setattr(wf, "_HOME_REAL", os.path.realpath(str(home))) + monkeypatch.setattr(wf, "_TMP_REAL", os.path.realpath("/tmp")) + monkeypatch.setattr(wf, "_BLOCKED_ROOTS_REAL", [os.path.realpath(str(tmp_path / "blocked"))]) + monkeypatch.setattr(wf, "_BLOCKED_ROOTS", wf._BLOCKED_ROOTS_REAL) + safe, reason = wf._is_safe_target(str(home / "d" / "newfile.txt")) + assert safe is True, f"plain new file under home must stay writable (reason={reason!r})" diff --git a/tests/test_jsonstore.py b/tests/test_jsonstore.py new file mode 100644 index 0000000..c668dfc --- /dev/null +++ b/tests/test_jsonstore.py @@ -0,0 +1,96 @@ +"""Fix #9 Phase 0: harden codec_jsonstore primitives. + +- atomic_write_json gains optional `default=` (custom JSON serializer, e.g. + str for datetime) and `sort_keys=` passthrough, so it can subsume the + hand-rolled helpers (codec_ask_user._atomic_write_text uses default=str). +- new read_modify_write(path, fn) standardizes the lock+load+mutate+atomic-write + pattern so a future RMW site can't forget the file_lock. +""" +import json +import os +import stat +import threading +import time +from datetime import datetime, timezone + +import pytest + +import codec_jsonstore + + +def test_atomic_write_json_roundtrip_and_0600(tmp_path): + p = tmp_path / "x.json" + codec_jsonstore.atomic_write_json(p, {"a": 1}) + assert json.loads(p.read_text()) == {"a": 1} + assert stat.S_IMODE(os.stat(p).st_mode) == 0o600 + + +def test_atomic_write_json_default_serializer(tmp_path): + p = tmp_path / "dt.json" + dt = datetime(2026, 5, 29, tzinfo=timezone.utc) + codec_jsonstore.atomic_write_json(p, {"when": dt}, default=str) + assert "2026-05-29" in p.read_text() + + +def test_atomic_write_json_raises_without_default_and_leaves_no_file(tmp_path): + p = tmp_path / "bad.json" + with pytest.raises(TypeError): + codec_jsonstore.atomic_write_json(p, {"when": datetime(2026, 5, 29)}) + assert not p.exists(), "a failed write must not leave a partial target file" + + +def test_atomic_write_json_sort_keys(tmp_path): + p = tmp_path / "s.json" + codec_jsonstore.atomic_write_json(p, {"b": 1, "a": 2}, sort_keys=True) + text = p.read_text() + assert text.index('"a"') < text.index('"b"') + + +def test_read_modify_write_applies_and_persists(tmp_path): + p = tmp_path / "rmw.json" + codec_jsonstore.atomic_write_json(p, {"n": 0}) + + def bump(d): + d["n"] += 1 + return d + + out = codec_jsonstore.read_modify_write(p, bump) + assert out == {"n": 1} + assert json.loads(p.read_text()) == {"n": 1} + + +def test_read_modify_write_missing_file_uses_default_factory(tmp_path): + p = tmp_path / "missing.json" + + def add(d): + d["k"] = "v" + return d + + out = codec_jsonstore.read_modify_write(p, add, default_factory=dict) + assert out == {"k": "v"} + + +def test_read_modify_write_no_clobber_under_concurrency(tmp_path): + p = tmp_path / "conc.json" + codec_jsonstore.atomic_write_json(p, {"items": []}) + n = 16 + barrier = threading.Barrier(n) + + def worker(i): + barrier.wait() + + def add(d): + time.sleep(0.005) # widen the read-modify-write window + d["items"] = d.get("items", []) + [i] + return d + + codec_jsonstore.read_modify_write(p, add) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + + items = json.loads(p.read_text())["items"] + assert sorted(items) == list(range(n)), f"read_modify_write clobbered: {sorted(items)}" diff --git a/tests/test_notifications_lock.py b/tests/test_notifications_lock.py new file mode 100644 index 0000000..4835727 --- /dev/null +++ b/tests/test_notifications_lock.py @@ -0,0 +1,55 @@ +"""Fix #9 Phase 2: _save_notification cross-process safety. + +routes/_shared._save_notification does a load->insert->write RMW under the +in-process `_notif_lock` only. The notification writers run in SEPARATE PM2 +daemons (dashboard, scheduler, heartbeat, autopilot), which do NOT share that +threading lock — so concurrent saves across processes clobber each other. + +This test simulates separate processes by neutralizing the in-process lock, +then asserts no notification is lost. It fails before the cross-process +file_lock is added and passes after. +""" +import contextlib +import json +import threading +import time + +import codec_jsonstore +import routes._shared as shared + + +def test_save_notification_no_clobber_across_processes(tmp_path, monkeypatch): + notifs_path = tmp_path / "notifications.json" + monkeypatch.setattr(shared, "NOTIFICATIONS_PATH", str(notifs_path)) + codec_jsonstore.atomic_write_json(notifs_path, []) # start empty (no sample seed) + + # Simulate independent processes: each daemon has its OWN process, so the + # in-process _notif_lock does not serialize them. nullcontext models that. + monkeypatch.setattr(shared, "_notif_lock", contextlib.nullcontext()) + + # Widen the read-modify-write window so an unlocked path reliably clobbers. + real_write = shared._write_notifications + + def slow_write(notifs): + time.sleep(0.01) + return real_write(notifs) + + monkeypatch.setattr(shared, "_write_notifications", slow_write) + + n = 12 + barrier = threading.Barrier(n) + + def worker(i): + barrier.wait() + shared._save_notification(f"t{i}", "body") + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + + titles = {x["title"] for x in json.loads(notifs_path.read_text())} + assert titles == {f"t{i}" for i in range(n)}, ( + f"notifications clobbered across processes: {len(titles)}/{n} survived" + ) diff --git a/tests/test_oauth_provider.py b/tests/test_oauth_provider.py index ff95143..671f9bf 100644 --- a/tests/test_oauth_provider.py +++ b/tests/test_oauth_provider.py @@ -154,6 +154,60 @@ def _counting_fsync(fd): assert mode == 0o600, f"fallback file must be 0600; got {oct(mode)}" +# ── Fix #10: OAuth scope-escalation regression coverage ────────────────────── +def test_refresh_rejects_scope_escalation(tmp_path): + """A refresh token issued for ['read'] must NOT be exchangeable for + ['read','write'] — exchange_refresh_token enforces requested ⊆ original. + Pins the scope-escalation defense (codec_oauth_provider.py:244-249) so a + future refactor that drops the subset check fails CI.""" + import time + + from mcp.server.auth.provider import RefreshToken, TokenError + + p = PersistentOAuthProvider( + base_url="https://test.example.com", + client_registration_options=ClientRegistrationOptions(enabled=True), + state_path=tmp_path / "state.json", + ) + client = _make_client("escal") + asyncio.run(p.register_client(client)) + rt = RefreshToken( + token="codec_rt_orig", + client_id="escal", + scopes=["read"], + expires_at=int(time.time() + 3600), + ) + with pytest.raises(TokenError) as exc: + asyncio.run(p.exchange_refresh_token(client, rt, ["read", "write"])) + assert "scope" in str(exc.value).lower() + + +def test_refresh_allows_subset_scopes(tmp_path): + """A refresh for a SUBSET of the original scopes succeeds, and the new + access token carries only the narrowed scopes (no silent widening).""" + import time + + from mcp.server.auth.provider import RefreshToken + + p = PersistentOAuthProvider( + base_url="https://test.example.com", + client_registration_options=ClientRegistrationOptions(enabled=True), + state_path=tmp_path / "state.json", + ) + client = _make_client("narrow") + asyncio.run(p.register_client(client)) + rt = RefreshToken( + token="codec_rt_orig2", + client_id="narrow", + scopes=["read", "write"], + expires_at=int(time.time() + 3600), + ) + p.refresh_tokens[rt.token] = rt # register so the internal revoke is clean + token = asyncio.run(p.exchange_refresh_token(client, rt, ["read"])) + assert "read" in (token.scope or ""), "narrowed scope must include the requested scope" + assert "write" not in (token.scope or ""), "new token must not carry the dropped scope" + + if __name__ == "__main__": import tempfile with tempfile.TemporaryDirectory() as d: diff --git a/tests/test_reaudit_low.py b/tests/test_reaudit_low.py new file mode 100644 index 0000000..cf8907a --- /dev/null +++ b/tests/test_reaudit_low.py @@ -0,0 +1,49 @@ +"""re-audit LOW cluster: N19 (AST gate misses `import builtins`) + N20 +(_pinned_builtin_names fails OPEN on a manifest read error, so the skill-approve +guard blocks nothing during a transient failure).""" +import asyncio + +import routes.skills as sr + +_BENIGN = ( + 'SKILL_NAME = "x"\n' + 'SKILL_DESCRIPTION = "x"\n' + 'SKILL_TRIGGERS = []\n' + 'def run(task, app="", ctx=""):\n' + ' return "ok"\n' +) + + +class _Req: + def __init__(self, payload): + self._p = payload + + async def json(self): + return self._p + + +# ── N19: builtins.exec/eval bypassed the AST gate (builtins not a dangerous module) ── +def test_ast_gate_flags_builtins_import(): + from codec_config import is_dangerous_skill_code + + bad, reason = is_dangerous_skill_code("import builtins\nbuiltins.exec('1')") + assert bad is True, f"`import builtins` must be flagged (AST gate bypass); reason={reason!r}" + + +def test_ast_gate_still_allows_safe_code(): + from codec_config import is_dangerous_skill_code + + bad, _ = is_dangerous_skill_code("import json\nx = json.dumps({'a': 1})\n") + assert bad is False, "benign stdlib code must still pass" + + +# ── N20: skill_approve must FAIL CLOSED when the pinned-builtin manifest can't be read ── +def test_skill_approve_fails_closed_when_manifest_unreadable(tmp_path, monkeypatch): + monkeypatch.setattr(sr, "_get_skills_dir", lambda: str(tmp_path)) + monkeypatch.setattr(sr, "_pinned_builtin_names", lambda: None) # simulate read failure + rid = "rev_failclosed" + sr._pending_skills[rid] = {"code": _BENIGN, "filename": "my_helper.py"} + resp = asyncio.run(sr.skill_approve(_Req({"review_id": rid}))) + status = getattr(resp, "status_code", 200) + assert status == 400, "approve must fail closed when the pinned-builtin manifest can't be verified" + assert not (tmp_path / "my_helper.py").exists() diff --git a/tests/test_reaudit_mediums.py b/tests/test_reaudit_mediums.py new file mode 100644 index 0000000..de720cc --- /dev/null +++ b/tests/test_reaudit_mediums.py @@ -0,0 +1,27 @@ +"""re-audit MEDIUMs. + +M-consent (partial, CHAIN-002): skill_forge writes forged code straight to disk +(bypassing the review-and-approve flow PR-1B mandated) yet was on +CHAT_SKILL_ALLOWLIST — so a prompt-injected [SKILL:skill_forge:...] tag could +persist an unsandboxed skill. It must not be auto-firable from chat. create_skill +STAYS (it stages for human review, never writes without approval). +""" +import codec_dashboard + + +def test_skill_forge_not_auto_firable_from_chat(): + assert "skill_forge" not in codec_dashboard.CHAT_SKILL_ALLOWLIST, ( + "skill_forge writes code to disk without the review gate — it must not be " + "auto-firable from a chat message (CHAIN-002)" + ) + + +def test_phantom_ask_codec_to_build_removed(): + # No skills/ask_codec_to_build.py exists — a stale allowlist entry. + assert "ask_codec_to_build" not in codec_dashboard.CHAT_SKILL_ALLOWLIST + + +def test_create_skill_still_allowed_review_gated_path(): + # create_skill routes through /api/skill/review (never writes without + # approval), so it remains the safe chat-reachable skill-creation path. + assert "create_skill" in codec_dashboard.CHAT_SKILL_ALLOWLIST diff --git a/tests/test_research_jobs_eviction.py b/tests/test_research_jobs_eviction.py new file mode 100644 index 0000000..7d8216b --- /dev/null +++ b/tests/test_research_jobs_eviction.py @@ -0,0 +1,33 @@ +"""N9 (re-audit, High): _research_jobs had NO lock and NO eviction (unlike +_agent_jobs after H-4) — an unbounded memory leak plus a 'dict changed size' +race between deep_research_start's insert and the worker's update. Add a TTL +eviction mirroring _evict_stale_agent_jobs. +""" +from datetime import datetime, timedelta + +import routes._shared as shared + + +def test_evict_stale_research_jobs(monkeypatch): + monkeypatch.setattr(shared, "_research_jobs", {}, raising=False) + now = datetime.now() + old = (now - timedelta(hours=48)).isoformat() + fresh = now.isoformat() + shared._research_jobs["old_done"] = {"status": "done", "started": old} + shared._research_jobs["old_running"] = {"status": "running", "started": old} + shared._research_jobs["fresh_done"] = {"status": "done", "started": fresh} + + removed = shared._evict_stale_research_jobs(now=now) + + assert removed == 1, "only the old terminal job should be evicted" + assert "old_done" not in shared._research_jobs + assert "old_running" in shared._research_jobs, "running jobs must never be evicted" + assert "fresh_done" in shared._research_jobs + + +def test_evict_stale_research_jobs_keeps_unparseable_started(monkeypatch): + monkeypatch.setattr(shared, "_research_jobs", {}, raising=False) + shared._research_jobs["bad_ts"] = {"status": "done", "started": "not-a-date"} + removed = shared._evict_stale_research_jobs() + assert removed == 0 + assert "bad_ts" in shared._research_jobs, "never lose data on an unparseable timestamp" diff --git a/tests/test_session_token_valid.py b/tests/test_session_token_valid.py new file mode 100644 index 0000000..600da56 --- /dev/null +++ b/tests/test_session_token_valid.py @@ -0,0 +1,50 @@ +"""N5 (re-audit, High): the ?s= query-param session fallback in AuthMiddleware +skipped the TOTP-verified check (unlike the cookie path via +_verify_biometric_session), so a pre-TOTP / stolen-pre-TOTP token could reach +any GET /api endpoint by appending ?s=, bypassing 2FA. + +Fix: both the cookie path and the ?s= path route through one +_session_token_valid() that enforces existence + age + TOTP. Unit-tested here. +""" +from datetime import datetime, timedelta + +import pytest + +import routes._shared as shared + + +@pytest.fixture(autouse=True) +def _clean_sessions(monkeypatch): + monkeypatch.setattr(shared, "_auth_sessions", {}, raising=False) + monkeypatch.setattr(shared, "_save_sessions", lambda: None, raising=False) + yield + + +def test_missing_token_invalid(): + assert shared._session_token_valid("") is False + assert shared._session_token_valid(None) is False + + +def test_fresh_session_no_totp_valid(monkeypatch): + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: False, raising=False) + shared._auth_sessions["t"] = {"created": datetime.now()} + assert shared._session_token_valid("t") is True + + +def test_totp_enabled_unverified_invalid(monkeypatch): + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: True, raising=False) + shared._auth_sessions["t"] = {"created": datetime.now()} # no totp_verified + assert shared._session_token_valid("t") is False, "?s= path must enforce TOTP (N5)" + + +def test_totp_enabled_verified_valid(monkeypatch): + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: True, raising=False) + shared._auth_sessions["t"] = {"created": datetime.now(), "totp_verified": True} + assert shared._session_token_valid("t") is True + + +def test_expired_session_invalid(monkeypatch): + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: False, raising=False) + old = datetime.now() - timedelta(hours=shared.AUTH_SESSION_HOURS + 1) + shared._auth_sessions["t"] = {"created": old} + assert shared._session_token_valid("t") is False diff --git a/tests/test_skill_overwrite.py b/tests/test_skill_overwrite.py new file mode 100644 index 0000000..f8433d1 --- /dev/null +++ b/tests/test_skill_overwrite.py @@ -0,0 +1,52 @@ +"""Fix #7b (H2/H6): skill approve must not overwrite/shadow a hash-pinned built-in. + +The review-and-approve flow (routes/skills.py) writes an approved skill to the +skills dir by basename. Without a guard, an approved skill named after a +manifest-pinned built-in (e.g. calculator.py, file_write.py) takes that +trusted name — shadowing the real built-in (or overwriting it if the write +dir is the repo skills dir). The approve gate must refuse pinned names. +""" +import asyncio + +import routes.skills as skills_routes + + +class _FakeReq: + def __init__(self, payload): + self._p = payload + + async def json(self): + return self._p + + +_BENIGN = ( + 'SKILL_NAME = "x"\n' + 'SKILL_DESCRIPTION = "x"\n' + 'SKILL_TRIGGERS = []\n' + 'def run(task, app="", ctx=""):\n' + ' return "ok"\n' +) + + +def _approve(payload): + return asyncio.run(skills_routes.skill_approve(_FakeReq(payload))) + + +def test_skill_approve_refuses_pinned_builtin_name(tmp_path, monkeypatch): + monkeypatch.setattr(skills_routes, "_get_skills_dir", lambda: str(tmp_path)) + rid = "rev_pinned" + skills_routes._pending_skills[rid] = {"code": _BENIGN, "filename": "calculator.py"} + resp = _approve({"review_id": rid}) + status = getattr(resp, "status_code", 200) + assert status == 400, f"approving a pinned built-in name must be 400, got {status}: {resp}" + assert not (tmp_path / "calculator.py").exists(), "a pinned built-in was written to disk" + + +def test_skill_approve_allows_non_pinned_name(tmp_path, monkeypatch): + monkeypatch.setattr(skills_routes, "_get_skills_dir", lambda: str(tmp_path)) + rid = "rev_ok" + skills_routes._pending_skills[rid] = {"code": _BENIGN, "filename": "my_unique_helper.py"} + resp = _approve({"review_id": rid}) + status = getattr(resp, "status_code", 200) + assert status == 200, f"a non-pinned skill should approve, got {status}: {resp}" + assert (tmp_path / "my_unique_helper.py").exists() diff --git a/tests/test_ssrf.py b/tests/test_ssrf.py new file mode 100644 index 0000000..0481ec5 --- /dev/null +++ b/tests/test_ssrf.py @@ -0,0 +1,190 @@ +"""Fix #7a (H1/H2/H6): SSRF guard for outbound URL fetches. + +codec_ssrf.validate_url(url) must reject URLs that would let an +attacker-supplied link (chat injection / clipboard / crew task) reach +internal services or the cloud metadata endpoint, while allowing ordinary +public hosts. + +Network-free by design: IP-literal + bad-scheme cases need no DNS; the +allow / mixed-resolution cases monkeypatch socket.getaddrinfo. +""" +import pytest + +import codec_ssrf + + +# IP literals + bad schemes — no DNS lookup needed, fully deterministic. +@pytest.mark.parametrize( + "url", + [ + "http://127.0.0.1/", + "http://169.254.169.254/latest/meta-data/", # cloud metadata + "http://10.0.0.5/internal", + "http://192.168.1.1/", + "http://172.16.0.1/", + "http://[::1]/", + "http://0.0.0.0/", + "file:///etc/passwd", + "ftp://example.com/x", + "gopher://evil/", + "", + "not-a-url", + ], +) +def test_validate_url_rejects_unsafe(url): + with pytest.raises(codec_ssrf.SSRFError): + codec_ssrf.validate_url(url) + + +def test_validate_url_allows_public_host(monkeypatch): + monkeypatch.setattr( + codec_ssrf.socket, + "getaddrinfo", + lambda *a, **k: [(2, 1, 6, "", ("93.184.216.34", 443))], + ) + assert codec_ssrf.validate_url("https://example.com/page") == "https://example.com/page" + + +def test_validate_url_rejects_when_any_resolved_ip_is_internal(monkeypatch): + # DNS-rebinding-style defense: a hostname that resolves to BOTH a public + # and an internal address must be rejected (ANY blocked → reject). + monkeypatch.setattr( + codec_ssrf.socket, + "getaddrinfo", + lambda *a, **k: [ + (2, 1, 6, "", ("93.184.216.34", 80)), + (2, 1, 6, "", ("169.254.169.254", 80)), + ], + ) + with pytest.raises(codec_ssrf.SSRFError): + codec_ssrf.validate_url("http://rebind.example/") + + +def test_validate_url_rejects_dns_failure(monkeypatch): + import socket as _socket + + def _boom(*a, **k): + raise _socket.gaierror("name does not resolve") + + monkeypatch.setattr(codec_ssrf.socket, "getaddrinfo", _boom) + with pytest.raises(codec_ssrf.SSRFError): + codec_ssrf.validate_url("http://nonexistent.invalid/") + + +# ── Wiring: the guard must run BEFORE the HTTP client at every fetch site ──── +def _load_web_fetch_skill(): + import importlib.util + from pathlib import Path + + path = Path(__file__).resolve().parent.parent / "skills" / "web_fetch.py" + spec = importlib.util.spec_from_file_location("_wf_under_test", path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def test_web_fetch_skill_does_not_request_blocked_url(monkeypatch): + web_fetch = _load_web_fetch_skill() + calls = {"n": 0} + + def _boom(*a, **k): + calls["n"] += 1 + raise AssertionError("requests.get must not be called for a blocked URL") + + monkeypatch.setattr(web_fetch.requests, "get", _boom) + result = web_fetch.run("fetch http://169.254.169.254/latest/meta-data/") + assert calls["n"] == 0, "web_fetch reached the network for an SSRF-blocked URL" + assert "block" in result.lower(), f"expected an SSRF-block message, got: {result!r}" + + +def test_agents_web_fetch_tool_does_not_request_blocked_url(monkeypatch): + import codec_agents + + calls = {"n": 0} + + def _boom(*a, **k): + calls["n"] += 1 + raise AssertionError("_sync_http.get must not be called for a blocked URL") + + monkeypatch.setattr(codec_agents._sync_http, "get", _boom) + result = codec_agents._web_fetch("http://127.0.0.1/internal") + assert calls["n"] == 0, "_web_fetch reached the network for an SSRF-blocked URL" + assert "block" in result.lower(), f"expected an SSRF-block message, got: {result!r}" + + +# ── N3 (re-audit): the guard must re-validate EVERY redirect hop ───────────── +def _public_dns(monkeypatch): + """Resolve any hostname to a public IP; IP literals resolve to themselves + (so 169.254/127.0.0.1 still trip _is_blocked_ip).""" + def _gai(host, *a, **k): + ip = "93.184.216.34" if any(c.isalpha() for c in host) else host + return [(2, 1, 6, "", (ip, 0))] + monkeypatch.setattr(codec_ssrf.socket, "getaddrinfo", _gai) + + +class _ReqResp: + def __init__(self, status, location=None, text="", ctype="text/html"): + self.status_code = status + self.headers = {"content-type": ctype} + if location: + self.headers["Location"] = location + self.text = text + + @property + def is_redirect(self): + return self.status_code in (301, 302, 303, 307, 308) and "Location" in self.headers + + @property + def is_permanent_redirect(self): + return self.status_code in (301, 308) + + def raise_for_status(self): + pass + + +def test_web_fetch_revalidates_redirect_hop_to_internal(monkeypatch): + web_fetch = _load_web_fetch_skill() + _public_dns(monkeypatch) + + def fake_get(u, allow_redirects=True, **k): + if "evil" in u: + if allow_redirects: + # models the VULN: requests auto-follows the 302 to the metadata host + return _ReqResp(200, text="INTERNAL-METADATA-SECRET", ctype="text/plain") + return _ReqResp(302, location="http://169.254.169.254/latest/meta-data/") + return _ReqResp(200, text="INTERNAL-METADATA-SECRET", ctype="text/plain") + + monkeypatch.setattr(web_fetch.requests, "get", fake_get) + result = web_fetch.run("fetch http://evil.example/redir") + assert "INTERNAL-METADATA-SECRET" not in result, "SSRF guard bypassed via redirect hop" + assert "block" in result.lower(), f"expected an SSRF-block message, got: {result!r}" + + +class _HxResp: + def __init__(self, status, location=None, text=""): + self.status_code = status + self.headers = {"location": location} if location else {} + self.text = text + + @property + def is_redirect(self): + return self.status_code in (301, 302, 303, 307, 308) and "location" in self.headers + + +def test_agents_web_fetch_revalidates_redirect_hop_to_internal(monkeypatch): + import codec_agents + + _public_dns(monkeypatch) + + def fake_get(u, follow_redirects=True, **k): + u = str(u) + if "evil" in u: + if follow_redirects: + return _HxResp(200, text="INTERNAL-METADATA-SECRET") + return _HxResp(302, location="http://169.254.169.254/latest/meta-data/") + return _HxResp(200, text="INTERNAL-METADATA-SECRET") + + monkeypatch.setattr(codec_agents._sync_http, "get", fake_get) + result = codec_agents._web_fetch("http://evil.example/redir") + assert "INTERNAL-METADATA-SECRET" not in result, "crew fetch SSRF bypassed via redirect hop" + assert "block" in result.lower(), f"expected an SSRF-block message, got: {result!r}" diff --git a/tests/test_state_locking.py b/tests/test_state_locking.py new file mode 100644 index 0000000..c1b6e53 --- /dev/null +++ b/tests/test_state_locking.py @@ -0,0 +1,151 @@ +"""Fix #5 (C5 / H14 / M6): SQLite + JSON state locking. + +Two race classes the audit flagged: + +1. CodecMemory shares one sqlite3 connection across threads + (`check_same_thread=False`) with no app-level lock — concurrent save() + calls interleave on the same connection. +2. read-modify-write of ~/.codec JSON state without a cross-process + file_lock — concurrent writers clobber each other (grant_permission on + grants.json; _write_question_notification on notifications.json). + +Each test below fails against the pre-fix code for the RIGHT reason (observed +overlap / lost write), and passes once the lock is added. +""" +import threading +import time + +import codec_agent_plan +import codec_memory + + +# ── 1. SQLite connection serialization ────────────────────────────────────── +def test_concurrent_save_serializes_db_access(tmp_path): + # NOTE: a fully MOCK connection is used deliberately. Letting real + # concurrent execute() calls hit the same sqlite3 connection segfaults the + # interpreter (that crash IS the C5 bug) — which would kill the test runner + # rather than produce a clean assertion. The mock measures whether + # CodecMemory.save serializes access to the connection object, which is + # exactly what the RLock fix provides, without invoking the C layer. + mem = codec_memory.CodecMemory(db_path=str(tmp_path / "mem.db")) + + class _OverlapProbe: + """Stands in for the live connection. Flags any concurrent re-entry of + execute(); a widened window (sleep) makes overlap observable when the + caller is NOT serializing access.""" + + def __init__(self): + self._n = 0 + self.max_concurrent = 0 + self._rowid = 0 + self._cl = threading.Lock() + + def execute(self, *a, **k): + with self._cl: + self._n += 1 + self.max_concurrent = max(self.max_concurrent, self._n) + self._rowid += 1 + rowid = self._rowid + try: + time.sleep(0.003) + finally: + with self._cl: + self._n -= 1 + return type("_Cur", (), {"lastrowid": rowid})() + + def commit(self): + pass + + probe = _OverlapProbe() + mem._conn = probe # _get_conn() returns this since it's non-None + + def worker(i): + for j in range(4): + mem.save(f"s{i}", "user", f"m{i}-{j}") + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert probe.max_concurrent == 1, ( + f"CodecMemory.save overlapped on the shared connection " + f"(max concurrent execute={probe.max_concurrent}); not serialized (C5)" + ) + + +# ── 2. grants.json read-modify-write under file_lock ───────────────────────── +def test_concurrent_grant_permission_no_clobber(tmp_path, monkeypatch): + from routes.agents import GrantBody, grant_permission + + agents_dir = tmp_path / "agents" + monkeypatch.setattr(codec_agent_plan, "_AGENTS_DIR", agents_dir) + + # Widen the read-modify-write window so an unlocked path reliably clobbers. + real_save = codec_agent_plan.save_grants + + def slow_save(agent_id, grants): + time.sleep(0.02) + return real_save(agent_id, grants) + + monkeypatch.setattr(codec_agent_plan, "save_grants", slow_save) + + aid = "agent_test" + codec_agent_plan.save_manifest(aid, {"id": aid, "status": "running", "title": "t"}) + codec_agent_plan.save_grants( + aid, + {"skills": ["web_search"], "network_domains": [], "read_paths": [], "write_paths": []}, + ) + + n = 12 + barrier = threading.Barrier(n) + errors = [] + + def worker(i): + try: + barrier.wait() # maximize contention on the load->modify->save window + grant_permission( + aid, GrantBody(kind="network_domains", value=f"d{i}.example.com"), request=None + ) + except Exception as e: # noqa: BLE001 + errors.append(repr(e)) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"grant_permission raised: {errors}" + final = codec_agent_plan.load_grants(aid)["network_domains"] + assert sorted(final) == sorted(f"d{i}.example.com" for i in range(n)), ( + f"grants clobbered: expected {n} domains, got {len(final)}: {sorted(final)}" + ) + + +# ── 3. notifications.json write holds the cross-process file_lock ──────────── +def test_question_notification_uses_cross_process_file_lock(tmp_path, monkeypatch): + import codec_ask_user + + notifs_path = tmp_path / "notifications.json" + monkeypatch.setattr(codec_ask_user, "NOTIFICATIONS_PATH", notifs_path) + + locked_paths = [] + real_file_lock = codec_ask_user.codec_jsonstore.file_lock + + def spy_file_lock(path): + locked_paths.append(str(path)) + return real_file_lock(path) + + monkeypatch.setattr(codec_ask_user.codec_jsonstore, "file_lock", spy_file_lock) + + codec_ask_user._write_question_notification( + {"id": "q_abc", "question": "proceed?", "agent": None, "options": None, + "deadline": None, "consent_strict": False} + ) + + assert str(notifs_path) in locked_paths, ( + "_write_question_notification must hold codec_jsonstore.file_lock on " + "notifications.json across its read-modify-write (cross-process safety)" + ) diff --git a/tests/test_ws_auth.py b/tests/test_ws_auth.py new file mode 100644 index 0000000..22a29d4 --- /dev/null +++ b/tests/test_ws_auth.py @@ -0,0 +1,69 @@ +"""N1 (re-audit, Critical): the /ws/voice WebSocket must enforce the same auth +gate as HTTP endpoints. Starlette's BaseHTTPMiddleware (which AuthMiddleware +subclasses) never runs on the websocket scope, so the handshake was previously +unauthenticated — exposing the voice→skill pipeline (terminal/file_ops/pilot) +to anyone who can reach the host when the dashboard is exposed. + +We test the pure gate `routes.websocket._ws_authorized` against fake WS objects. +""" +from datetime import datetime + +import pytest + +import routes._shared as shared +import routes.websocket as ws + + +class _FakeWS: + def __init__(self, cookies=None, query=None, headers=None): + self.cookies = cookies or {} + self.query_params = query or {} + self.headers = headers or {} + + +@pytest.fixture(autouse=True) +def _reset_auth(monkeypatch): + # Default: nothing configured. Individual tests opt into token/biometric. + monkeypatch.setattr(shared, "AUTH_ENABLED", False, raising=False) + monkeypatch.setattr(shared, "_auth_available", lambda: False, raising=False) + monkeypatch.setattr("codec_config.get_dashboard_token", lambda: "", raising=False) + yield + + +def test_open_when_nothing_configured(): + # Loopback dev posture: no token, no biometric → allow (matches HTTP Layer 0). + assert ws._ws_authorized(_FakeWS()) is True + + +def test_biometric_enabled_rejects_without_session(monkeypatch): + monkeypatch.setattr(shared, "AUTH_ENABLED", True, raising=False) + monkeypatch.setattr(shared, "_auth_available", lambda: True, raising=False) + assert ws._ws_authorized(_FakeWS()) is False + + +def test_biometric_enabled_accepts_valid_session_cookie(monkeypatch): + monkeypatch.setattr(shared, "AUTH_ENABLED", True, raising=False) + monkeypatch.setattr(shared, "_auth_available", lambda: True, raising=False) + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: False, raising=False) + tok = "sess_valid" + monkeypatch.setitem(shared._auth_sessions, tok, {"created": datetime.now()}) + cookie = _FakeWS(cookies={shared.AUTH_COOKIE_NAME: tok}) + assert ws._ws_authorized(cookie) is True + + +def test_biometric_enabled_rejects_when_totp_unverified(monkeypatch): + monkeypatch.setattr(shared, "AUTH_ENABLED", True, raising=False) + monkeypatch.setattr(shared, "_auth_available", lambda: True, raising=False) + monkeypatch.setattr(shared, "_is_totp_enabled", lambda: True, raising=False) + tok = "sess_no_totp" + monkeypatch.setitem(shared._auth_sessions, tok, {"created": datetime.now()}) # no totp_verified + assert ws._ws_authorized(_FakeWS(cookies={shared.AUTH_COOKIE_NAME: tok})) is False + + +def test_token_configured_requires_match(monkeypatch): + monkeypatch.setattr("codec_config.get_dashboard_token", lambda: "secrettoken", raising=False) + assert ws._ws_authorized(_FakeWS()) is False + assert ws._ws_authorized(_FakeWS(query={"token": "wrong"})) is False + assert ws._ws_authorized(_FakeWS(query={"token": "secrettoken"})) is True + # also via Authorization header + assert ws._ws_authorized(_FakeWS(headers={"authorization": "Bearer secrettoken"})) is True