Skip to content
Closed
15 changes: 15 additions & 0 deletions codec_agent_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,21 @@ def set_grants_hash(agent_id: str) -> str:
return h


def grants_lock(agent_id: str):
    """Return a context manager guarding an agent's ``grants.json``.

    C5 (Fix #5): the intended caller (the /grant endpoint) holds this lock
    across load_grants -> modify -> save_grants -> set_grants_hash so that two
    concurrent grants cannot clobber each other's read-modify-write. It is the
    grants-file counterpart of ``_status_lock`` for the manifest CAS.

    The lock is ``codec_jsonstore.file_lock`` on ``<agent dir>/grants.json``.
    If anything goes wrong while obtaining it (e.g. ``codec_jsonstore`` is not
    importable in a headless/CI environment) a no-op ``nullcontext`` is
    returned instead, so grant_permission is never broken by locking.
    """
    try:
        import codec_jsonstore

        grants_path = _agent_dir(agent_id) / "grants.json"
        guard = codec_jsonstore.file_lock(grants_path)
    except Exception:
        # Deliberate best-effort: degrade to "no lock" rather than fail the grant.
        import contextlib

        guard = contextlib.nullcontext()
    return guard


# ── Skill-registry validation ─────────────────────────────────────────────────
def validate_plan_skills(plan: Plan, registry=None) -> Tuple[bool, List[str]]:
"""Walk every checkpoint's skills_needed; return (ok, missing_skills).
Expand Down
8 changes: 8 additions & 0 deletions codec_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ def _web_search(query: str) -> str:

def _web_fetch(url: str) -> str:
try:
# Fix #7 (H1): SSRF guard BEFORE the request. The fetched text is
# returned to the agent/LLM, so a read of an internal/metadata host is
# an exfil path even though the crew tool has no explicit egress allow.
import codec_ssrf
try:
codec_ssrf.validate_url(url.strip())
except codec_ssrf.SSRFError as e:
return f"Fetch error: blocked URL ({e})"
r = _sync_http.get(url.strip())
if r.status_code in (401, 403):
return f"Blocked by site (HTTP {r.status_code}). Site requires JavaScript or blocks automated access."
Expand Down
12 changes: 10 additions & 2 deletions codec_ask_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ def _write_question_notification(record: dict) -> None:
surface; failures here log + continue.
"""
try:
with _FILE_LOCK:
# C5 (Fix #5): hold the cross-process file_lock across the whole
# read-modify-write so concurrent daemons (dashboard / voice /
# agent-runner) can't clobber each other's append. _FILE_LOCK is the
# in-process guard; codec_jsonstore.file_lock is the cross-process one —
# same pairing the PENDING_QUESTIONS read-modify-write already uses.
with _FILE_LOCK, codec_jsonstore.file_lock(NOTIFICATIONS_PATH):
try:
with open(NOTIFICATIONS_PATH) as f:
notifs = json.load(f)
Expand Down Expand Up @@ -648,7 +653,10 @@ def submit_answer(qid: str, answer: str, *, answered_via: str = "pwa") -> dict:

# Mark the matching notification as read.
try:
with _FILE_LOCK:
# C5 (Fix #5): same cross-process file_lock as the question-write path
# above, so a concurrent _write_question_notification and this
# mark-read can't clobber each other on notifications.json.
with _FILE_LOCK, codec_jsonstore.file_lock(NOTIFICATIONS_PATH):
try:
with open(NOTIFICATIONS_PATH) as f:
notifs = json.load(f)
Expand Down
72 changes: 72 additions & 0 deletions codec_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""codec_concurrency — small, dependency-free concurrency helpers.

`run_with_timeout` runs a callable in a daemon thread with a hard wall-clock
timeout that ACTUALLY bounds wall-clock time.

Motivation (audit C4): the common idiom

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
fut = ex.submit(fn)
return fut.result(timeout=T)

defeats its own timeout. When `fut.result(timeout=T)` raises TimeoutError,
the `with` block's __exit__ calls `executor.shutdown(wait=True)`, which BLOCKS
until the runaway task finishes. So a 50ms "timeout" on a 5s task takes ~5s —
the MCP tool dispatch (codec_mcp) and the observer OCR call (codec_observer)
could hang on a slow skill / screencapture popup.

