Skip to content

Conversation

@jianjunzhong
Copy link
Contributor

@jianjunzhong jianjunzhong commented Oct 23, 2025

Summary by CodeRabbit

  • Tests

    • Added comprehensive test suite for async storage manager, including initialization, data operations, and error handling across multiple storage scenarios.
    • Refactored storage unit tests to validate behavior using production components instead of mocked infrastructure.
  • Chores

    • Updated pytest configuration to filter PyTorch-related warnings.

@coderabbitai
Copy link

coderabbitai bot commented Oct 23, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

This PR adds pytest configuration to suppress PyTorch nested tensor warnings, introduces a comprehensive test suite for AsyncSimpleStorageManager with mocked ZMQ interactions, and refactors test_simple_storage_unit.py to use production SimpleStorageUnit actors directly instead of mocked components with updated method signatures.

Changes

Cohort / File(s) Summary
Configuration Updates
pyproject.toml
Added pytest configuration under [tool.pytest.ini_options] with filterwarnings entry to suppress PyTorch nested tensors prototype UserWarnings.
New AsyncSimpleStorageManager Tests
tests/test_async_simple_storage_manager.py
Comprehensive test suite covering initialization validation, basic operations (put_data, get_data, clear_data) with mocks, round-robin mapping logic across multiple storage units, error handling with exception propagation, and multiple storage unit scenarios. Patches ZMQ socket creation, polling, and mocks handshake responses for controller communication.
Refactored SimpleStorageUnit Tests
tests/test_simple_storage_unit.py
Replaced mocked ZMQ/controller testing with production SimpleStorageUnit actor instantiation. Updated actor signature from storage_size to storage_unit_size parameter. Simplified test helpers: PUT requests now only take local_indexes and field_data (removed global_indexes). Introduced MockStorageClient with put/get/clear methods. Added direct StorageUnitData usage, clear operation validation, and extended coverage for nested-tensor and single-item data pathways.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Rationale: Multiple heterogeneous changes across test files with significant scope: new comprehensive AsyncSimpleStorageManager test suite requiring understanding of mocking patterns and ZMQ interactions; substantial refactoring of SimpleStorageUnit tests with public API signature changes (storage_sizestorage_unit_size), architectural shift from mocked to production actors, new test helper implementations, and extended test coverage. Requires careful validation of refactoring correctness and signature compatibility.

Poem

🐰 Hops of joy through testing grounds,
Pytest hushes warning sounds,
Mocks replaced with real ZMQ flair,
Async managers handled with care!
Storage units now shine so bright, 🌟

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "feat: update tests for SimpleStorageUnit and add new tests for AsyncSimpleStorageManager" directly and clearly describes the main changes in the changeset. The title accurately captures the two primary objectives: updating the existing test suite for SimpleStorageUnit (which involves refactoring tests to use the production actor directly instead of mocks) and adding a comprehensive new test suite for AsyncSimpleStorageManager. While the PR also includes a minor configuration change to pyproject.toml (pytest filterwarnings), this is a supporting change that enables the tests rather than a primary objective, which is consistent with the PR summary that focuses on the test updates. The title is specific, concise, and free of vague language or unnecessary noise.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@0oshowero0 0oshowero0 requested a review from Copilot October 24, 2025 08:51
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR updates the test suite for SimpleStorageUnit and introduces comprehensive tests for AsyncSimpleStorageManager. The main changes involve removing the mock controller infrastructure, simplifying test fixtures, and adding new test coverage for async storage operations.

Key Changes:

  • Removed complex MockController infrastructure and simplified to use SimpleStorageUnit directly
  • Added comprehensive test suite for AsyncSimpleStorageManager with mocked ZMQ operations
  • Added pytest configuration to suppress PyTorch nested tensor warnings

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
tests/test_simple_storage_unit.py Refactored tests to use simplified SimpleStorageUnit directly, removed mock controller, and added new test cases for clear operations and direct StorageUnitData testing
tests/test_async_simple_storage_manager.py New test file with comprehensive async storage manager tests including initialization, operations, mapping functions, and error handling
pyproject.toml Added pytest configuration to filter PyTorch nested tensor warnings

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.


