@codeflash-ai codeflash-ai bot commented Dec 18, 2025

📄 17,056% (170.56x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime: 81.8 milliseconds → 477 microseconds (best of 114 runs)

📝 Explanation and details

The optimization transforms an O(N*M) algorithm into an O(N+M) algorithm by replacing repeated linear searches with a single set-based lookup.

Key Changes:

  1. Pre-compute edge sources: Creates a set {e["source"] for e in edges} containing all edge source IDs (O(M) time)
  2. Replace nested loop with set lookup: Changes from checking all(e["source"] != n["id"] for e in edges) for each node to a simple n["id"] not in edge_sources check (O(1) per node vs O(M) per node)
  3. Early return optimization: Uses explicit loop with early return instead of generator expression with next()
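The three changes above can be sketched side by side. This is a minimal reconstruction from the expressions quoted in the list, not the actual contents of src/algorithms/graph.py (which this comment does not show); the names `find_last_node_original` and `find_last_node_optimized` are hypothetical.

```python
from __future__ import annotations


def find_last_node_original(nodes, edges):
    # Hypothetical reconstruction: for every node, scan the edge list
    # until an edge with a matching source is found (or the list ends).
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def find_last_node_optimized(nodes, edges):
    # Build the set of source IDs once: O(M).
    edge_sources = {e["source"] for e in edges}
    # Return the first node that never appears as a source.
    for n in nodes:
        if n["id"] not in edge_sources:
            return n
    return None


nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
print(find_last_node_original(nodes, edges))
print(find_last_node_optimized(nodes, edges))
```

One difference worth keeping in mind: the set-based version requires hashable source IDs, whereas the nested scan only needs them to support equality comparison.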

Why It's Faster:
The original code has O(N*M) worst-case complexity: for each of the N nodes, it scans up to M edges to check whether the node appears as a source. The optimized version builds the set of edge sources once (M operations), then performs at most N lookups that each take constant time on average, for a total of N+M operations.
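To make the operation counts concrete, the sketch below instruments both strategies on a 1,000-node linear chain. The counting helpers are hypothetical and written only for illustration; they mirror the short-circuiting of `all()` and `next()` rather than calling the real function.

```python
def count_nested_scan_comparisons(nodes, edges):
    """Count the equality checks performed by the nested scan."""
    comparisons = 0
    for n in nodes:
        is_source = False
        for e in edges:
            comparisons += 1
            if e["source"] == n["id"]:
                is_source = True
                break  # all() short-circuits on the first matching edge
        if not is_source:
            break  # next() stops at the first node that is not a source
    return comparisons


def count_set_lookup_operations(nodes, edges):
    """Count one operation per edge inserted plus one per node looked up."""
    operations = len(edges)
    edge_sources = {e["source"] for e in edges}
    for n in nodes:
        operations += 1
        if n["id"] not in edge_sources:
            break
    return operations


N = 1000
nodes = [{"id": i} for i in range(N)]
edges = [{"source": i, "target": i + 1} for i in range(N - 1)]

nested = count_nested_scan_comparisons(nodes, edges)
lookup = count_set_lookup_operations(nodes, edges)
print(nested, lookup)
```

For this chain the nested scan performs 500,499 comparisons, while the set-based version needs 1,999 operations (999 insertions plus 1,000 lookups).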

Performance Impact:
The 170x speedup (from 81.8 ms to 477 µs) is most evident in the large-scale test cases. The optimization helps most with:

  • Large edge counts: the more edges there are, the more each avoided linear scan saves relative to the one-time cost of building the set
  • Many nodes to check: in the original approach, every additional node can trigger another scan of the edge list
  • Dense graphs: when most nodes are sources, the first sink appears late (or never), so the original search cannot stop early and checks many nodes against the edge list

This optimization is particularly valuable for graph analysis workloads where finding sink nodes (nodes with no outgoing edges) is a common operation in larger datasets.
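A rough way to observe the difference locally is a `timeit` comparison like the one below. It is a sketch under stated assumptions: `nested_scan` and `set_lookup` are hypothetical stand-ins reconstructed from the description above, and the measured ratio depends on the machine and on the graph shape, so it will not necessarily match the figures reported here.

```python
import timeit


def nested_scan(nodes, edges):
    # Stand-in for the original approach (assumed, not the repository code).
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def set_lookup(nodes, edges):
    # Stand-in for the optimized approach (assumed, not the repository code).
    edge_sources = {e["source"] for e in edges}
    for n in nodes:
        if n["id"] not in edge_sources:
            return n
    return None


# A 1,000-node linear chain, similar in shape to the large-scale tests.
N = 1000
nodes = [{"id": i} for i in range(N)]
edges = [{"source": i, "target": i + 1} for i in range(N - 1)]

# Take the best of five single runs for each implementation.
slow = min(timeit.repeat(lambda: nested_scan(nodes, edges), number=1, repeat=5))
fast = min(timeit.repeat(lambda: set_lookup(nodes, edges), number=1, repeat=5))

print(f"nested scan: {slow * 1e3:.3f} ms")
print(f"set lookup:  {fast * 1e3:.3f} ms")
print(f"ratio:       {slow / fast:.1f}x")
```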

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 44 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests

# ----------------------
# Basic Test Cases
# ----------------------


def test_single_node_no_edges():
    # One node, no edges; should return the node itself
    nodes = [{"id": 1}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_two_nodes_one_edge():
    # Two nodes, one edge from node 1 to node 2; last node should be node 2
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_linear_chain():
    # Three nodes in a chain: 1 -> 2 -> 3; last node is 3
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_branching():
    # Three nodes, 1 branches to 2 and 3; both 2 and 3 are last nodes (no outgoing edges)
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 1, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# ----------------------
# Edge Test Cases
# ----------------------


def test_empty_nodes_and_edges():
    # No nodes and no edges; should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_no_edges():
    # Multiple nodes, no edges; should return the first node (all are last nodes)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_cycle_graph():
    # Nodes form a cycle: 1->2->3->1; all have outgoing edges, so result should be None
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_disconnected_nodes():
    # All nodes are disconnected (no edges at all); should return the first node
    nodes = [{"id": "X"}, {"id": "Y"}, {"id": "Z"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_multiple_last_nodes():
    # Multiple nodes with no outgoing edges; should return the first found
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}]
    # Node 2 and 3 have no outgoing edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edge_with_nonexistent_node():
    # Edge refers to a node not in nodes; should not affect result
    nodes = [{"id": 1}, {"id": 2}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 3, "target": 1},
    ]  # source 3 not in nodes
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_non_integer_ids():
    # Nodes with string IDs
    nodes = [{"id": "foo"}, {"id": "bar"}]
    edges = [{"source": "foo", "target": "bar"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_duplicate_node_ids():
    # Duplicate node IDs; should return first occurrence with no outgoing edge
    nodes = [{"id": 1}, {"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_self_loop():
    # Node with an edge to itself; should not be last node
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edge_dict_with_extra_keys():
    # Edges contain extra keys; should still work
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2, "weight": 10}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# ----------------------
# Large Scale Test Cases
# ----------------------


def test_large_linear_chain():
    # Large chain of nodes: 0->1->2->...->999; last node is 999
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": i + 1} for i in range(N - 1)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_branching_graph():
    # 500 nodes, each node branches to two nodes (except last two)
    N = 500
    nodes = [{"id": i} for i in range(N)]
    edges = []
    for i in range(N - 2):
        edges.append({"source": i, "target": i + 1})
        edges.append({"source": i, "target": i + 2})
    # Only nodes N-1 and N-2 have no outgoing edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_with_disconnected_nodes():
    # 1000 nodes, only first 10 are connected; rest are disconnected
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": i + 1} for i in range(9)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_all_nodes_in_cycle():
    # 1000 nodes in a cycle; should return None
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [{"source": i, "target": (i + 1) % N} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_with_multiple_last_nodes():
    # 1000 nodes, every odd node has no outgoing edges
    N = 1000
    nodes = [{"id": i} for i in range(N)]
    edges = [
        {"source": i, "target": i + 1} for i in range(0, N - 1, 2)
    ]  # only even nodes have outgoing edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output
    # Odd nodes have no outgoing edges (for odd N, the even node N-1 would have none either)
    odd_nodes = [{"id": i} for i in range(1, N, 2)]


# ----------------------
# Mutation Testing - Negative Test
# ----------------------


def test_mutation_wrong_behavior():
    # If function returns a node with outgoing edges, fail
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_mutation_none_when_last_node_exists():
    # If function returns None when there is a last node, fail
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests

# ---- Basic Test Cases ----


def test_single_node_no_edges():
    # Only one node, no edges. That node should be returned.
    nodes = [{"id": 1, "label": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_two_nodes_one_edge():
    # Two nodes, one edge from node 1 to node 2. Node 2 should be returned.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_three_nodes_linear_chain():
    # Three nodes in a chain: 1->2->3. Node 3 should be returned.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_multiple_last_nodes():
    # Two nodes are not sources in any edge, should return the first such node.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}]
    # Both node 2 and 3 are not sources, but node 2 comes first
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_no_nodes():
    # No nodes at all, should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# ---- Edge Test Cases ----


def test_all_nodes_are_sources():
    # Every node is a source in at least one edge, so should return None
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_multiple_incoming_edges():
    # Node 3 has multiple incoming edges, but is not a source, so should be returned
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 3}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_node_with_self_loop():
    # Node 1 has a self loop, so it's a source. Node 2 is not a source, so should be returned.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edges_with_extra_keys():
    # Edges have extra keys, should not affect the result
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2, "weight": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_extra_keys():
    # Nodes have extra keys, should not affect the result
    nodes = [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_no_edges_some_nodes():
    # Multiple nodes, no edges. Should return the first node.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edges_with_nonexistent_source():
    # Edge refers to a source not in nodes. Should ignore and return the first node not a source.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 99, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_duplicate_node_ids():
    # Duplicate node IDs, function should return the first one not a source
    nodes = [{"id": 1}, {"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_empty_edges_list():
    # Edges list is empty, should return the first node
    nodes = [{"id": 10}, {"id": 20}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_non_integer_ids():
    # Node IDs are strings
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edges_with_non_integer_ids():
    # Edge sources/targets are strings
    nodes = [{"id": "X"}, {"id": "Y"}, {"id": "Z"}]
    edges = [{"source": "X", "target": "Y"}, {"source": "Y", "target": "Z"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_nodes_with_none_id():
    # Node with None id, should be handled gracefully
    nodes = [{"id": None}, {"id": 2}]
    edges = [{"source": 2, "target": None}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_edges_with_none_source():
    # Edge with source None, should not match any node id except None
    nodes = [{"id": None}, {"id": 1}]
    edges = [{"source": None, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# ---- Large Scale Test Cases ----


def test_large_linear_chain():
    # 1000 nodes in a chain, last node should be returned
    nodes = [{"id": i} for i in range(1000)]
    edges = [{"source": i, "target": i + 1} for i in range(999)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_fully_connected_graph():
    # 100 nodes, every node is a source, should return None
    nodes = [{"id": i} for i in range(100)]
    edges = [
        {"source": i, "target": j} for i in range(100) for j in range(100) if i != j
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_sparse_graph():
    # 1000 nodes, only first 10 are sources, should return the first node not a source (id=10)
    nodes = [{"id": i} for i in range(1000)]
    edges = [{"source": i, "target": i + 1} for i in range(10)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_disconnected_nodes():
    # 1000 nodes, no edges. Should return the first node.
    nodes = [{"id": i} for i in range(1000)]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


def test_large_graph_with_multiple_last_nodes():
    # 1000 nodes, only even nodes are sources, so first odd node (id=1) is returned
    nodes = [{"id": i} for i in range(1000)]
    edges = [{"source": i, "target": i + 1} for i in range(0, 1000, 2) if i + 1 < 1000]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
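The output comparison described in the comment above can be approximated by hand with a randomized check. The sketch below is an illustration only, not Codeflash's actual verification mechanism: `reference`, `candidate`, and `random_graph` are hypothetical names, and the two functions are reconstructions of the original and optimized approaches.

```python
import random


def reference(nodes, edges):
    # Assumed shape of the original implementation.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def candidate(nodes, edges):
    # Assumed shape of the optimized implementation.
    edge_sources = {e["source"] for e in edges}
    for n in nodes:
        if n["id"] not in edge_sources:
            return n
    return None


def random_graph(rng, max_nodes=20, max_edges=40):
    """Build a small random graph with integer node IDs."""
    n = rng.randint(0, max_nodes)
    nodes = [{"id": i} for i in range(n)]
    # With no nodes there is nothing to connect, so generate no edges.
    m = rng.randint(0, max_edges) if n else 0
    edges = [
        {"source": rng.randrange(n), "target": rng.randrange(n)} for _ in range(m)
    ]
    return nodes, edges


rng = random.Random(0)  # fixed seed so the run is reproducible
mismatches = 0
for _ in range(500):
    nodes, edges = random_graph(rng)
    if reference(nodes, edges) != candidate(nodes, edges):
        mismatches += 1

print(f"mismatches: {mismatches}")
```

A check like this only covers hashable IDs; inputs with unhashable source values would raise a `TypeError` in the set-based version and need separate handling.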

To edit these changes git checkout codeflash/optimize-find_last_node-mjarp9cm and push.

@codeflash-ai codeflash-ai bot requested a review from KRRT7 December 18, 2025 01:34
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash (Optimization PR opened by Codeflash AI) and 🎯 Quality: High (Optimization Quality according to Codeflash) labels Dec 18, 2025