@codeflash-ai codeflash-ai bot commented Dec 17, 2025

📄 21,817% (218.17x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 181 milliseconds → 826 microseconds (best of 132 runs)

📝 Explanation and details

The optimization dramatically improves performance by eliminating quadratic complexity through a fundamental algorithmic change.

Key Optimization:
The original code uses a nested loop structure: for each node, it scans the edge list to check whether that node is the source of any edge. In the worst case this is O(n × m), where n is the number of nodes and m is the number of edges. The optimized version pre-computes a set of all source IDs once, then performs constant-time lookups.

Specific Changes:

  1. Pre-computation: source_ids = {e["source"] for e in edges} creates a hash set of all source node IDs in O(m) time
  2. Fast lookup: n["id"] not in source_ids uses O(1) hash set membership testing instead of O(m) linear search through all edges
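
The two changes can be sketched side by side. This page does not show the original function body, so `find_last_node_original` below is a reconstruction from the description above (an `all()` scan over the edges for each node), and both function names are hypothetical.

```python
def find_last_node_original(nodes, edges):
    # Assumed shape of the original: for each node, scan the edge list
    # until an edge with that node as its source is found.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def find_last_node_optimized(nodes, edges):
    # Build the set of source IDs once: O(m).
    source_ids = {e["source"] for e in edges}
    # Each membership test is O(1) on average, so this pass is O(n).
    return next((n for n in nodes if n["id"] not in source_ids), None)


nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
print(find_last_node_original(nodes, edges))   # {'id': 3}
print(find_last_node_optimized(nodes, edges))  # {'id': 3}
```

Both versions return the first node in list order that is not the source of any edge, or `None` when every node is a source.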

Why This Works:

  • Hash set creation is O(m) vs. the original's O(n × m) repeated edge scanning
  • Set membership testing (in/not in) is O(1) average case vs. O(m) for the all() generator
  • Total complexity drops from O(n × m) to O(n + m)
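
To make the complexity argument concrete, the following sketch counts elementary steps instead of measuring time. `count_edge_checks` and `count_set_steps` are hypothetical helpers, and the per-node scan is an assumed model of the original code, including the early exit that `all()` provides.

```python
def count_edge_checks(nodes, edges):
    """Count edge comparisons made by a per-node linear scan."""
    checks = 0
    for n in nodes:
        is_source = False
        for e in edges:
            checks += 1
            if e["source"] == n["id"]:
                is_source = True
                break  # same short-circuit that all() performs
        if not is_source:
            return n, checks
    return None, checks


def count_set_steps(nodes, edges):
    """Count steps for the set-based version: one per edge, one per node visited."""
    source_ids = {e["source"] for e in edges}
    steps = len(edges)
    for n in nodes:
        steps += 1
        if n["id"] not in source_ids:
            return n, steps
    return None, steps


N = 1000
nodes = [{"id": i} for i in range(N)]
chain = [{"source": i, "target": i + 1} for i in range(N - 1)]

print(count_edge_checks(nodes, chain))  # ({'id': 999}, 500499)
print(count_set_steps(nodes, chain))    # ({'id': 999}, 1999)
```

On a 1000-node chain the scan performs about n × m / 2 comparisons, while the set-based version performs n + m steps.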

Performance Impact:
The 218x speedup (from 181ms to 826μs) demonstrates the dramatic difference between quadratic and linear algorithms. This optimization is particularly effective for:

  • Large graphs: The speedup grows roughly linearly with graph size in the worst case, since the ratio of O(n × m) to O(n + m) is on the order of min(n, m) (as shown in the large-scale test cases with 1000 nodes)
  • Dense graphs: More edges mean greater savings from avoiding repeated edge iteration
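  • Star topologies: The large star graph test case has many edges from one central node; note that a per-node scan stops early there (the hub fails on its first edge and the first leaf is returned after a single pass), so the larger gains come from long chains and cycles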
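
How much each graph shape gains depends on how early the linear scan can stop. A minimal timing sketch, assuming the original has the `all()`-based shape described above; `scan_version`, `set_version`, `cycle_graph`, and `star_graph` are hypothetical names, and absolute timings will vary by machine.

```python
import timeit


def scan_version(nodes, edges):
    # Assumed shape of the original implementation.
    return next((n for n in nodes if all(e["source"] != n["id"] for e in edges)), None)


def set_version(nodes, edges):
    source_ids = {e["source"] for e in edges}
    return next((n for n in nodes if n["id"] not in source_ids), None)


def cycle_graph(n):
    # 0 -> 1 -> ... -> n-1 -> 0: every node is a source.
    nodes = [{"id": i} for i in range(n)]
    edges = [{"source": i, "target": (i + 1) % n} for i in range(n)]
    return nodes, edges


def star_graph(n):
    # Node 0 points to every other node.
    nodes = [{"id": i} for i in range(n)]
    edges = [{"source": 0, "target": i} for i in range(1, n)]
    return nodes, edges


for name, build in (("cycle", cycle_graph), ("star", star_graph)):
    nodes, edges = build(1000)
    t_scan = timeit.timeit(lambda: scan_version(nodes, edges), number=5)
    t_set = timeit.timeit(lambda: set_version(nodes, edges), number=5)
    print(f"{name}: scan={t_scan:.4f}s set={t_set:.4f}s")
```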

The optimization maintains identical behavior while being significantly more scalable for real-world graph processing workloads.
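
The equivalence claim can be spot-checked with randomized inputs. This is a sketch under the same assumption about the original's shape; `scan_version`, `set_version`, and `random_graph` are hypothetical names, not part of the PR.

```python
import random


def scan_version(nodes, edges):
    # Assumed shape of the original implementation.
    return next((n for n in nodes if all(e["source"] != n["id"] for e in edges)), None)


def set_version(nodes, edges):
    source_ids = {e["source"] for e in edges}
    return next((n for n in nodes if n["id"] not in source_ids), None)


def random_graph(rng, max_nodes=20, max_edges=40):
    n = rng.randint(0, max_nodes)
    nodes = [{"id": i} for i in range(n)]
    m = rng.randint(0, max_edges)
    # Sources and targets may deliberately reference IDs outside the node list.
    edges = [
        {"source": rng.randint(0, max_nodes), "target": rng.randint(0, max_nodes)}
        for _ in range(m)
    ]
    return nodes, edges


rng = random.Random(0)
mismatches = 0
for _ in range(500):
    nodes, edges = random_graph(rng)
    if scan_version(nodes, edges) != set_version(nodes, edges):
        mismatches += 1
print(mismatches)  # 0
```

For well-formed edges with hashable IDs the two versions agree. They can differ on malformed input: the set-based version reads every edge up front, so an edge without a `source` key raises `KeyError` even when `nodes` is empty, whereas a lazy scan would never touch the edges in that case. Unhashable source IDs would likewise raise `TypeError` only in the set-based version.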

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 40 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests

# ----------- BASIC TEST CASES -----------


def test_single_node_no_edges():
    # Single node, no edges: node should be returned as last node
    nodes = [{"id": 1}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_two_nodes_one_edge():
    # Two nodes, one edge from node 1 to node 2: node 2 should be last node
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_linear_flow():
    # Linear flow: 1->2->3; node 3 is last node
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_multiple_end_nodes():
    # Multiple nodes not acting as source: return the first found
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}]
    # node 2 and node 3 are not sources
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# ----------- EDGE TEST CASES -----------


def test_empty_nodes():
    # No nodes: should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_empty_edges():
    # Nodes present, no edges: should return first node
    nodes = [{"id": "a"}, {"id": "b"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_cycle_graph():
    # Cycle: 1->2->3->1; no last node (all are sources)
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_disconnected_nodes():
    # Node 3 is not connected at all, but node 2 is the first non-source, so node 2 is returned
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}]
    # node 3 is disconnected
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_multiple_incoming_edges():
    # Node 3 has multiple incoming edges, but is not a source
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 3}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_multiple_outgoing_edges():
    # Node 1 has multiple outgoing edges, so is not last node
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 1, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_non_integer_node_ids():
    # Node IDs are strings
    nodes = [{"id": "x"}, {"id": "y"}]
    edges = [{"source": "x", "target": "y"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_missing_source_in_edge():
    # Edge missing 'source' key should raise KeyError
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"target": 2}]
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)


# ----------- LARGE SCALE TEST CASES -----------


def test_large_linear_graph():
    # Large linear graph: 1->2->...->1000; last node should be 1000
    N = 1000
    nodes = [{"id": i} for i in range(1, N + 1)]
    edges = [{"source": i, "target": i + 1} for i in range(1, N)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_star_graph():
    # Star graph: node 0 points to all others; last nodes are 1..N
    N = 999
    nodes = [{"id": 0}] + [{"id": i} for i in range(1, N + 1)]
    edges = [{"source": 0, "target": i} for i in range(1, N + 1)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_disconnected_graph():
    # Chain 1->2->...->500 plus 500 disconnected nodes (501..1000);
    # node 500, the chain's tail, is the first non-source and is returned
    N = 1000
    nodes = [{"id": i} for i in range(1, N + 1)]
    edges = [{"source": i, "target": i + 1} for i in range(1, 500)]
    disconnected = set(range(501, N + 1))
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_cycle_graph():
    # Large cycle: all nodes are sources, should return None
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": (i + 1) % N} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_with_multiple_last_nodes():
    # Graph with several nodes not acting as source
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    # Nodes 0..498 are sources; nodes 499..999 are not, so node 499 is returned first
    edges = [{"source": i, "target": i + 1} for i in range(0, 499)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests

# 1. Basic Test Cases


def test_single_node_no_edges():
    # One node, no edges. Should return the node itself.
    nodes = [{"id": 1, "value": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_two_nodes_one_edge():
    # Two nodes, one edge from 1 -> 2. Should return node 2 as last node.
    nodes = [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_linear_chain():
    # 1 -> 2 -> 3. Should return node 3.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_multiple_endings():
    # 1 -> 2, 1 -> 3. Both 2 and 3 are valid as last nodes, function should return first found (2).
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 1, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_no_edges_multiple_nodes():
    # No edges, all nodes are last nodes, should return first node.
    nodes = [{"id": 10}, {"id": 20}, {"id": 30}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# 2. Edge Test Cases


def test_empty_nodes_and_edges():
    # No nodes, no edges. Should return None.
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edges_but_no_nodes():
    # Edges exist but no nodes. Should return None.
    nodes = []
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_all_nodes_have_outgoing_edges():
    # All nodes have outgoing edges, so no last node. Should return None.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_self_loop():
    # Node with a self-loop. Should not be a last node.
    nodes = [{"id": 1}]
    edges = [{"source": 1, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_multiple_nodes_with_cycles():
    # 1 -> 2, 2 -> 3, 3 -> 1 (cycle). No last node.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_incoming_but_no_outgoing():
    # Node 3 has incoming edge but no outgoing, should be detected as last node.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_duplicate_node_ids():
    # Duplicate node ids, should return first node with no outgoing edge.
    nodes = [{"id": 1}, {"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edge_with_nonexistent_source():
    # Edge references a source not in nodes, should not affect result.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 3, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edge_with_nonexistent_target():
    # Edge references a target not in nodes, should not affect result.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_non_integer_ids():
    # Node ids as strings.
    nodes = [{"id": "a"}, {"id": "b"}]
    edges = [{"source": "a", "target": "b"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_none_ids():
    # Node id is None, edge references None.
    nodes = [{"id": None}, {"id": 2}]
    edges = [{"source": None, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_missing_source_key_in_edge():
    # Edge dict without 'source' key, should raise KeyError
    nodes = [{"id": 1}]
    edges = [{"target": 1}]
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)


# 3. Large Scale Test Cases


def test_large_linear_chain():
    # 1000 nodes in a linear chain: 0->1->2->...->999, last node is 999
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": i + 1} for i in range(N - 1)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_star_graph():
    # One central node (0) points to 999 nodes (1..999), all leaves are last nodes, first leaf returned
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": 0, "target": i} for i in range(1, N)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_no_edges():
    # 1000 nodes, no edges, all are last nodes, first returned
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_cycle():
    # 1000 nodes in a cycle: 0->1->2->...->999->0, no last node
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": (i + 1) % N} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_disconnected_components():
    # 500 nodes in a chain (0..499), 500 isolated nodes (500..999);
    # node 499, the chain's tail, is the first non-source and is returned
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": i + 1} for i in range(499)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mjamo8dz and push.

@codeflash-ai codeflash-ai bot requested a review from KRRT7 December 17, 2025 23:13
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash (Optimization PR opened by Codeflash AI) and 🎯 Quality: High (Optimization Quality according to Codeflash) labels Dec 17, 2025
@KRRT7 KRRT7 closed this Dec 18, 2025
@codeflash-ai codeflash-ai bot deleted the codeflash/optimize-find_last_node-mjamo8dz branch December 18, 2025 01:35