# Use larger batch size and more complex index mapping
global_indexes = list(range(i * batch_size, (i + 1) * batch_size))
# Use batch size and index mapping
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment 'Use batch size and index mapping' is vague and doesn't add value. Consider removing it or making it more specific about what the code is doing (e.g., 'Generate sequential local indexes for batch').

Suggested change
# Use batch size and index mapping
# Generate sequential local indexes for this batch

Copilot uses AI. Check for mistakes.
# Retrieve larger batch of data
indices = list(range(i * batch_size, (i + 1) * batch_size)) # Retrieve batch_size indices of data each time
response = client.send_get(0, indices, ["log_probs", "rewards"])
# Retrieve batch of data
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] This comment is redundant as the code clearly shows data retrieval. Consider removing it or making it more descriptive (e.g., 'Retrieve the same batch that was just stored').

Suggested change
# Retrieve batch of data
# Retrieve the same batch that was just stored

Copilot uses AI. Check for mistakes.
@0oshowero0
Copy link
Owner

@coderabbitai review

@coderabbitai
Copy link

coderabbitai bot commented Oct 24, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
tests/test_async_simple_storage_manager.py (1)

153-157: Remove unused mock_recv.

Function is defined but never used.

-    async def mock_recv():
-        # This is a simplified mock - in real scenarios you'd need more sophisticated
-        # response handling based on the request type
-        return put_response.serialize()
tests/test_simple_storage_unit.py (2)

33-33: Remove unused logger.
Variable isn’t used.

-logger = None

218-221: Clarify the comment.

Make the intent explicit.

-        # Use batch size and index mapping
+        # Generate sequential local indexes for this batch
🧹 Nitpick comments (7)
pyproject.toml (1)

80-87: Scope the warning filter to torch to avoid hiding unrelated UserWarnings.

Narrow the filter to the torch module so other UserWarnings aren’t muted by accident.

 [tool.pytest.ini_options]
 filterwarnings = [
-    "ignore:.*PyTorch API of nested tensors.*prototype.*:UserWarning",
+    "ignore:.*PyTorch API of nested tensors.*prototype.*:UserWarning:torch",
 ]
tests/test_async_simple_storage_manager.py (2)

24-27: Avoid sys.path mutation in tests.

Prefer installing the package (pip install -e .) or a tests/conftest.py that sets PYTHONPATH for imports.


67-81: Use distinct socket mocks and signal poll readiness.

Reusing one socket for all endpoints can hide ordering/interaction bugs; also have poll() report readiness to match production flow.

-    with patch("transfer_queue.storage.create_zmq_socket") as mock_create_socket, patch("zmq.Poller") as mock_poller:
-        # Create mock socket with proper sync methods
-        mock_socket = AsyncMock()
-        mock_socket.connect = Mock()  # sync method
-        mock_socket.send = Mock()  # sync method
-        mock_socket.recv = AsyncMock()  # async method
-        mock_create_socket.return_value = mock_socket
+    with patch("transfer_queue.storage.create_zmq_socket") as mock_create_socket, patch("zmq.Poller") as mock_poller:
+        # Create distinct sockets per endpoint
+        handshake_sock = AsyncMock()
+        status_sock = AsyncMock()
+        for s in (handshake_sock, status_sock):
+            s.connect = Mock()
+            s.send = Mock()
+            s.recv = AsyncMock()
+        mock_create_socket.side_effect = [handshake_sock, status_sock]
         # Mock poller with sync methods
-        mock_poller_instance = Mock()
-        mock_poller_instance.register = Mock()  # sync method
-        mock_poller_instance.poll = Mock(return_value=[])  # sync method
+        mock_poller_instance = Mock()
+        mock_poller_instance.register = Mock()
+        # Indicate data available on handshake socket
+        mock_poller_instance.poll = Mock(return_value=[(handshake_sock, 1)])  # 1 == zmq.POLLIN
         mock_poller.return_value = mock_poller_instance

If you prefer explicit constants, add import zmq at the top and use zmq.POLLIN.

Also applies to: 215-229

tests/test_simple_storage_unit.py (4)

26-29: Avoid sys.path hacks.

Install the package (editable) or use a tests/conftest.py to adjust import path.


39-44: Improve ZMQ client resource hygiene (shared context, LINGER=0).

