@codeflash-ai codeflash-ai bot commented Dec 18, 2025

📄 33% (0.33x) speedup for AsyncOperationPool._get_operation in skyvern/forge/async_operations.py

⏱️ Runtime: 73.7 microseconds → 55.3 microseconds (best of 250 runs)

📝 Explanation and details

The optimization replaces exception-based dictionary access with defensive .get() calls, eliminating the overhead of Python's exception handling mechanism.

Key Changes:

  • Avoided exception overhead: The original code used try/except KeyError, which raises and catches an exception every time a key is missing. The optimized version uses dict.get(key), which returns None for a missing key without raising.
  • Two-step defensive lookup: Instead of self._operations[task_id][agent_phase], where either subscript can raise KeyError, the code first fetches the task's operations with .get(task_id), then fetches the phase from that result with .get(agent_phase).
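
The two lookup styles described above can be sketched side by side. This is a minimal illustration, not the Skyvern source: `PoolSketch`, its method names, and the two-member `AgentPhase` enum are hypothetical stand-ins, and the real `_get_operation` may differ in detail.

```python
from enum import Enum, auto
from typing import Optional


class AgentPhase(Enum):  # stand-in enum, similar to the one mocked in the generated tests
    INIT = auto()
    RUNNING = auto()


class PoolSketch:
    """Hypothetical stand-in for AsyncOperationPool (assumption, not the real class)."""

    def __init__(self) -> None:
        # Maps task_id -> {agent_phase -> operation}
        self._operations: dict = {}

    def get_operation_try_except(self, task_id, agent_phase) -> Optional[object]:
        # Exception-based lookup: a miss at either level raises KeyError.
        try:
            return self._operations[task_id][agent_phase]
        except KeyError:
            return None

    def get_operation_with_get(self, task_id, agent_phase) -> Optional[object]:
        # Two-step lookup: a miss at either level returns None without raising.
        task_operations = self._operations.get(task_id)
        if task_operations is None:
            return None
        return task_operations.get(agent_phase)


pool = PoolSketch()
pool._operations["task1"] = {AgentPhase.INIT: "op1"}
print(pool.get_operation_with_get("task1", AgentPhase.INIT))     # found
print(pool.get_operation_with_get("task1", AgentPhase.RUNNING))  # missing phase
print(pool.get_operation_with_get("task2", AgentPhase.INIT))     # missing task
```

The extra assignment and `is None` check in the second method are the kind of work that would account for the small slowdown reported for "found" lookups.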

Why This is Faster:
Raising and catching an exception in Python is costly compared with a plain lookup: the interpreter has to create the exception object, record traceback information, and transfer control to the handler. The profiler shows 156 KeyError exceptions out of 229 calls (a 68% miss rate), so most lookups were taking this expensive path. dict.get() runs in C, as subscripting does, but on a miss it returns its default instead of raising, so the miss path avoids the exception machinery entirely.
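
To see the effect on your own machine, a small `timeit` comparison along these lines may help. It is a rough sketch under simplifying assumptions: the module-level `operations` dict, the string phase keys, and the helper names are all illustrative, and absolute numbers will vary by interpreter version and hardware.

```python
import timeit

# Illustrative data: one task with one phase (plain strings instead of an enum).
operations = {"task1": {"INIT": "op1"}}


def lookup_try_except(task_id, phase):
    # A miss raises KeyError, which is then caught and turned into None.
    try:
        return operations[task_id][phase]
    except KeyError:
        return None


def lookup_get(task_id, phase):
    # A miss returns None directly; no exception is raised.
    task_operations = operations.get(task_id)
    if task_operations is None:
        return None
    return task_operations.get(phase)


def time_ns_per_call(func, *args, number=100_000):
    # Average wall-clock time per call, in nanoseconds.
    total_seconds = timeit.timeit(lambda: func(*args), number=number)
    return total_seconds / number * 1e9


for label, args in [("hit", ("task1", "INIT")), ("miss", ("task2", "INIT"))]:
    try_ns = time_ns_per_call(lookup_try_except, *args)
    get_ns = time_ns_per_call(lookup_get, *args)
    print(f"{label}: try/except {try_ns:.0f} ns, .get() {get_ns:.0f} ns")
```

The lambda wrapper adds the same constant overhead to both variants, so the comparison between them is more meaningful than the absolute figures.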

Performance Impact:
The test results show this optimization is particularly effective for "not found" scenarios:

  • Empty operations dict: 69.4% faster (693ns → 409ns)
  • Missing task_id: 60-80% faster in various test cases
  • Missing agent_phase: 36-53% faster in most test cases
  • Found cases: Slight overhead (typically 5-12% slower) due to the additional variable assignment and conditional check
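
The percentages in this report appear to be computed relative to the optimized runtime, that is, (original − optimized) / optimized. That formula is inferred from the published numbers rather than taken from Codeflash documentation, and the helper name below is made up for illustration.

```python
def speedup_percent(original_ns: float, optimized_ns: float) -> float:
    """Relative change measured against the optimized runtime (inferred formula)."""
    return (original_ns - optimized_ns) / optimized_ns * 100


print(round(speedup_percent(693, 409), 1))    # 69.4  (empty operations dict)
print(round(speedup_percent(571, 644), 1))    # -11.3 (a "found" case: 11.3% slower)
print(round(speedup_percent(73.7, 55.3), 1))  # 33.3  (overall runtime)
```

A negative result corresponds to the "slower" annotations in the test listings.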

Best Use Cases:
This optimization pays off when the AsyncOperationPool is frequently queried for operations that do not exist, which appears common in this codebase given the 68% miss rate in profiling. Trading slightly slower "found" cases for much faster "not found" cases yields a net 33% speedup overall.
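
How the tradeoff nets out depends on the miss rate. The sketch below uses a simple weighted-average model with per-call timings reported for two of the generated tests (one "found" case and one missing task_id case); treating those two figures as representative is an assumption, so the result only approximates the reported 33%.

```python
# Illustrative per-call timings in nanoseconds, taken from two generated tests.
HIT_OLD, HIT_NEW = 571, 644    # "found" case: slightly slower after the change
MISS_OLD, MISS_NEW = 712, 445  # missing task_id case: faster after the change


def mean_ns(miss_rate: float, hit_ns: float, miss_ns: float) -> float:
    # Expected cost per call for a given fraction of misses.
    return miss_rate * miss_ns + (1 - miss_rate) * hit_ns


def net_speedup_percent(miss_rate: float) -> float:
    old = mean_ns(miss_rate, HIT_OLD, MISS_OLD)
    new = mean_ns(miss_rate, HIT_NEW, MISS_NEW)
    return (old - new) / new * 100


# Miss rate at which the saving on misses equals the extra cost on hits.
hit_penalty = HIT_NEW - HIT_OLD
miss_saving = MISS_OLD - MISS_NEW
break_even = hit_penalty / (hit_penalty + miss_saving)

print(f"break-even miss rate: {break_even:.2f}")
print(f"net change at the profiled miss rate: {net_speedup_percent(156 / 229):.0f}%")
```

Under this model the change is a net win whenever misses exceed roughly a fifth of lookups, and at the profiled miss rate of 156 out of 229 it gives about 31%.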

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 259 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
import asyncio
from enum import Enum, auto

# imports
import pytest  # used for our unit tests
from skyvern.forge.async_operations import AsyncOperationPool

# --- Mocked dependencies for the test environment ---

class AgentPhase(Enum):
    INIT = auto()
    RUNNING = auto()
    FINISHED = auto()
    FAILED = auto()
    # Add more as needed for test coverage

class AsyncOperation:
    def __init__(self, name):
        self.name = name
    def __repr__(self):
        return f"AsyncOperation({self.name!r})"

# BASIC TEST CASES

def test_get_operation_returns_operation_when_exists():
    """Test that _get_operation returns the correct operation when present."""
    pool = AsyncOperationPool()
    op = AsyncOperation("op1")
    AsyncOperationPool._operations = {"task1": {AgentPhase.INIT: op}}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 571ns -> 644ns (11.3% slower)

def test_get_operation_returns_none_when_task_id_missing():
    """Test that _get_operation returns None if task_id is not in _operations."""
    pool = AsyncOperationPool()
    AsyncOperationPool._operations = {"task2": {AgentPhase.INIT: AsyncOperation("op2")}}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 712ns -> 445ns (60.0% faster)

def test_get_operation_returns_none_when_agent_phase_missing():
    """Test that _get_operation returns None if agent_phase is not present for the task_id."""
    pool = AsyncOperationPool()
    AsyncOperationPool._operations = {"task1": {AgentPhase.RUNNING: AsyncOperation("op3")}}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 870ns -> 638ns (36.4% faster)

def test_get_operation_with_multiple_phases():
    """Test that _get_operation returns correct operation for each phase."""
    pool = AsyncOperationPool()
    op_init = AsyncOperation("init")
    op_run = AsyncOperation("run")
    AsyncOperationPool._operations = {
        "task1": {AgentPhase.INIT: op_init, AgentPhase.RUNNING: op_run}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 585ns -> 624ns (6.25% slower)
    codeflash_output = pool._get_operation("task1", AgentPhase.RUNNING) # 255ns -> 271ns (5.90% slower)

def test_get_operation_with_multiple_tasks():
    """Test that _get_operation returns correct operation for each task."""
    pool = AsyncOperationPool()
    op1 = AsyncOperation("op1")
    op2 = AsyncOperation("op2")
    AsyncOperationPool._operations = {
        "task1": {AgentPhase.INIT: op1},
        "task2": {AgentPhase.INIT: op2}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 550ns -> 605ns (9.09% slower)
    codeflash_output = pool._get_operation("task2", AgentPhase.INIT) # 257ns -> 258ns (0.388% slower)

# EDGE TEST CASES

def test_get_operation_with_empty_operations_dict():
    """Test that _get_operation returns None if _operations is empty."""
    pool = AsyncOperationPool()
    AsyncOperationPool._operations = {}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 730ns -> 405ns (80.2% faster)

def test_get_operation_with_empty_agent_phase_dict():
    """Test that _get_operation returns None if task_id exists but has empty agent_phase dict."""
    pool = AsyncOperationPool()
    AsyncOperationPool._operations = {"task1": {}}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 937ns -> 650ns (44.2% faster)

def test_get_operation_with_unusual_task_id():
    """Test with unusual task_id (empty string, special chars)."""
    pool = AsyncOperationPool()
    op = AsyncOperation("special")
    AsyncOperationPool._operations = {"": {AgentPhase.INIT: op}, "!@#$": {AgentPhase.FINISHED: op}}
    codeflash_output = pool._get_operation("", AgentPhase.INIT) # 555ns -> 597ns (7.04% slower)
    codeflash_output = pool._get_operation("!@#$", AgentPhase.FINISHED) # 413ns -> 440ns (6.14% slower)
    codeflash_output = pool._get_operation("!@#$", AgentPhase.INIT) # 547ns -> 241ns (127% faster)

def test_get_operation_with_non_enum_agent_phase():
    """Test that _get_operation returns None if agent_phase is not an AgentPhase."""
    pool = AsyncOperationPool()
    op = AsyncOperation("op")
    AsyncOperationPool._operations = {"task1": {AgentPhase.INIT: op}}
    codeflash_output = pool._get_operation("task1", "INIT"); result = codeflash_output # 895ns -> 634ns (41.2% faster)

def test_get_operation_with_none_arguments():
    """Test that None is treated as an ordinary key: found as a stored task_id, missing as an agent_phase."""
    pool = AsyncOperationPool()
    AsyncOperationPool._operations = {None: {AgentPhase.INIT: AsyncOperation("op")}}
    codeflash_output = pool._get_operation(None, AgentPhase.INIT); result = codeflash_output # 577ns -> 629ns (8.27% slower)
    codeflash_output = pool._get_operation("task1", None); result2 = codeflash_output # 600ns -> 322ns (86.3% faster)

def test_get_operation_with_duplicate_operations():
    """Test that _get_operation can handle two phases with the same operation object."""
    pool = AsyncOperationPool()
    op = AsyncOperation("shared")
    AsyncOperationPool._operations = {
        "task1": {AgentPhase.INIT: op, AgentPhase.FINISHED: op}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 520ns -> 570ns (8.77% slower)
    codeflash_output = pool._get_operation("task1", AgentPhase.FINISHED) # 243ns -> 251ns (3.19% slower)

# LARGE SCALE TEST CASES

def test_get_operation_large_number_of_tasks_and_phases():
    """Test _get_operation performance and correctness with many tasks and phases."""
    pool = AsyncOperationPool()
    num_tasks = 500
    num_phases = len(AgentPhase)
    # Create operations for each task and phase
    AsyncOperationPool._operations = {
        f"task{i}": {phase: AsyncOperation(f"op{i}_{phase.name}") for phase in AgentPhase}
        for i in range(num_tasks)
    }
    # Check a subset of the operations
    for i in range(0, num_tasks, 100):
        for phase in AgentPhase:
            expected = AsyncOperationPool._operations[f"task{i}"][phase]
            codeflash_output = pool._get_operation(f"task{i}", phase); result = codeflash_output

def test_get_operation_performance_with_sparse_and_dense_data():
    """Test _get_operation with both sparse and dense _operations dicts."""
    pool = AsyncOperationPool()
    # Sparse: only a few entries
    AsyncOperationPool._operations = {f"task{i}": {AgentPhase.INIT: AsyncOperation(f"op{i}")} for i in range(5)}
    for i in range(5):
        codeflash_output = pool._get_operation(f"task{i}", AgentPhase.INIT) # 1.76μs -> 1.82μs (2.97% slower)
        codeflash_output = pool._get_operation(f"task{i}", AgentPhase.RUNNING)
    # Dense: every task has every phase
    AsyncOperationPool._operations = {
        f"task{i}": {phase: AsyncOperation(f"op{i}_{phase.name}") for phase in AgentPhase}
        for i in range(50)
    }
    for i in range(0, 50, 10):
        for phase in AgentPhase:
            codeflash_output = pool._get_operation(f"task{i}", phase)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import asyncio
from enum import Enum, auto

# imports
import pytest  # used for our unit tests
from skyvern.forge.async_operations import AsyncOperationPool

# --- Supporting classes for the function under test ---

# AgentPhase is an enum representing different phases of an agent's lifecycle
class AgentPhase(Enum):
    INIT = auto()
    RUNNING = auto()
    FINISHED = auto()
    ERROR = auto()
    CUSTOM = auto()

# AsyncOperation is a stand-in for whatever async operation object is used in the pool
class AsyncOperation:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, AsyncOperation) and self.name == other.name

    def __repr__(self):
        return f"AsyncOperation({self.name!r})"

# --- Unit tests ---
# Helper fixture to create a fresh pool for each test
@pytest.fixture
def pool():
    # Reset class-level dicts for isolation
    AsyncOperationPool._operations = {}
    AsyncOperationPool._aio_tasks = {}
    return AsyncOperationPool()

# 1. BASIC TEST CASES

def test_get_operation_basic_found(pool):
    # Test: operation exists for given task_id and agent_phase
    task_id = "task1"
    phase = AgentPhase.INIT
    op = AsyncOperation("op1")
    pool._operations[task_id] = {phase: op}
    codeflash_output = pool._get_operation(task_id, phase); result = codeflash_output # 600ns -> 667ns (10.0% slower)

def test_get_operation_basic_not_found_task(pool):
    # Test: task_id does not exist
    codeflash_output = pool._get_operation("unknown_task", AgentPhase.INIT); result = codeflash_output # 802ns -> 481ns (66.7% faster)

def test_get_operation_basic_not_found_phase(pool):
    # Test: agent_phase does not exist for known task_id
    task_id = "task2"
    pool._operations[task_id] = {AgentPhase.RUNNING: AsyncOperation("op2")}
    codeflash_output = pool._get_operation(task_id, AgentPhase.FINISHED); result = codeflash_output # 949ns -> 680ns (39.6% faster)

def test_get_operation_basic_multiple_phases(pool):
    # Test: multiple phases for one task_id
    task_id = "task3"
    ops = {AgentPhase.INIT: AsyncOperation("opA"),
           AgentPhase.RUNNING: AsyncOperation("opB")}
    pool._operations[task_id] = ops
    codeflash_output = pool._get_operation(task_id, AgentPhase.INIT) # 564ns -> 643ns (12.3% slower)
    codeflash_output = pool._get_operation(task_id, AgentPhase.RUNNING) # 244ns -> 270ns (9.63% slower)

def test_get_operation_basic_multiple_tasks(pool):
    # Test: multiple task_ids, each with phases
    pool._operations["taskA"] = {AgentPhase.INIT: AsyncOperation("opA")}
    pool._operations["taskB"] = {AgentPhase.FINISHED: AsyncOperation("opB")}
    codeflash_output = pool._get_operation("taskA", AgentPhase.INIT).name # 536ns -> 585ns (8.38% slower)
    codeflash_output = pool._get_operation("taskB", AgentPhase.FINISHED).name # 225ns -> 270ns (16.7% slower)
    codeflash_output = pool._get_operation("taskA", AgentPhase.FINISHED) # 588ns -> 275ns (114% faster)
    codeflash_output = pool._get_operation("taskB", AgentPhase.INIT) # 298ns -> 203ns (46.8% faster)

# 2. EDGE TEST CASES

def test_get_operation_edge_empty_operations(pool):
    # Test: _operations dict is completely empty
    codeflash_output = pool._get_operation("any_task", AgentPhase.INIT) # 693ns -> 409ns (69.4% faster)

def test_get_operation_edge_empty_phases_dict(pool):
    # Test: _operations contains task_id but its phases dict is empty
    pool._operations["taskX"] = {}
    codeflash_output = pool._get_operation("taskX", AgentPhase.INIT) # 886ns -> 646ns (37.2% faster)

def test_get_operation_edge_non_string_task_id(pool):
    # Test: non-string task_id (the int 123 does not match the string key "123", so the lookup misses)
    pool._operations["123"] = {AgentPhase.INIT: AsyncOperation("opNum")}
    codeflash_output = pool._get_operation(123, AgentPhase.INIT); result = codeflash_output # 757ns -> 487ns (55.4% faster)

def test_get_operation_edge_custom_agent_phase(pool):
    # Test: custom AgentPhase value not present
    pool._operations["taskY"] = {AgentPhase.INIT: AsyncOperation("opY")}
    codeflash_output = pool._get_operation("taskY", AgentPhase.CUSTOM); result = codeflash_output # 952ns -> 620ns (53.5% faster)

def test_get_operation_edge_similar_task_ids(pool):
    # Test: task_ids that are substrings of each other
    pool._operations["task"] = {AgentPhase.RUNNING: AsyncOperation("opT")}
    pool._operations["task1"] = {AgentPhase.RUNNING: AsyncOperation("opT1")}
    codeflash_output = pool._get_operation("task", AgentPhase.RUNNING).name # 540ns -> 599ns (9.85% slower)
    codeflash_output = pool._get_operation("task1", AgentPhase.RUNNING).name # 369ns -> 337ns (9.50% faster)
    codeflash_output = pool._get_operation("task", AgentPhase.FINISHED) # 631ns -> 277ns (128% faster)

def test_get_operation_edge_phase_enum_vs_str(pool):
    # Test: using string instead of AgentPhase enum
    pool._operations["taskZ"] = {AgentPhase.INIT: AsyncOperation("opZ")}
    codeflash_output = pool._get_operation("taskZ", "INIT"); result = codeflash_output # 805ns -> 620ns (29.8% faster)

def test_get_operation_edge_none_task_id_and_phase(pool):
    # Test: None as task_id and agent_phase
    pool._operations[None] = {AgentPhase.INIT: AsyncOperation("opNone")}
    codeflash_output = pool._get_operation(None, AgentPhase.INIT).name # 541ns -> 605ns (10.6% slower)
    codeflash_output = pool._get_operation(None, None) # 644ns -> 361ns (78.4% faster)
    codeflash_output = pool._get_operation("None", AgentPhase.INIT) # 321ns -> 276ns (16.3% faster)

def test_get_operation_edge_operation_is_none(pool):
    # Test: operation value is None (should be returned, not treated as missing)
    pool._operations["taskNull"] = {AgentPhase.ERROR: None}
    codeflash_output = pool._get_operation("taskNull", AgentPhase.ERROR) # 523ns -> 585ns (10.6% slower)

# 3. LARGE SCALE TEST CASES

def test_get_operation_large_many_tasks_and_phases(pool):
    # Test: 1000 tasks, each with 5 phases
    num_tasks = 1000
    phases = [AgentPhase.INIT, AgentPhase.RUNNING, AgentPhase.FINISHED, AgentPhase.ERROR, AgentPhase.CUSTOM]
    for i in range(num_tasks):
        pool._operations[f"task{i}"] = {phase: AsyncOperation(f"op{i}_{phase.name}") for phase in phases}
    # Check a few random tasks and phases
    codeflash_output = pool._get_operation("task0", AgentPhase.INIT).name # 797ns -> 1.03μs (22.7% slower)
    codeflash_output = pool._get_operation("task999", AgentPhase.ERROR).name # 391ns -> 443ns (11.7% slower)
    codeflash_output = pool._get_operation("task500", AgentPhase.CUSTOM).name # 227ns -> 235ns (3.40% slower)
    # Check missing task
    codeflash_output = pool._get_operation("task1000", AgentPhase.INIT) # 1.01μs -> 242ns (319% faster)
    # task1 has every phase, so AgentPhase.ERROR is found; the string "ERROR" is not a key, so it is missing
    codeflash_output = pool._get_operation("task1", AgentPhase.ERROR) # 266ns -> 264ns (0.758% faster)
    codeflash_output = pool._get_operation("task1", "ERROR") # 648ns -> 570ns (13.7% faster)

def test_get_operation_large_sparse_phases(pool):
    # Test: 500 tasks, each with only one random phase
    import random
    random.seed(42)
    phases = [AgentPhase.INIT, AgentPhase.RUNNING, AgentPhase.FINISHED, AgentPhase.ERROR, AgentPhase.CUSTOM]
    for i in range(500):
        phase = random.choice(phases)
        pool._operations[f"sparse_task{i}"] = {phase: AsyncOperation(f"sparse_op{i}_{phase.name}")}
    # Check that only the correct phase returns the operation
    for i in range(0, 500, 100):
        phase = list(pool._operations[f"sparse_task{i}"].keys())[0]
        codeflash_output = pool._get_operation(f"sparse_task{i}", phase).name # 1.59μs -> 1.68μs (5.37% slower)
        # Try a phase that's not present
        missing_phase = [p for p in phases if p != phase][0]
        codeflash_output = pool._get_operation(f"sparse_task{i}", missing_phase)

def test_get_operation_large_all_none(pool):
    # Test: 1000 tasks, all phases dicts are empty
    for i in range(1000):
        pool._operations[f"empty_task{i}"] = {}
    # All lookups should return None
    for i in range(0, 1000, 200):
        for phase in AgentPhase:
            codeflash_output = pool._get_operation(f"empty_task{i}", phase)

def test_get_operation_large_nonexistent(pool):
    # Test: lookup for task_ids and phases that never existed
    for i in range(100):
        codeflash_output = pool._get_operation(f"no_task_{i}", AgentPhase.ERROR) # 23.5μs -> 13.8μs (70.3% faster)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
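
The closing comment describes comparing the original and optimized outputs. A minimal sketch of that kind of differential check follows; it is not Codeflash's actual harness, and both lookup functions, the sample data, and the input grid are assumptions made for illustration.

```python
import itertools


def original_lookup(operations, task_id, phase):
    # Exception-based version, as described in the explanation.
    try:
        return operations[task_id][phase]
    except KeyError:
        return None


def optimized_lookup(operations, task_id, phase):
    # .get()-based version, as described in the explanation.
    task_operations = operations.get(task_id)
    if task_operations is None:
        return None
    return task_operations.get(phase)


# Sample data covering found, missing, empty, and None-valued entries.
operations = {
    "task1": {"INIT": "op1", "ERROR": None},
    "empty": {},
    None: {"INIT": "opNone"},
}
task_ids = ["task1", "empty", "missing", None, 123]
phases = ["INIT", "ERROR", "RUNNING", None]

# Collect every input pair on which the two versions disagree.
mismatches = [
    (task_id, phase)
    for task_id, phase in itertools.product(task_ids, phases)
    if original_lookup(operations, task_id, phase)
    != optimized_lookup(operations, task_id, phase)
]
print(mismatches)  # [] when the two implementations agree on the whole grid
```

One case this grid leaves out: if a task's entry were itself None rather than a dict, the try/except version as sketched would raise TypeError while the .get() version would return None, so the two are only equivalent when every stored entry is a dict.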

To edit these changes, run `git checkout codeflash/optimize-AsyncOperationPool._get_operation-mjarm4ra` and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 18, 2025 01:32
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Dec 18, 2025