This helper uses a daemon thread + `join(timeout=...)` and never calls
shutdown(wait=True): on timeout it abandons the still-running worker and
raises promptly. daemon=True means an abandoned worker never blocks process
shutdown. Same shape as the proven `codec_hooks._run_hook_with_timeout`.
"""
import queue
import threading
from typing import Any, Callable

__all__ = ["run_with_timeout"]


def run_with_timeout(
    fn: Callable[..., Any],
    timeout: float,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Call ``fn(*args, **kwargs)`` on a daemon thread, waiting at most ``timeout`` seconds.

    Returns:
        Whatever ``fn`` returned, when it finishes in time.

    Raises:
        TimeoutError: ``fn`` is still running after ``timeout`` seconds. The
            worker is abandoned (not joined again), so control returns to the
            caller promptly; being a daemon thread it cannot block shutdown.
        BaseException: any exception ``fn`` raised is re-raised here, in the
            calling thread, as the very same exception instance.

    On Python 3.11+ ``concurrent.futures.TimeoutError`` is an alias of the
    builtin ``TimeoutError``, so call sites catching either are satisfied.
    """
    # Single-slot mailbox: the worker records exactly one (succeeded, payload)
    # pair, which is only read after the thread is known to have finished.
    outcome: list = []

    def _worker() -> None:
        try:
            value = fn(*args, **kwargs)
        except BaseException as exc:  # noqa: BLE001 — propagate ANY error to the caller
            outcome.append((False, exc))
        else:
            outcome.append((True, value))

    worker = threading.Thread(
        target=_worker,
        name="codec-run-with-timeout",
        daemon=True,
    )
    worker.start()
    worker.join(timeout=timeout)

    if worker.is_alive():
        # Still running: give up on it without waiting any further.
        raise TimeoutError(f"operation exceeded {timeout}s timeout")

    if not outcome:
        # Worker ended without recording anything; treat as a None return.
        return None

    succeeded, payload = outcome[0]
    if succeeded:
        return payload
    raise payload
233 changes: 121 additions & 112 deletions codec_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,118 @@ def _should_escalate_to_project(user_text: str, session_id: str) -> dict:
}


def _chat_vision_response(body: dict, messages: list):
"""If the request carries images, route to the vision model and return the
response dict; else return None. Fix #8 (intra-file CC reduction):
extracted verbatim from chat_completion, behavior-preserving. The inline
vision POST is an A-11-pending site and stays in codec_dashboard."""
images = body.get("images", [])
if not images:
return None
import requests as rq2
config2 = {}
try:
with open(CONFIG_PATH) as f:
config2 = json.load(f)
except (OSError, json.JSONDecodeError) as e:
log.warning(f"Config read failed; proceeding without overrides: {e}")
vision_url = config2.get("vision_base_url", "http://localhost:8083/v1")
vision_model = config2.get("vision_model", "mlx-community/Qwen2.5-VL-7B-Instruct-4bit")
# Build multimodal message: last user text + all images
last_text = ""
for m in reversed(messages):
if m.get("role") == "user" and isinstance(m.get("content"), str):
last_text = m["content"]
break
if not last_text:
last_text = "Describe and analyze this image in detail."
mm_content = []
for img_b64 in images:
mm_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}})
mm_content.append({"type": "text", "text": last_text})
v_payload = {
"model": vision_model,
"messages": [{"role": "user", "content": mm_content}],
"max_tokens": 4000,
"temperature": 0.7
}
vr = rq2.post(f"{vision_url}/chat/completions", json=v_payload, headers={"Content-Type": "application/json"}, timeout=120)
vdata = vr.json()
vanswer = vdata["choices"][0]["message"]["content"].strip()
import re as re2
vanswer = re2.sub(r'<think>[\s\S]*?</think>', '', vanswer).strip()
return {"response": vanswer, "model": vision_model}


def _build_chat_system_prompt(config: dict, budget, has_attachment: bool,
                              last_user_text: str) -> str:
    """Assemble the system prompt for one chat turn.

    The result is the base chat prompt (user override if present, otherwise
    ``CHAT_SYSTEM_PROMPT``) formatted with today's date, followed by whichever
    of these per-turn suffixes apply, in this order: step-budget warning,
    step-budget-exhausted notice, attachment notice, content-rewrite notice,
    observer summary.

    Fix #8 (intra-file CC reduction): lifted out of chat_completion and meant
    to be behavior-preserving.

    Args:
        config: loaded config dict; only ``llm_base_url`` is read here.
        budget: per-turn step budget. It IS mutated: ``warn_now()`` is checked,
            then ``consume("llm_call")`` is called exactly once, then
            ``at_limit()`` is checked.
        has_attachment: whether the user attached a file/image this turn.
        last_user_text: the latest user message text (may be empty/None).

    Returns:
        The complete system prompt string.
    """
    from datetime import datetime as _dt

    # A user-edited "chat" prompt override wins over the built-in default.
    template = _load_prompt_overrides().get("chat", CHAT_SYSTEM_PROMPT)
    sections = [template.format(date=_dt.now().strftime("%A, %B %d, %Y"))]

    # Phase 1 Step 3 §3 — the LLM call itself costs one step. Warn when only
    # one step remains (checked BEFORE consuming); once consumed, if the budget
    # is exhausted, switch to forced-summary mode.
    if budget.warn_now():
        sections.append(
            "\n\n⚠ 1 step remaining in this turn. Wrap up — do NOT "
            "emit additional [SKILL:...] tags."
        )
    budget.consume("llm_call")
    if budget.at_limit():
        sections.append(
            "\n\n## Step Budget Exhausted\n"
            "You've hit the per-turn step budget. Summarize what you "
            "accomplished and any blockers in one short paragraph. "
            "DO NOT emit [SKILL:...] tags or call additional tools."
        )

    # Bugfix 2026-04-16: with an attachment, the default prompt still teaches
    # the LLM to emit [SKILL:...] tags, which then leak through the streaming
    # path. Force conversational mode for this turn.
    if has_attachment:
        sections.append(
            "\n\n## This Turn\n"
            "The user has attached a file or image and its content is already "
            "embedded in their message between [IMAGE ANALYSIS]/[DOCUMENT] markers. "
            "Respond conversationally about the attached content. "
            "DO NOT emit [SKILL:...] tool-calling tags in this response."
        )

    # Bugfix 2026-04-27: format/draft/rewrite/proofread requests are pure text
    # generation, NOT skill calls. Past failure: "format my email" produced
    # [SKILL:translate:<email body>] and returned "Translation failed."
    rewrite_markers = (
        "format my email", "format this email", "format my message",
        "reformat", "rewrite", "reword", "redraft", "polish",
        "proofread", "edit my email", "fix my email", "fix the grammar",
        "make this sound", "translate this", "translate the following",
        "draft a reply", "draft an email", "draft this",
    )
    lowered = (last_user_text or "").lower()
    if any(marker in lowered for marker in rewrite_markers):
        sections.append(
            "\n\n## This Turn\n"
            "The user is asking you to generate or rewrite text directly "
            "(format/edit/draft/translate/polish their email or message). "
            "Respond with the rewritten content as plain prose. "
            "DO NOT emit [SKILL:...] tool-calling tags in this response — "
            "the answer IS the rewritten text, no tools needed."
        )

    # Phase 2 Step 5 — observer summary injection. The helper decides whether
    # to inject and returns (summary_or_None, reason). Any failure here is
    # non-fatal: it is logged at debug level and the prompt is returned as-is.
    try:
        from codec_observer import maybe_inject_observation_summary

        llm_base_url = config.get("llm_base_url") or ""
        transport = "local" if "localhost" in llm_base_url else "chat"
        summary, _reason = maybe_inject_observation_summary(
            user_prompt=last_user_text or "",
            transport=transport,
            skill_name=None,  # post-LLM tag path, no skill resolved yet
            skill_module=None,
        )
        if summary:
            sections.append(f"\n\n{summary}")
    except Exception as _e:
        log.debug(f"[observer] injection failed (non-fatal): {_e}")

    return "".join(sections)


@app.post("/api/chat")
async def chat_completion(request: Request):
"""Direct LLM chat with full context window + tool calling"""
Expand Down Expand Up @@ -2730,41 +2842,10 @@ async def _skill_stream():
else:
return {"response": f"**⚡ {skill_name}**: {skill_result}", "skill": skill_name}

# Check for images — route to vision model
images = body.get("images", [])
if images:
import requests as rq2
config2 = {}
try:
with open(CONFIG_PATH) as f: config2 = json.load(f)
except (OSError, json.JSONDecodeError) as e:
log.warning(f"Config read failed; proceeding without overrides: {e}")
vision_url = config2.get("vision_base_url", "http://localhost:8083/v1")
vision_model = config2.get("vision_model", "mlx-community/Qwen2.5-VL-7B-Instruct-4bit")
# Build multimodal message: last user text + all images
last_text = ""
for m in reversed(messages):
if m.get("role") == "user" and isinstance(m.get("content"), str):
last_text = m["content"]
break
if not last_text:
last_text = "Describe and analyze this image in detail."
mm_content = []
for img_b64 in images:
mm_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}})
mm_content.append({"type": "text", "text": last_text})
v_payload = {
"model": vision_model,
"messages": [{"role": "user", "content": mm_content}],
"max_tokens": 4000,
"temperature": 0.7
}
vr = rq2.post(f"{vision_url}/chat/completions", json=v_payload, headers={"Content-Type": "application/json"}, timeout=120)
vdata = vr.json()
vanswer = vdata["choices"][0]["message"]["content"].strip()
import re as re2
vanswer = re2.sub(r'<think>[\s\S]*?</think>', '', vanswer).strip()
return {"response": vanswer, "model": vision_model}
# Check for images — route to vision model (extracted, Fix #8)
vision_resp = _chat_vision_response(body, messages)
if vision_resp is not None:
return vision_resp

