Skip to content

Conversation

@georgi-l95
Copy link
Member

@georgi-l95 georgi-l95 commented Oct 7, 2025

Description

This PR aims to harden publisher and subsriber plugins.

Reviewer Checklist

  • The code follows the project's coding standards.
  • New and existing unit tests pass locally with my changes.
  • I have added necessary documentation (if applicable).
  • I have confirmed that this change does not introduce any new security vulnerabilities.

Summary by CodeRabbit

  • New Features

    • Configurable publish limits (max message size and max items per set), session-level tracking with end-of-session summaries, and a new publish-session duration metric.
    • Publish request validation with explicit error responses; subscribe streaming gains richer validation, failure-cause reporting, and improved live/historical handling.
  • Tests

    • E2E tests reorganized and expanded: publish and subscribe suites covering single-block, multi-publisher races, validation, edge cases, concurrency, and failure scenarios.
  • Chores

    • Added release workflow and an error-handling dependency.

Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
@georgi-l95 georgi-l95 self-assigned this Oct 7, 2025
@coderabbitai
Copy link

coderabbitai bot commented Oct 7, 2025

Walkthrough

Adds configurable publish limits, request validation, per-session counters and histogram metrics, session statistics logging, replaces an ingestion e2e suite with new publish/subscribe e2e tests, and updates related configs, small plugin APIs, and CI release workflow.

Changes

Cohort / File(s) Summary
E2E Config
config/config.e2e.toml
Adds publish_service.max_message_size_bytes = 33554432 and publish_service.max_items_per_set = 10000.
Core: Config & Metrics
crates/rock-node-core/src/config.rs, crates/rock-node-core/src/metrics.rs
Adds max_message_size_bytes and max_items_per_set to PublishServiceConfig defaults; adds and registers publish_session_duration_seconds: HistogramVec in MetricsRegistry::with_registry.
Publish Plugin: deps & wiring
crates/rock-node-publish-plugin/Cargo.toml, crates/rock-node-publish-plugin/src/lib.rs
Adds thiserror dependency; replace hard-coded 32MB with config-driven max_message_size_bytes for gRPC encode/decode; introduces error and validation modules.
Publish Plugin: Errors & Validation
crates/rock-node-publish-plugin/src/error.rs, crates/rock-node-publish-plugin/src/validation.rs
New PublishError enum and Result<T> alias; new RequestValidator enforcing non-empty sets, header presence for new blocks, and max items per set.
Publish Plugin: Session / Service
crates/rock-node-publish-plugin/src/session_manager.rs, crates/rock-node-publish-plugin/src/service.rs
SessionManager gains publisher_id, timing, validator, counters (blocks_received, blocks_accepted, blocks_skipped), and log_session_stats() which records histogram; handle_block_items integrates validation and updates counters; service invokes log_session_stats() on session end.
Subscriber Plugin: session & errors
crates/rock-node-subscriber-plugin/src/session.rs, crates/rock-node-subscriber-plugin/src/error.rs, crates/rock-node-subscriber-plugin/src/service.rs
Adds failure_cause and accessor; refactors validation flow, logging, and session lifecycle messages; refines SubscriberError::to_metric_label mapping and adjusts log levels.
Backfill Plugin
crates/rock-node-backfill-plugin/src/worker.rs
Changes run_continuous_stream_cycle parameter from &mut Vec<String> to &mut [String] (slice).
Block Access Plugin
crates/rock-node-block-access-plugin/src/service.rs
Maps block_response::Code::Error to "Error"; adds test coverage and allows clippy result_large_err on record_metrics.
E2E Tests: restructure & new suites
tests/e2e/src/tests/mod.rs, tests/e2e/src/tests/ingestion_suite.rs (removed), tests/e2e/src/tests/publish/*, tests/e2e/src/tests/subscribe/*
Removes old ingestion_suite; exposes publish and subscribe modules; adds publish tests (basic, multi_publisher_basic, validation_errors) and subscribe tests (basic, edge_cases, failure_scenarios, validation) with many new async test cases.
Release CI
.github/workflows/release.yml
Adds a release workflow to build/push multi-arch Docker images, cross-compile binaries, package artifacts, and upload release assets.
Config: runtime defaults
config/config.toml
Changes network bindings to 0.0.0.0 for gRPC and observability, and toggles several service enable flags and backfill peer address.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Pub as Publisher Client
    participant Svc as PublishService (gRPC)
    participant Sess as SessionManager
    participant Val as RequestValidator
    participant Met as MetricsRegistry

    Note over Svc: gRPC limits set from config (max_message_size_bytes)

    Pub->>Svc: PublishStreamRequest(BlockItemSet)
    activate Svc
    Svc->>Sess: handle_block_items(set, is_new_block)
    activate Sess
    Sess->>Val: validate_block_items(set, is_new_block)
    alt Validation fails
        Val-->>Sess: Err(PublishError::...)
        Sess-->>Pub: EndStream(Error | ValidationError)
        Sess->>Met: increment validation metric
        Sess->>Sess: terminate session
    else Validation passes
        Val-->>Sess: Ok
        Sess->>Sess: update counters (received/accepted/skipped)
        Sess-->>Pub: Acknowledgement | SkipBlock | broadcast
    end
    deactivate Sess

    Note over Sess,Met: On session end -> Sess.log_session_stats() -> record publish_session_duration_seconds
    Svc->>Met: observe duration histogram
    deactivate Svc
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–90 minutes

Possibly related PRs

Suggested labels

type: tests

Poem

I’m a rabbit counting hops and bytes,
I validate headers through nights,
I log the session, histogram sings,
Tests race, subscribe, and many things.
Hop on — configs set, the pipeline delights! 🐇📦

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The description is missing the Related Issues and Changes sections required by the repository’s template and only provides a high-level goal and checklist without detailing the specific files or services modified. Add a Related Issues section with any linked issue numbers and a Changes section summarizing each service or file update to comply with the required template.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The title accurately highlights the publisher plugin hardening but does not mention the subscriber plugin improvements and other significant changes in this pull request, making it only a partial summary of the main work.
Docstring Coverage ✅ Passed Docstring coverage is 81.25% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch improve-publisher

Comment @coderabbitai help to get the list of available commands and usage tips.

Signed-off-by: georgi-l95 <[email protected]>
@georgi-l95 georgi-l95 marked this pull request as ready for review October 8, 2025 12:37
Signed-off-by: georgi-l95 <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/rock-node-core/src/config.rs (1)

277-285: Test coverage incomplete for new configuration fields.

The test test_publish_service_config_default_values doesn't verify the newly added max_message_size_bytes and max_items_per_set fields.

Add assertions for the new fields:

     #[test]
     fn test_publish_service_config_default_values() {
         let config = PublishServiceConfig::default();
         assert_eq!(config.enabled, true);
         assert_eq!(config.max_concurrent_streams, 250);
         assert_eq!(config.persistence_ack_timeout_seconds, 60);
         assert_eq!(config.stale_winner_timeout_seconds, 10);
         assert_eq!(config.winner_cleanup_interval_seconds, 150);
         assert_eq!(config.winner_cleanup_threshold_blocks, 1000);
+        assert_eq!(config.max_message_size_bytes, 32 * 1024 * 1024);
+        assert_eq!(config.max_items_per_set, 10000);
     }
🧹 Nitpick comments (11)
crates/rock-node-subscriber-plugin/src/session.rs (1)

383-384: Consider using realistic test values instead of zero.

The new configuration fields are set to 0 in the test context. If any validation or business logic relies on these limits (especially max_items_per_set), setting them to zero could mask bugs or lead to unexpected behavior in tests.

Consider using the same defaults as production:

                 publish_service: PublishServiceConfig {
                     enabled: false,
                     max_concurrent_streams: 0,
                     persistence_ack_timeout_seconds: 0,
                     stale_winner_timeout_seconds: 0,
                     winner_cleanup_interval_seconds: 0,
                     winner_cleanup_threshold_blocks: 0,
-                    max_message_size_bytes: 0,
-                    max_items_per_set: 0,
+                    max_message_size_bytes: 32 * 1024 * 1024,
+                    max_items_per_set: 10000,
                 },
tests/e2e/src/tests/publish/basic.rs (1)

33-38: Close the client stream after sending to avoid dangling session.

Drop the mpsc sender after the send so the server observes end-of-stream. Other tests already do this.

     tx.send(PublishStreamRequest {
         request: Some(PublishRequest::BlockItems(BlockItemSet {
             block_items: block_proto.items,
         })),
     })
     .await?;
+    // Signal end-of-stream after sending the block
+    drop(tx);
crates/rock-node-publish-plugin/src/error.rs (1)

9-20: Unify block number types; avoid -1 sentinel.

Use u64 everywhere for block numbers and Option where “unknown” is possible. This removes casts/sentinels and yields a clearer API. Based on learnings.

-    #[error("Missing block header for new block {block_number}")]
-    MissingBlockHeader { block_number: i64 },
+    #[error("Missing block header (block: {block_number:?})")]
+    MissingBlockHeader { block_number: Option<u64> },

-    #[error("Block number mismatch: expected {expected}, got {received}")]
-    BlockNumberMismatch { expected: i64, received: i64 },
+    #[error("Block number mismatch: expected {expected}, got {received}")]
+    BlockNumberMismatch { expected: u64, received: u64 },

-    #[error("Invalid block number: {block_number}")]
-    InvalidBlockNumber { block_number: i64 },
+    #[error("Invalid block number: {block_number}")]
+    InvalidBlockNumber { block_number: u64 },

Optionally, carry underlying channel error for better diagnostics:

// Example alternative
#[error("Channel send error: {0}")]
ChannelSend(String),

Also applies to: 30-35

crates/rock-node-publish-plugin/src/validation.rs (1)

43-51: Align validator with Option block numbers and remove sentinels.

Return None when unknown; Some(..) when derivable; keep all block numbers u64.

-                    let block_number = self.extract_block_number_from_any_item(item_type);
-                    return Err(PublishError::MissingBlockHeader {
-                        block_number: block_number.unwrap_or(-1),
-                    });
+                    let block_number = self.extract_block_number_from_any_item(item_type);
+                    return Err(PublishError::MissingBlockHeader { block_number });
                 }
             } else {
-                return Err(PublishError::MissingBlockHeader { block_number: -1 });
+                return Err(PublishError::MissingBlockHeader { block_number: None });
             }
-    fn extract_block_number_from_any_item(&self, item: &BlockItemType) -> Option<i64> {
+    fn extract_block_number_from_any_item(&self, item: &BlockItemType) -> Option<u64> {
         match item {
-            BlockItemType::BlockHeader(header) => Some(header.number as i64),
-            BlockItemType::BlockProof(proof) => Some(proof.block as i64),
+            BlockItemType::BlockHeader(header) => Some(header.number),
+            BlockItemType::BlockProof(proof) => Some(proof.block),
             _ => None,
         }
     }

Also applies to: 57-63

tests/e2e/src/tests/publish/validation_errors.rs (2)

138-145: Don’t assume the proof is the last item; match by variant.

Make the test resilient to block layout changes by finding the BlockProof item explicitly.

-    // Send only proof item (last item), skip header
-    let proof_only = vec![block_proto.items.last().unwrap().clone()];
+    // Send only proof item (skip header)
+    let proof_item = block_proto
+        .items
+        .iter()
+        .find(|it| matches!(it.item, Some(rock_node_protobufs::com::hedera::hapi::block::stream::block_item::Item::BlockProof(_))))
+        .cloned()
+        .expect("Block should contain a proof item");
+    let proof_only = vec![proof_item];

266-273: Build oversized set without relying on last() being a proof.

Pick the proof by type; then clone it to exceed the limit.

-    // Create excessive number of items (more than max_items_per_set of 10000)
-    let mut large_items = vec![block_proto.items[0].clone()]; // header
-
-    // Add items to exceed the limit
-    for _ in 1..10001 {
-        large_items.push(block_proto.items.last().unwrap().clone());
-    }
+    // Create excessive number of items (more than max_items_per_set of 10000)
+    let mut large_items = vec![block_proto.items[0].clone()]; // header
+    let proof_item = block_proto
+        .items
+        .iter()
+        .find(|it| matches!(it.item, Some(rock_node_protobufs::com::hedera::hapi::block::stream::block_item::Item::BlockProof(_))))
+        .cloned()
+        .expect("Block should contain a proof item");
+    // Add items to exceed the limit
+    for _ in 1..10001 {
+        large_items.push(proof_item.clone());
+    }
tests/e2e/src/tests/publish/multi_publisher_basic.rs (2)

142-320: Consider the implications of the 60% fairness threshold.

The test asserts that no single publisher should win more than 60% of blocks (lines 307-314). While this prevents monopolization, the threshold appears somewhat arbitrary and may not catch significant fairness issues (e.g., a 59% win rate is still quite high).

Consider whether:

  1. A stricter threshold (e.g., 40-50%) would be more appropriate for validating fairness.
  2. Additional fairness metrics (e.g., standard deviation of win rates) would provide better coverage.
  3. The test comments should document the rationale for choosing 60%.

Based on the "Chill" review setting, this is flagged as optional since the current threshold does provide some fairness validation.


186-224: Complex timeout and response handling could be simplified.

The nested timeout and response-handling logic (lines 191-224) is intricate and could benefit from extraction into a helper function. This would improve readability and make the test logic easier to maintain.

Consider extracting this pattern:

async fn wait_for_skip_or_timeout(
    responses: &mut impl Stream<Item = Result<PublishStreamResponse, Status>>,
    block_num: u64,
    timeout_ms: u64,
) -> Result<bool, anyhow::Error> {
    // Returns Ok(true) if got SkipBlock, Ok(false) if timeout, Err on error
    // ... (implementation based on lines 191-224)
}

Then use: let got_skip = wait_for_skip_or_timeout(&mut responses, block_num, 500).await?;

crates/rock-node-publish-plugin/src/session_manager.rs (3)

90-142: Consider adding more specific validation error details to the log message.

The validation error log (lines 114-118) provides the session ID and error, but doesn't indicate which specific validation rule failed (empty items, missing header, too many items, etc.). This could make debugging validation failures more difficult.

Consider enriching the error message or adding a validation error type field:

         if let Err(e) = self
             .validator
             .validate_block_items(&block_item_set, is_new_block)
         {
             error!(
                 session_id = %self.id,
+                validation_type = ?std::any::type_name_of_val(&e),
                 error = %e,
                 "Validation failed for block items"
             );

This would help operators quickly identify which validation rule triggered the error.


882-906: Consider emitting the average header-to-proof duration as a metric.

The log_session_stats method calculates avg_header_to_proof_secs (lines 892-896) but only logs it rather than recording it as a metric. This valuable per-session performance indicator would be more useful for monitoring if exposed as a metric.

Consider adding a metric similar to the existing publish_average_header_to_proof_time_seconds gauge but scoped to session completion rather than ongoing updates. This would enable better historical analysis of publisher performance:

// In log_session_stats, after line 905:
if self.header_proof_count > 0 {
    self.context
        .metrics
        .publish_session_avg_header_to_proof_seconds  // New metric
        .with_label_values::<&str>(&[])
        .observe(self.header_proof_total_duration / self.header_proof_count as f64);
}

Note: This would require adding the corresponding metric definition to the MetricsRegistry.


1863-1927: Verify that the session duration metric is correctly recorded.

The test validates the session statistics counters (blocks_received, blocks_skipped) but doesn't verify that the publish_session_duration_seconds metric (recorded in log_session_stats at lines 901-905) is actually incremented.

Consider adding an assertion to verify the metric was recorded:

// After line 1926 (session.log_session_stats();):
// Verify the session duration metric was recorded
let session_duration_metric = session
    .context
    .metrics
    .publish_session_duration_seconds
    .with_label_values::<&str>(&[]);

// Note: Prometheus metrics don't expose a direct getter, so you might need to
// use the test_utils or metrics introspection to verify the observation was recorded

This would provide more complete test coverage of the log_session_stats functionality.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aa61794 and 7f2dcda.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (16)
  • config/config.e2e.toml (1 hunks)
  • crates/rock-node-core/src/config.rs (2 hunks)
  • crates/rock-node-core/src/metrics.rs (3 hunks)
  • crates/rock-node-publish-plugin/Cargo.toml (1 hunks)
  • crates/rock-node-publish-plugin/src/error.rs (1 hunks)
  • crates/rock-node-publish-plugin/src/lib.rs (2 hunks)
  • crates/rock-node-publish-plugin/src/service.rs (1 hunks)
  • crates/rock-node-publish-plugin/src/session_manager.rs (10 hunks)
  • crates/rock-node-publish-plugin/src/validation.rs (1 hunks)
  • crates/rock-node-subscriber-plugin/src/session.rs (1 hunks)
  • tests/e2e/src/tests/ingestion_suite.rs (0 hunks)
  • tests/e2e/src/tests/mod.rs (1 hunks)
  • tests/e2e/src/tests/publish/basic.rs (1 hunks)
  • tests/e2e/src/tests/publish/mod.rs (1 hunks)
  • tests/e2e/src/tests/publish/multi_publisher_basic.rs (1 hunks)
  • tests/e2e/src/tests/publish/validation_errors.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • tests/e2e/src/tests/ingestion_suite.rs
🧰 Additional context used
🧬 Code graph analysis (7)
crates/rock-node-publish-plugin/src/lib.rs (1)
crates/rock-node-publish-plugin/src/session_manager.rs (3)
  • context (404-407)
  • context (901-904)
  • new (48-73)
crates/rock-node-core/src/metrics.rs (1)
crates/rock-node-publish-plugin/src/session_manager.rs (1)
  • new (48-73)
tests/e2e/src/tests/publish/multi_publisher_basic.rs (2)
tests/e2e/src/common/block_builder.rs (1)
  • items (432-434)
tests/e2e/src/common.rs (1)
  • access_client (156-171)
tests/e2e/src/tests/publish/validation_errors.rs (1)
crates/rock-node-publish-plugin/src/session_manager.rs (1)
  • new (48-73)
crates/rock-node-publish-plugin/src/session_manager.rs (3)
crates/rock-node-publish-plugin/src/state.rs (2)
  • new (56-62)
  • default (50-52)
crates/rock-node-persistence-plugin/src/cold_storage/archiver.rs (2)
  • metrics (99-102)
  • create_test_metrics (169-172)
crates/rock-node-core/src/config.rs (10)
  • default (23-31)
  • default (58-63)
  • default (76-83)
  • default (93-95)
  • default (115-126)
  • default (141-149)
  • default (165-167)
  • default (177-179)
  • default (189-191)
  • default (231-239)
crates/rock-node-publish-plugin/src/validation.rs (2)
crates/rock-node-publish-plugin/src/session_manager.rs (1)
  • new (48-73)
crates/rock-node-core/src/config.rs (10)
  • default (23-31)
  • default (58-63)
  • default (76-83)
  • default (93-95)
  • default (115-126)
  • default (141-149)
  • default (165-167)
  • default (177-179)
  • default (189-191)
  • default (231-239)
tests/e2e/src/tests/publish/basic.rs (1)
crates/rock-node-publish-plugin/src/session_manager.rs (1)
  • new (48-73)
🔇 Additional comments (22)
config/config.e2e.toml (1)

37-38: LGTM! E2E configuration properly mirrors defaults.

The values align with the defaults defined in PublishServiceConfig (32 MB and 10000 items).

crates/rock-node-core/src/metrics.rs (2)

38-38: LGTM! New session duration metric properly added.

The metric follows the existing patterns and is correctly registered.


210-218: LGTM! Histogram initialization is correct.

The histogram is properly configured with no labels, which is appropriate for tracking session durations across all sessions.

crates/rock-node-publish-plugin/Cargo.toml (1)

20-20: LGTM! Appropriate dependency for error handling.

The thiserror crate is the standard choice for implementing error types in Rust libraries.

Based on learnings.

crates/rock-node-core/src/config.rs (2)

108-111: LGTM! Configuration fields properly documented.

The new fields are well-documented with clear default values.


123-124: LGTM! Default values are reasonable.

32 MB for message size and 10,000 items per set are sensible defaults.

crates/rock-node-publish-plugin/src/lib.rs (3)

26-26: LGTM! New error module added.

The error module supports the new validation and error handling functionality.


30-30: LGTM! New validation module added.

The validation module introduces request validation capabilities.


139-146: LGTM! Message size now configurable.

Replacing the hardcoded value with a configuration parameter improves flexibility and aligns with the PR's hardening objectives.

crates/rock-node-publish-plugin/src/service.rs (1)

56-57: LGTM! Session statistics logging improves observability.

Logging session stats before cleanup provides valuable insights into publisher behavior and aligns with the new session duration metric.

tests/e2e/src/tests/publish/mod.rs (1)

1-3: LGTM! New test modules properly declared.

The new test modules expand coverage for publishing scenarios including basic flows, multi-publisher races, and validation errors.

tests/e2e/src/tests/mod.rs (1)

6-6: Publish module exposure verified — no stale ingestion_suite references remain.

tests/e2e/src/tests/publish/multi_publisher_basic.rs (2)

322-522: LGTM!

This test demonstrates excellent coordination between publishers using explicit synchronization channels. The sequential block publishing and verification logic is clear and well-structured.


16-140: Use a longer, configurable timeout for winner detection
The 500 ms window in each tokio::time::timeout may be too tight on CI runners. Increase it to 2–3 s or drive via an environment variable, and add logging around the timeout calls to aid debugging.

crates/rock-node-publish-plugin/src/session_manager.rs (8)

25-45: LGTM!

The new fields appropriately support session lifecycle tracking, validation, and metrics collection. The field organization is logical and well-documented.


47-73: LGTM!

The constructor correctly initializes all new fields, including obtaining max_items from the config for the validator. The initialization logic is sound.


1484-1560: LGTM!

This unit test effectively validates the multi-publisher race condition behavior. The test logic clearly demonstrates that only one publisher becomes Primary while the others receive SkipBlock responses.


1562-1612: LGTM!

These validation tests (empty block items and missing header) correctly verify that the RequestValidator rejects malformed requests and terminates the session. Good coverage of validation error paths.


1614-1668: LGTM!

This test appropriately validates that the max_items_per_set limit is enforced. The test construction is clean, creating a modified context with a lower limit (5 items) and verifying rejection when 10 items are sent.


1670-1789: LGTM!

This test effectively demonstrates that different publishers can win consecutive blocks, confirming that winner selection isn't "sticky" to a single publisher. The persistence simulation and assertions are appropriate.


1791-1861: LGTM!

This test correctly simulates session disconnection cleanup, including removing the winner entry and broadcasting ResendBlock to other sessions. The manual simulation approach is reasonable for unit testing this scenario.


1929-1985: LGTM!

This test effectively validates that multiple publishers can work on different blocks concurrently, demonstrating the system's ability to accept future blocks (block 102 while 101 is in progress). The test correctly verifies that both sessions are registered as winners for their respective blocks.

Signed-off-by: georgi-l95 <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/rock-node-publish-plugin/src/session_manager.rs (3)

768-771: Bug: moving message in loop and awaiting while holding DashMap guard.

ack_msg is moved on first send (won’t compile across iterations), and awaiting send() while iterating holds DashMap guards across .await. Collect senders first and clone the message per send.

-                for session_entry in self.shared_state.active_sessions.iter() {
-                    let _ = session_entry.value().send(Ok(ack_msg)).await;
-                }
+                // Collect senders first to avoid holding DashMap guards across await
+                let senders: Vec<_> = self
+                    .shared_state
+                    .active_sessions
+                    .iter()
+                    .map(|e| e.value().clone())
+                    .collect();
+                for tx in senders {
+                    let _ = tx.send(Ok(ack_msg.clone())).await;
+                }

809-813: Bug: same broadcast issue on verification failure.

Move-free and avoid holding DashMap guard across .await.

-                for session_entry in self.shared_state.active_sessions.iter() {
-                    if *session_entry.key() != self.id {
-                        let _ = session_entry.value().send(Ok(resend_msg)).await;
-                    }
-                }
+                let senders: Vec<_> = self
+                    .shared_state
+                    .active_sessions
+                    .iter()
+                    .filter(|e| *e.key() != self.id)
+                    .map(|e| e.value().clone())
+                    .collect();
+                for tx in senders {
+                    let _ = tx.send(Ok(resend_msg.clone())).await;
+                }

855-859: Bug: same broadcast issue on timeout path.

Collect senders and clone the message per send.

-                for session_entry in self.shared_state.active_sessions.iter() {
-                    if *session_entry.key() != self.id {
-                        let _ = session_entry.value().send(Ok(resend_msg)).await;
-                    }
-                }
+                let senders: Vec<_> = self
+                    .shared_state
+                    .active_sessions
+                    .iter()
+                    .filter(|e| *e.key() != self.id)
+                    .map(|e| e.value().clone())
+                    .collect();
+                for tx in senders {
+                    let _ = tx.send(Ok(resend_msg.clone())).await;
+                }
🧹 Nitpick comments (13)
crates/rock-node-block-access-plugin/src/service.rs (1)

210-216: Unnecessary clippy suppression?

result_large_err typically flags large Err types; here Err=tonic::Status (small), and this function always returns Ok. Consider removing #[allow(clippy::result_large_err)] or add a short comment why it's needed.

tests/e2e/src/tests/subscribe/basic.rs (2)

20-31: DRY: extract header_number helper to common test util.

The same helper appears in multiple modules. Move it to tests/e2e/src/common.rs and reuse.


55-73: Add timeouts to avoid hanging tests.

Wrap stream.next() with tokio::time::timeout to prevent indefinite hangs if the server misbehaves.

-    while let Some(msg) = stream.next().await {
-        let msg = msg?;
+    while let Some(Ok(msg)) =
+        tokio::time::timeout(Duration::from_secs(5), stream.next()).await?
+    {
tests/e2e/src/tests/subscribe/validation.rs (3)

11-22: DRY: extract header_number helper to common test util.

Same helper exists in other subscribe tests. Centralize it in tests/e2e/src/common.rs.


43-59: Guard against hangs with timeouts.

Use tokio::time::timeout around the first stream.next() to avoid indefinite waits in failure scenarios.

-    let first = stream.next().await.expect("Expected a response")?;
+    let first = tokio::time::timeout(Duration::from_secs(5), stream.next())
+        .await?
+        .expect("Expected a response")?;

228-240: Apply timeouts when draining responses.

Add a timeout to the while-let loop to avoid test flakiness or hangs if the status never arrives.

-    while let Some(Ok(msg)) = stream.next().await {
+    while let Some(Ok(msg)) =
+        tokio::time::timeout(Duration::from_secs(5), stream.next()).await?
+    {
tests/e2e/src/tests/subscribe/edge_cases.rs (2)

17-28: DRY: extract header_number helper to common test util.

Consolidate this helper in tests/e2e/src/common.rs to avoid duplication.


56-66: Add timeout to initial receive loop.

Use tokio::time::timeout to avoid a potential hang when awaiting the first 6 blocks.

-    for _ in 0..6 {
-        if let Some(Ok(msg)) = stream.next().await {
+    for _ in 0..6 {
+        if let Some(Ok(msg)) =
+            tokio::time::timeout(Duration::from_secs(5), stream.next()).await?
+        {
crates/rock-node-publish-plugin/src/session_manager.rs (2)

399-427: Add proof–block number consistency check.

Validate that BlockProof.block matches self.current_block_number before completing the block. Fail fast on mismatch.

-                    BlockItemType::BlockProof(_) => {
+                    BlockItemType::BlockProof(proof) => {
+                        // Ensure proof targets the current block
+                        if proof.block as i64 != self.current_block_number {
+                            error!(
+                                session_id = %self.id,
+                                expected = self.current_block_number,
+                                received = proof.block,
+                                "BlockProof refers to mismatched block"
+                            );
+                            self.current_block_action = Some(BlockAction::EndError);
+                            self.handle_end_error(BlockAction::EndError).await;
+                            return true; // Terminate on mismatch
+                        }
                         // Measure header -> proof duration
                         if let Some(start) = self.block_start_time {
                             let duration = start.elapsed().as_secs_f64();

93-99: Metric naming clarity.

blocks_received increments per BlockItems request, not per logical block. Consider renaming (e.g., requests_received) or documenting semantics to avoid confusion.

tests/e2e/src/tests/subscribe/failure_scenarios.rs (1)

10-49: Weak test coverage acknowledged in comments.

The test verifies only that client disconnect doesn't cause a panic (line 46), without checking metrics or session cleanup state. While the comment at line 45 acknowledges this limitation, consider enhancing this test when feasible to verify:

  • Active session count decrements
  • Metrics reflect the disconnection
  • Session resources are properly cleaned up
crates/rock-node-subscriber-plugin/src/session.rs (2)

207-340: Consider refactoring for reduced complexity.

The validate_and_check_fulfillment method spans 133 lines with multiple nested branches handling various start/end combinations. While the logic appears correct, the cognitive complexity is high. Consider extracting helper methods for the distinct cases:

  • validate_pure_live_streaming() for start=MAX, end=MAX (lines 266-270)
  • validate_earliest_to_end() for start=MAX, end=N (lines 272-295)
  • validate_explicit_range() for start=N, end=M (lines 297-340)

This would improve readability and make the validation logic easier to maintain and test independently.


33-33: Remove unused #[allow(clippy::result_large_err)]
Clippy does not emit this warning for the Result<Self, tonic::Status> in new, so the suppression can be deleted.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f2dcda and 21d25b0.

📒 Files selected for processing (15)
  • .github/workflows/release.yml (1 hunks)
  • config/config.toml (5 hunks)
  • crates/rock-node-backfill-plugin/src/worker.rs (1 hunks)
  • crates/rock-node-block-access-plugin/src/service.rs (3 hunks)
  • crates/rock-node-publish-plugin/src/error.rs (1 hunks)
  • crates/rock-node-publish-plugin/src/session_manager.rs (10 hunks)
  • crates/rock-node-subscriber-plugin/src/error.rs (1 hunks)
  • crates/rock-node-subscriber-plugin/src/service.rs (2 hunks)
  • crates/rock-node-subscriber-plugin/src/session.rs (11 hunks)
  • tests/e2e/src/tests/mod.rs (1 hunks)
  • tests/e2e/src/tests/subscribe/basic.rs (1 hunks)
  • tests/e2e/src/tests/subscribe/edge_cases.rs (1 hunks)
  • tests/e2e/src/tests/subscribe/failure_scenarios.rs (1 hunks)
  • tests/e2e/src/tests/subscribe/mod.rs (1 hunks)
  • tests/e2e/src/tests/subscribe/validation.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/rock-node-publish-plugin/src/error.rs
🧰 Additional context used
🧬 Code graph analysis (6)
crates/rock-node-publish-plugin/src/session_manager.rs (3)
crates/rock-node-publish-plugin/src/state.rs (2)
  • new (56-62)
  • default (50-52)
crates/rock-node-persistence-plugin/src/cold_storage/archiver.rs (2)
  • metrics (99-102)
  • create_test_metrics (169-172)
crates/rock-node-core/src/config.rs (10)
  • default (23-31)
  • default (58-63)
  • default (76-83)
  • default (93-95)
  • default (115-126)
  • default (141-149)
  • default (165-167)
  • default (177-179)
  • default (189-191)
  • default (231-239)
tests/e2e/src/tests/subscribe/failure_scenarios.rs (3)
tests/e2e/src/common.rs (1)
  • publish_blocks (241-280)
crates/rock-node-subscriber-plugin/src/session.rs (3)
  • new (34-64)
  • new (484-486)
  • drop (489-491)
crates/rock-node-subscriber-plugin/src/service.rs (1)
  • new (24-30)
tests/e2e/src/tests/subscribe/basic.rs (1)
tests/e2e/src/common.rs (1)
  • publish_blocks (241-280)
tests/e2e/src/tests/subscribe/edge_cases.rs (1)
tests/e2e/src/common.rs (1)
  • publish_blocks (241-280)
tests/e2e/src/tests/subscribe/validation.rs (2)
tests/e2e/src/common.rs (2)
  • publish_blocks (241-280)
  • with_config (74-79)
crates/rock-node-subscriber-plugin/src/session.rs (4)
  • new (34-64)
  • new (484-486)
  • test_validation_start_too_far_in_future (813-842)
  • test_validation_from_earliest_but_end_before_earliest (892-920)
crates/rock-node-subscriber-plugin/src/session.rs (2)
crates/rock-node-backfill-plugin/src/worker.rs (6)
  • new (190-226)
  • new (619-621)
  • new (670-676)
  • new (720-725)
  • new (843-853)
  • create_test_context (759-814)
crates/rock-node-core/src/config.rs (10)
  • default (23-31)
  • default (58-63)
  • default (76-83)
  • default (93-95)
  • default (115-126)
  • default (141-149)
  • default (165-167)
  • default (177-179)
  • default (189-191)
  • default (231-239)
🪛 actionlint (1.7.7)
.github/workflows/release.yml

100-100: the runner of "softprops/action-gh-release@v1" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and Basic Quality Checks
  • GitHub Check: Security Audit
🔇 Additional comments (15)
crates/rock-node-backfill-plugin/src/worker.rs (1)

262-262: LGTM! Excellent refactor to accept slice instead of Vec.

Changing the parameter from &mut Vec<String> to &mut [String] follows Rust best practices by accepting any type that can be coerced to a mutable slice. This makes the API more flexible while maintaining backward compatibility at call sites (Vec automatically coerces to slice).

crates/rock-node-block-access-plugin/src/service.rs (1)

27-28: Good: added Error mapping and test coverage.

The code_to_string mapping for Code::Error is correct and covered by tests.

crates/rock-node-publish-plugin/src/session_manager.rs (2)

52-71: Good: validator wiring and per‑session metrics fields.

Initializing RequestValidator with max_items_per_set and tracking session stats is a solid addition.


881-905: Call log_session_stats on session teardown.

Ensure the service invokes log_session_stats() when a session terminates (on all terminal paths) so metrics are recorded.

tests/e2e/src/tests/subscribe/validation.rs (1)

67-114: Verify port contention with custom config.

The config fixes grpc_port=50051. With #[serial], tests run serially in-crate, but confirm no external processes/tests use the same port concurrently in CI.

crates/rock-node-subscriber-plugin/src/service.rs (1)

12-12: LGTM! Improved logging and failure tracking.

The logging level adjustments (info → debug for normal lifecycle events) reduce verbosity while maintaining visibility for failures. The conditional failure cause logging at line 75-77 provides clear diagnostics when sessions end with errors.

Also applies to: 70-81

tests/e2e/src/tests/mod.rs (1)

6-6: LGTM! Test suite reorganization.

The module reorganization from ingestion_suite to separate publish and subscribe modules aligns with the PR's goal of enhanced test coverage.

Also applies to: 10-10

tests/e2e/src/tests/subscribe/mod.rs (1)

1-4: LGTM! New test module structure.

Clear organization for the subscribe test suite with logical module separation by test category.

crates/rock-node-subscriber-plugin/src/error.rs (1)

44-59: LGTM! Enhanced metric granularity for validation errors.

The refined to_metric_label implementation for the Validation variant provides better observability by distinguishing between different validation failure types. The fallback case handles future Code variants safely.

tests/e2e/src/tests/subscribe/failure_scenarios.rs (2)

94-156: LGTM! Comprehensive multi-subscriber test.

The test correctly verifies that when one subscriber disconnects, others continue operating normally. The logic properly tracks that subscriber B receives all 31 blocks (0-30 inclusive) despite subscriber A's early disconnect.


51-92: Ignored tests are appropriately documented placeholders.

The #[ignore] markers and detailed TODO comments clearly explain the infrastructure requirements needed to implement these tests (server shutdown capabilities, custom config support, etc.). This is good practice for tracking future work.

Also applies to: 158-177

crates/rock-node-subscriber-plugin/src/session.rs (4)

29-29: LGTM! Useful failure cause tracking.

The new failure_cause field and get_failure_cause() accessor provide valuable diagnostics for session failures, enabling better observability and debugging.

Also applies to: 62-62, 66-69


312-323: Verify max_future_block_lookahead behavior when no blocks exist.

The max_future_block_lookahead check at lines 313-322 only applies when latest is Some. This means if no blocks have been persisted yet, there's no upper bound on the start_block request.

This is likely intentional to allow subscribing to future blocks when the node is starting up, but verify this behavior aligns with the expected semantics, especially for edge cases where the node has just started and has no data.


72-90: LGTM! Enhanced logging and error handling.

The expanded logging throughout the session lifecycle provides excellent observability:

  • Session start/completion logs with success flag
  • Per-error-type logging in finalize_stream with appropriate levels (debug for disconnects/shutdown, warn for timeouts/lag, error for persistence/internal failures)
  • Average inter-block time statistics

This will significantly aid debugging and monitoring in production.

Also applies to: 97-102, 107-121, 386-442


810-1003: LGTM! Comprehensive validation test coverage.

The seven new tests thoroughly exercise the validation logic:

  • Future block lookahead boundaries
  • Pure live streaming mode
  • Earliest-to-end ranges
  • Invalid range combinations
  • No-data edge cases
  • Failure cause tracking

Excellent coverage for the new validation paths.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
crates/rock-node-publish-plugin/src/session_manager.rs (1)

108-141: Past review comment has been addressed.

The EndStream error response now correctly uses the extracted block number when available via block_number.unwrap_or(self.current_block_number) at line 129. This resolves the concern raised in the previous review about reporting 0 for new blocks with headers.

🧹 Nitpick comments (4)
.github/workflows/release.yml (1)

63-66: Pin cross installation to a tagged release.

Pulling cross from the git default branch makes the release job fragile—any upstream change can suddenly break reproducible builds. Please install a fixed release (e.g. via --version or --git ... --tag) so the workflow stays stable.

-          cargo install cross --git https://github.com/cross-rs/cross
+          cargo install cross --version 0.2.5 --locked

Adjust the version to the exact tag you rely on.

crates/rock-node-publish-plugin/src/session_manager.rs (3)

89-141: Validation logic is well-implemented with defensive error handling.

The validation flow correctly:

  • Extracts the block number from headers when present
  • Validates requests before processing
  • Terminates the session with appropriate error responses on validation failures
  • Records validation errors in metrics

Optional consideration: The blocks_received counter at line 93 is incremented before validation, meaning invalid requests also increment this counter. This provides full observability of all received requests, but if you prefer to track only valid requests, you could move the increment to after line 141. The current approach is more comprehensive for monitoring, so this is truly optional.


636-638: Consider renaming blocks_accepted for clarity.

The blocks_accepted counter is incremented at line 637, which occurs before verification (line 662) and persistence (line 674). If verification fails, the block is still counted as "accepted." Consider renaming this counter to blocks_completed to more accurately reflect that it counts blocks that completed the publish phase, regardless of verification outcome.

If the current naming is intentional (i.e., "accepted" means "accepted by this session manager for processing"), then this is acceptable, but the distinction should be clear in the code or comments.


1479-1984: Excellent test coverage for the publisher hardening features!

The new comprehensive tests verify:

  • ✅ Multi-publisher race conditions and winner selection
  • ✅ Validation error handling (empty items, missing headers, item count limits)
  • ✅ Sequential and concurrent block processing from multiple publishers
  • ✅ Winner cleanup on session disconnect
  • ✅ Session statistics tracking

These tests significantly strengthen confidence in the hardening changes, particularly around race conditions and validation scenarios.

Minor enhancement: The session_stats_recorded_correctly test at line 1920 could additionally verify that blocks_accepted equals 0, since no blocks are completed in that test scenario. This would make the test more complete.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21d25b0 and ba6f5dd.

📒 Files selected for processing (3)
  • .github/workflows/release.yml (1 hunks)
  • config/config.toml (5 hunks)
  • crates/rock-node-publish-plugin/src/session_manager.rs (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • config/config.toml
🧰 Additional context used
🧬 Code graph analysis (1)
crates/rock-node-publish-plugin/src/session_manager.rs (3)
crates/rock-node-publish-plugin/src/state.rs (2)
  • new (56-62)
  • default (50-52)
crates/rock-node-persistence-plugin/src/cold_storage/archiver.rs (2)
  • metrics (99-102)
  • create_test_metrics (169-172)
crates/rock-node-core/src/config.rs (10)
  • default (23-31)
  • default (58-63)
  • default (76-83)
  • default (93-95)
  • default (115-126)
  • default (141-149)
  • default (165-167)
  • default (177-179)
  • default (189-191)
  • default (231-239)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and Basic Quality Checks
  • GitHub Check: Security Audit
🔇 Additional comments (5)
crates/rock-node-publish-plugin/src/session_manager.rs (5)

1-4: LGTM!

The new import for RequestValidator is appropriately added to support the validation logic introduced in this PR.


24-44: LGTM!

The new fields appropriately support session tracking, validation, and metrics collection. The optional publisher_id is well-suited for scenarios where the publisher identifier may not be immediately available.


46-72: LGTM!

Constructor correctly initializes all new fields. The validator is properly configured with the max_items limit from the configuration.


437-452: LGTM!

The blocks_skipped counter is correctly incremented when blocks are skipped, providing accurate session statistics.


880-905: LGTM!

The log_session_stats method provides comprehensive session telemetry:

  • Calculates and logs session duration
  • Includes all relevant counters (received, accepted, skipped)
  • Safely handles the average calculation with a zero-check
  • Records the histogram metric for session duration

This will be valuable for monitoring publisher behavior and diagnosing issues.

@georgi-l95 georgi-l95 merged commit cdf2fd2 into main Oct 10, 2025
7 checks passed
@georgi-l95 georgi-l95 deleted the improve-publisher branch October 10, 2025 11:21
@codecov
Copy link

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 92.79609% with 59 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.90%. Comparing base (aa61794) to head (ba6f5dd).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
crates/rock-node-subscriber-plugin/src/session.rs 88.47% 31 Missing ⚠️
...es/rock-node-publish-plugin/src/session_manager.rs 98.09% 8 Missing ⚠️
crates/rock-node-publish-plugin/src/lib.rs 0.00% 7 Missing ⚠️
crates/rock-node-subscriber-plugin/src/service.rs 0.00% 5 Missing ⚠️
crates/rock-node-publish-plugin/src/validation.rs 96.00% 4 Missing ⚠️
crates/rock-node-subscriber-plugin/src/error.rs 60.00% 2 Missing ⚠️
crates/rock-node-core/src/metrics.rs 85.71% 1 Missing ⚠️
crates/rock-node-publish-plugin/src/service.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #123      +/-   ##
==========================================
+ Coverage   78.67%   79.90%   +1.22%     
==========================================
  Files          55       56       +1     
  Lines        9706    10493     +787     
==========================================
+ Hits         7636     8384     +748     
- Misses       2070     2109      +39     
Files with missing lines Coverage Δ
crates/rock-node-backfill-plugin/src/worker.rs 55.67% <100.00%> (ø)
...rates/rock-node-block-access-plugin/src/service.rs 92.61% <100.00%> (+0.30%) ⬆️
crates/rock-node-core/src/config.rs 100.00% <100.00%> (ø)
crates/rock-node-core/src/metrics.rs 90.95% <85.71%> (-0.09%) ⬇️
crates/rock-node-publish-plugin/src/service.rs 0.00% <0.00%> (ø)
crates/rock-node-subscriber-plugin/src/error.rs 50.00% <60.00%> (ø)
crates/rock-node-publish-plugin/src/validation.rs 96.00% <96.00%> (ø)
crates/rock-node-subscriber-plugin/src/service.rs 0.00% <0.00%> (ø)
crates/rock-node-publish-plugin/src/lib.rs 0.00% <0.00%> (ø)
...es/rock-node-publish-plugin/src/session_manager.rs 90.64% <98.09%> (+3.74%) ⬆️
... and 1 more
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants