Skip to content

Conversation

@kausmeows
Copy link
Contributor

@kausmeows kausmeows commented Nov 16, 2025

What does this PR do?

  • Adds a workflow wrapper for instrumenting Agno's Workflows.
  • For the following kind of workflow config-
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)
  • We get the structure like this-
image

Note

Add Agno workflow/step/parallel instrumentation with context propagation and graph-node spans, plus examples and tests.

  • Instrumentation (Agno):
    • Add wrappers for Workflow.run/arun, Step.execute/execute_stream/aexecute/aexecute_stream, and Parallel.execute/execute_stream/aexecute/aexecute_stream with context propagation, graph node attributes, and streaming support.
    • Extract shared helpers to openinference.instrumentation.agno.utils and refactor _RunWrapper to use them; minor adjustments to streaming output handling.
  • Examples:
    • Add workflow samples: workflow_with_condition_step.py, workflow_with_custom_function_step.py, workflow_with_parallel_step.py, workflow_with_structure_io.py.
    • Update examples/requirements.txt (upgrade agno, add arize-phoenix, openai).
  • Tests:
    • Add test_workflow_instrumentation.py covering basic, multi-step, async, conditional, and graph hierarchy cases; minor fix in existing tests.
  • Project config:
    • Add fastapi to test dependencies.

Written by Cursor Bugbot for commit edd117d. This will update automatically on new commits. Configure here.

@kausmeows kausmeows requested a review from a team as a code owner November 16, 2025 10:18
@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Nov 16, 2025
result = wrapped(*args, **kwargs)

# Check if result is an iterator (streaming)
is_streaming = hasattr(result, "__iter__") and not isinstance(result, (str, bytes))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Non-streaming iterables incorrectly detected as streams

The streaming detection uses hasattr(result, "__iter__") which matches any iterable including lists, tuples, dicts, and sets. While it excludes strings and bytes, it doesn't exclude other non-streaming collections. If a step returns a list or dict, it will be incorrectly treated as a stream, causing the wrong instrumentation path and potentially incorrect span attributes. The check should use isinstance(result, Iterator) from the typing module to properly detect actual iterators.

Fix in Cursor Fix in Web

@kausmeows kausmeows changed the title feat(agno): [WIP] add workflow wrapper for instrumentation feat(agno): add workflow wrapper for instrumentation Nov 30, 2025
return str(response.content)
else:
return str(response.model_dump_json())
return ""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unchecked method call may raise AttributeError

The _extract_output function assumes that any response without a content attribute has a model_dump_json method (Pydantic models). If a response object has neither attribute, calling response.model_dump_json() raises an AttributeError. Since this instrumentation code wraps user workflows, unexpected response types could crash the instrumentation and potentially disrupt the user's workflow execution. The function also has unreachable code at line 101 since all prior branches return.

Fix in Cursor Fix in Web

# Check if result is an async iterator (streaming)
if hasattr(result, "__aiter__"):
# Streaming mode - return async generator that continues with this span
return self._arun_stream_continue(result, span, workflow_token, instance)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Async methods missing error handling for span cleanup

The arun methods in _WorkflowWrapper, _StepWrapper, and aexecute in _ParallelWrapper lack try/except/finally around the initial wrapped(*args, **kwargs) call. Unlike their sync counterparts which have proper exception handling, these async methods start a span but if wrapped() raises a synchronous exception before returning a coroutine, the span is never ended and the error is not recorded. This causes orphaned spans in the tracing system.

Additional Locations (2)

Fix in Cursor Fix in Web

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Dec 8, 2025
@caroger caroger merged commit 89699cd into Arize-ai:main Dec 8, 2025
11 checks passed
@github-project-automation github-project-automation bot moved this from In Review to Done in Instrumentation Dec 8, 2025
@github-actions github-actions bot mentioned this pull request Dec 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

lgtm This PR has been approved by a maintainer size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

3 participants