try:
config = {}
Expand All @@ -2783,84 +2864,12 @@ async def _skill_stream():
force_search = body.get("force_search", False)
messages = _enrich_messages(messages, config, force_search=bool(force_search))

# Inject CODEC system prompt (use override if user edited it)
from datetime import datetime as _dt
_overrides = _load_prompt_overrides()
_chat_prompt = _overrides.get("chat", CHAT_SYSTEM_PROMPT)
sys_prompt = _chat_prompt.format(date=_dt.now().strftime("%A, %B %d, %Y"))
# Phase 1 Step 3 §3 — consume one step for the LLM call itself.
# If we're now at limit-1, append the "1 step remaining" warning
# to the system prompt so the LLM wraps up. If we're already
# exhausted, switch to forced-summary mode.
if _budget.warn_now():
sys_prompt += (
"\n\n⚠ 1 step remaining in this turn. Wrap up — do NOT "
"emit additional [SKILL:...] tags."
)
_budget.consume("llm_call")
if _budget.at_limit():
sys_prompt += (
"\n\n## Step Budget Exhausted\n"
"You've hit the per-turn step budget. Summarize what you "
"accomplished and any blockers in one short paragraph. "
"DO NOT emit [SKILL:...] tags or call additional tools."
)
# Bugfix 2026-04-16: when the user attaches a file/image, the default
# system prompt still teaches the LLM to emit [SKILL:...] tags, which
# then leak through the streaming path. Force conversational mode for
# this turn so the LLM analyses the attached content directly.
if has_attachment:
sys_prompt += (
"\n\n## This Turn\n"
"The user has attached a file or image and its content is already "
"embedded in their message between [IMAGE ANALYSIS]/[DOCUMENT] markers. "
"Respond conversationally about the attached content. "
"DO NOT emit [SKILL:...] tool-calling tags in this response."
)

# Bugfix 2026-04-27: detect content-rewriting intents (format/draft/
# rewrite/reword/proofread an email/message/text) — these are pure-text
# generation tasks, NOT skill calls. Past failure: LLM emitted
# [SKILL:translate:<email body>] for "format my email", which ran the
# translate skill on the email and returned "Translation failed."
_u_text_lower = (last_user_text or "").lower()
_content_rewrite_intent = any(
kw in _u_text_lower for kw in (
"format my email", "format this email", "format my message",
"reformat", "rewrite", "reword", "redraft", "polish",
"proofread", "edit my email", "fix my email", "fix the grammar",
"make this sound", "translate this", "translate the following",
"draft a reply", "draft an email", "draft this",
)
# Build the system prompt (override + step-budget + attachment /
# content-rewrite / observer suffixes) — extracted to a helper for
# readability (Fix #8). Consumes the llm_call step budget internally.
sys_prompt = _build_chat_system_prompt(
config, _budget, has_attachment, last_user_text
)
if _content_rewrite_intent:
sys_prompt += (
"\n\n## This Turn\n"
"The user is asking you to generate or rewrite text directly "
"(format/edit/draft/translate/polish their email or message). "
"Respond with the rewritten content as plain prose. "
"DO NOT emit [SKILL:...] tool-calling tags in this response — "
"the answer IS the rewritten text, no tools needed."
)
# Phase 2 Step 5 — Observer summary injection (gated per §X).
# Local Qwen always injects; cloud transports (this chat path uses
# local-by-default but may be cloud-routed by the user — pass the
# detected transport tag) gate on possessive / continuation /
# skill-flag patterns. Returns (summary_or_None, reason); audit
# emit fires inside the helper ONLY when summary non-None.
try:
from codec_observer import maybe_inject_observation_summary
_obs_transport = "local" if "localhost" in (config.get("llm_base_url") or "") else "chat"
_obs_summary, _obs_reason = maybe_inject_observation_summary(
user_prompt=last_user_text or "",
transport=_obs_transport,
skill_name=None, # post-LLM tag path, no skill resolved yet
skill_module=None,
)
if _obs_summary:
sys_prompt += f"\n\n{_obs_summary}"
except Exception as _e:
log.debug(f"[observer] injection failed (non-fatal): {_e}")

# Prepend system message (or replace existing one)
if messages and messages[0].get("role") == "system":
Expand Down
Loading
Loading