Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions run_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,44 @@ def _run_codex_stream(self, api_kwargs: dict, client: Any = None):
)
return self._run_codex_create_stream_fallback(api_kwargs, client=active_client)
raise
except TypeError as exc:
# The OpenAI SDK's responses.stream() accumulator calls
# parse_response() on every snapshot event, which iterates
# `response.output`. The ChatGPT Codex backend
# (chatgpt.com/backend-api/codex) emits snapshot events with
# `output=None`, so the accumulator raises
# "'NoneType' object is not iterable" on the first event.
# Fall back to manual event iteration, which never invokes the
# accumulator and rebuilds output from output_item.done events.
logger.debug(
"Responses stream accumulator failed (TypeError: %s); falling "
"back to manual create(stream=True) iteration. %s",
exc,
self._client_log_context(),
)
return self._run_codex_create_stream_fallback(api_kwargs, client=active_client)

@staticmethod
def _reconstruct_codex_output(response: Any, collected_items: list) -> None:
"""Populate ``response.output`` from streamed ``output_item.done`` items.

The ChatGPT Codex Responses backend omits the consolidated ``output``
list on terminal stream events, so downstream ``.output`` /
``.output_text`` access raises "'NoneType' object is not iterable".
Rebuild it from the individual output items captured during streaming.
"""
if not collected_items:
return
try:
current = getattr(response, "output", None)
except Exception:
current = None
if current:
return
try:
response.output = list(collected_items)
except Exception:
pass

def _run_codex_create_stream_fallback(self, api_kwargs: dict, client: Any = None):
"""Fallback path for stream completion edge cases on Codex-style Responses backends."""
Expand All @@ -2248,18 +2286,33 @@ def _run_codex_create_stream_fallback(self, api_kwargs: dict, client: Any = None
return stream_or_response

terminal_response = None
collected_items: list = []
try:
for event in stream_or_response:
event_type = getattr(event, "type", None)
if not event_type and isinstance(event, dict):
event_type = event.get("type")

# The Codex backend never populates the consolidated `output`
# list on the terminal response; completed output items arrive
# only via `response.output_item.done` events. Capture them so
# `_reconstruct_codex_output` can rebuild `response.output`.
if event_type == "response.output_item.done":
item = getattr(event, "item", None)
if item is None and isinstance(event, dict):
item = event.get("item")
if item is not None:
collected_items.append(item)
continue

if event_type not in {"response.completed", "response.incomplete", "response.failed"}:
continue

terminal_response = getattr(event, "response", None)
if terminal_response is None and isinstance(event, dict):
terminal_response = event.get("response")
if terminal_response is not None:
self._reconstruct_codex_output(terminal_response, collected_items)
return terminal_response
finally:
close_fn = getattr(stream_or_response, "close", None)
Expand All @@ -2270,6 +2323,7 @@ def _run_codex_create_stream_fallback(self, api_kwargs: dict, client: Any = None
pass

if terminal_response is not None:
self._reconstruct_codex_output(terminal_response, collected_items)
return terminal_response
raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")

Expand Down