Per‑client contexts and default LINGER can hang on teardown and slow CI.

 class MockStorageClient:
@@
-    def __init__(self, storage_put_get_address):
-        self.context = zmq.Context()
-        self.socket = self.context.socket(zmq.DEALER)
-        self.socket.setsockopt(zmq.RCVTIMEO, 5000)  # 5 second timeout
+    def __init__(self, storage_put_get_address):
+        self.context = ZMQ_CTX  # shared context
+        self.socket = self.context.socket(zmq.DEALER)
+        self.socket.setsockopt(zmq.LINGER, 0)
+        self.socket.setsockopt(zmq.RCVTIMEO, 5000)  # 5 second timeout
         self.socket.connect(storage_put_get_address)
@@
     def close(self):
-        self.socket.close()
-        self.context.term()
+        # Fast, non-blocking close; do not term the shared context
+        self.socket.close(0)

Add at the top of the file (outside the class):

# Shared ZMQ context for tests
ZMQ_CTX = zmq.Context.instance()

Also applies to: 72-75


86-103: Reset global tensordict flag and avoid fixed sleep.

  • set_list_to_stack is global; reset it in teardown.
  • Replace sleep with a light readiness probe to deflake.
 def storage_setup(ray_setup):
@@
-    tensordict.set_list_to_stack(True).set()
+    tensordict.set_list_to_stack(True).set()
@@
-    time.sleep(1)  # Wait for socket to be ready
+    # Optional: probe readiness instead of fixed sleep
+    # (e.g., attempt a short-lived client connect or poll)
+    # time.sleep kept as fallback if probe is not added.
+    time.sleep(0.2)
@@
-    yield storage_actor, put_get_address
-
-    # Cleanup
-    ray.kill(storage_actor)
+    try:
+        yield storage_actor, put_get_address
+    finally:
+        ray.kill(storage_actor)
+        tensordict.set_list_to_stack(False).set()

204-258: Mark perf test as slow and reduce flakiness.

Performance assertions can be noisy in CI. Gate it or mark slow; consider percentile over mean.

-def test_performance_basic(storage_setup):
+@pytest.mark.slow
+def test_performance_basic(storage_setup):
@@
-    num_puts = 20  # Reduced for faster testing
-    batch_size = 32  # Reduced for faster testing
+    num_puts = 10
+    batch_size = 16
@@
-    num_gets = 20
+    num_gets = 10
@@
-    # More lenient performance thresholds for testing environment
-    assert avg_put_latency < 1000, f"Avg PUT latency {avg_put_latency}ms exceeds threshold"
-    assert avg_get_latency < 1000, f"Avg GET latency {avg_get_latency}ms exceeds threshold"
+    # Lenient thresholds to avoid CI flakiness
+    assert avg_put_latency < 1500, f"Avg PUT latency {avg_put_latency}ms exceeds threshold"
+    assert avg_get_latency < 1500, f"Avg GET latency {avg_get_latency}ms exceeds threshold"
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 705bd27 and ccf4bce.

📒 Files selected for processing (3)
  • pyproject.toml (1 hunks)
  • tests/test_async_simple_storage_manager.py (1 hunks)
  • tests/test_simple_storage_unit.py (12 hunks)
🔇 Additional comments (1)
tests/test_async_simple_storage_manager.py (1)

91-93: The review comment is incorrect because AsyncSimpleStorageManager has no aclose() or close() methods.

The suggested fix assumes these cleanup methods exist, but they don't. The hasattr checks would both return False, and the suggested code would not execute any cleanup. The ZMQ context and sockets created in the parent class TransferQueueStorageManager are never explicitly closed, but calling non-existent methods won't solve this.

If resource cleanup is actually needed, the AsyncSimpleStorageManager class would need to implement proper cleanup methods first (e.g., an aclose() method for async cleanup or a context manager), and then the fixture could use them.

Likely an incorrect or invalid review comment.

…ters for faster execution

Signed-off-by: jianjunzhong <[email protected]>
@0oshowero0 0oshowero0 merged commit 6042b7b into 0oshowero0:han/unified_storage_abstract Oct 25, 2025
2 checks passed
MissFishY pushed a commit to MissFishY/TransferQueue that referenced this pull request Dec 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants