From 1ecd643cd1fd35489499712d3a77d59769916654 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 21 Apr 2026 18:14:01 +0000 Subject: [PATCH 1/4] fix: drop stale routing entries on NoShardsAvailable failures When an index was deleted and recreated, the router's per-ingester routing entry for the old incarnation could stay marked as having open shards because the ingester's piggybacked routing update only covers sources it still holds. Persist retries then kept picking the dead entry and the request surfaced as a 503 until Chitchat eventually caught up. Treat a `NoShardsAvailable` failure as a signal that this (leader, index_uid, source_id) has no reachable shard and zero it out in the routing table. If no nodes remain for that (index_id, source_id) the next attempt re-queries the control plane, which returns the fresh incarnation's shards. Fixes #6324 --- .../quickwit-ingest/src/ingest_v2/router.rs | 230 +++++++++++++++++- 1 file changed, 218 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index ecf5f707642..0770b5f3e0a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -35,7 +35,7 @@ use quickwit_proto::ingest::router::{ IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause}; -use quickwit_proto::types::{NodeId, SubrequestId}; +use quickwit_proto::types::{IndexUid, NodeId, SourceId, SubrequestId}; use serde_json::{Value as JsonValue, json}; use tokio::sync::{Mutex, Semaphore}; use tokio::time::error::Elapsed; @@ -286,6 +286,7 @@ impl IngestRouter { match persist_result { Ok(persist_response) => { let leader_id = NodeId::from_str(&persist_response.leader_id); + let mut no_shards_entries: Vec<(IndexUid, SourceId)> = Vec::new(); for persist_success in persist_response.successes { workbench.record_persist_success(persist_success); @@ -295,9 +296,18 @@ impl IngestRouter { match persist_failure.reason() { PersistFailureReason::NoShardsAvailable => { - // For non-critical failures, we don't mark the nodes unavailable; - // a routing update is piggybacked on PersistResponses, so shard - // counts and capacity scores will be fresh on the next try. + // The ingester reported no shard for this (index_uid, + // source_id). The piggybacked routing update only covers + // sources the ingester still holds, so a removed source + // (e.g. after an index was deleted and recreated) would + // leave this node's entry stale and trap retries on a + // dead shard. Mark the node as having zero open shards + // for this entry; when no nodes remain, the next attempt + // re-queries the control plane. + no_shards_entries.push(( + persist_failure.index_uid().clone(), + persist_failure.source_id.clone(), + )); } PersistFailureReason::NodeUnavailable | PersistFailureReason::WalFull @@ -308,22 +318,35 @@ impl IngestRouter { } } - if let Some(routing_update) = persist_response.routing_update { + if !no_shards_entries.is_empty() || persist_response.routing_update.is_some() { // Since we just talked to the node, we take advantage and use the // opportunity to get a fresh routing update. let mut state_guard = self.state.lock().await; - for shard_update in routing_update.source_shard_updates { + + for (index_uid, source_id) in no_shards_entries { state_guard.routing_table.apply_capacity_update( leader_id.clone(), - shard_update.index_uid().clone(), - shard_update.source_id, - routing_update.capacity_score as usize, - shard_update.open_shard_count as usize, + index_uid, + source_id, + 0, + 0, ); } - drop(state_guard); - workbench.closed_shards.extend(routing_update.closed_shards); + if let Some(routing_update) = persist_response.routing_update { + for shard_update in routing_update.source_shard_updates { + state_guard.routing_table.apply_capacity_update( + leader_id.clone(), + shard_update.index_uid().clone(), + shard_update.source_id, + routing_update.capacity_score as usize, + shard_update.open_shard_count as usize, + ); + } + drop(state_guard); + + workbench.closed_shards.extend(routing_update.closed_shards); + } } } Err(persist_error) => { @@ -1860,4 +1883,187 @@ mod tests { .unwrap(); assert_eq!(node.node_id, NodeId::from_str("test-ingester-0")); } + + #[tokio::test] + async fn test_no_shards_available_clears_stale_routing_entry() { + // Regression test for https://github.com/quickwit-oss/quickwit/issues/6324. + // + // When an index is deleted and recreated, the ingester closes its old shards and + // stops advertising them. A persist request issued against the stale routing + // entry fails with NoShardsAvailable, and the piggybacked routing update no + // longer covers the removed source — so before the fix the router kept picking + // the dead entry forever and ingests returned 503 until Chitchat caught up. + let ingester_pool = IngesterPool::default(); + let router = IngestRouter::new( + "test-router".into(), + ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), + ingester_pool.clone(), + 1, + EventBroker::default(), + Some("test-az".to_string()), + ); + let stale_index_uid = IndexUid::for_test("test-index", 0); + { + let mut state_guard = router.state.lock().await; + state_guard.routing_table.merge_from_shards( + stale_index_uid.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(stale_index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + } + ingester_pool.insert( + "test-ingester-0".into(), + IngesterPoolEntry::mocked_ingester(), + ); + + // Sanity-check: the stale entry is currently considered valid for routing. + { + let state_guard = router.state.lock().await; + assert!(state_guard.routing_table.has_open_nodes( + "test-index", + "test-source", + &ingester_pool, + &HashSet::new(), + )); + } + + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + + let persist_futures = FuturesUnordered::new(); + let stale_index_uid_clone = stale_index_uid.clone(); + persist_futures.push(async move { + let summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: Some(stale_index_uid_clone), + source_id: "test-source".to_string(), + reason: PersistFailureReason::NoShardsAvailable as i32, + }], + // The ingester has no shard for this source anymore, so the piggybacked + // routing update omits it entirely. + routing_update: Some(RoutingUpdate { + capacity_score: 6, + source_shard_updates: Vec::new(), + ..Default::default() + }), + }); + (summary, result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + // The stale routing entry must no longer look routable — otherwise retries would + // keep hammering the dead shard and surface as a 503. + let state_guard = router.state.lock().await; + assert!(!state_guard.routing_table.has_open_nodes( + "test-index", + "test-source", + &ingester_pool, + &HashSet::new(), + )); + } + + #[tokio::test] + async fn test_no_shards_available_preserves_entry_when_routing_update_refreshes_it() { + // When a shard is merely rate-limited the ingester still returns NoShardsAvailable + // but includes a fresh routing update saying the shard is open. The update must + // take precedence so the node remains routable for the next retry. + let ingester_pool = IngesterPool::default(); + let router = IngestRouter::new( + "test-router".into(), + ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), + ingester_pool.clone(), + 1, + EventBroker::default(), + Some("test-az".to_string()), + ); + let index_uid = IndexUid::for_test("test-index", 0); + { + let mut state_guard = router.state.lock().await; + state_guard.routing_table.merge_from_shards( + index_uid.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + } + ingester_pool.insert( + "test-ingester-0".into(), + IngesterPoolEntry::mocked_ingester(), + ); + + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + + let persist_futures = FuturesUnordered::new(); + let index_uid_clone = index_uid.clone(); + persist_futures.push(async move { + let summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: Some(index_uid_clone.clone()), + source_id: "test-source".to_string(), + reason: PersistFailureReason::NoShardsAvailable as i32, + }], + routing_update: Some(RoutingUpdate { + capacity_score: 6, + source_shard_updates: vec![SourceShardUpdate { + index_uid: Some(index_uid_clone), + source_id: "test-source".to_string(), + open_shard_count: 1, + }], + ..Default::default() + }), + }); + (summary, result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + let state_guard = router.state.lock().await; + assert!(state_guard.routing_table.has_open_nodes( + "test-index", + "test-source", + &ingester_pool, + &HashSet::new(), + )); + } } From 599835a8b9c6ced4178393709bbef8f5c396bc50 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 21 Apr 2026 20:19:40 +0000 Subject: [PATCH 2/4] fix: document routing_update invariant on NoShardsAvailable fix Clarifies the hidden contract the fix leans on: the zero-out and piggybacked routing update run under the same lock, which is what keeps the rate-limited subcase of NoShardsAvailable correct. --- quickwit/quickwit-ingest/src/ingest_v2/router.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 0770b5f3e0a..a606c90eb3f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -320,7 +320,11 @@ impl IngestRouter { if !no_shards_entries.is_empty() || persist_response.routing_update.is_some() { // Since we just talked to the node, we take advantage and use the - // opportunity to get a fresh routing update. + // opportunity to get a fresh routing update. Both the zero-out and the + // piggybacked update run under the same lock so the rate-limited + // subcase of NoShardsAvailable — where the shard still exists — is + // immediately restored by the routing update that follows (the ingester + // only returns routing_update=None on the NodeUnavailable fast path). let mut state_guard = self.state.lock().await; for (index_uid, source_id) in no_shards_entries { From aa7647ab60ff33a77b5d664b90b47ff6c9d0f984 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 27 Apr 2026 18:31:33 +0000 Subject: [PATCH 3/4] fix: preserve capacity score when marking a node as having no shards Address PR review: introduce RoutingTable::mark_node_no_shards instead of calling apply_capacity_update(.., 0, 0). The new method only zeros the open_shard_count and leaves the capacity_score untouched (capacity is a node-level WAL signal independent of any specific source). It also no-ops on missing entries/nodes and on incarnation mismatches, so a narrowing signal can never roll back a fresher entry. --- .../quickwit-ingest/src/ingest_v2/router.rs | 27 +++--- .../src/ingest_v2/routing_table.rs | 87 +++++++++++++++++++ 2 files changed, 100 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index a606c90eb3f..2b19ad497e2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -327,14 +327,10 @@ impl IngestRouter { // only returns routing_update=None on the NodeUnavailable fast path). let mut state_guard = self.state.lock().await; - for (index_uid, source_id) in no_shards_entries { - state_guard.routing_table.apply_capacity_update( - leader_id.clone(), - index_uid, - source_id, - 0, - 0, - ); + for (index_uid, source_id) in &no_shards_entries { + state_guard + .routing_table + .mark_node_no_shards(&leader_id, index_uid, source_id); } if let Some(routing_update) = persist_response.routing_update { @@ -1930,11 +1926,12 @@ mod tests { // Sanity-check: the stale entry is currently considered valid for routing. { let state_guard = router.state.lock().await; - assert!(state_guard.routing_table.has_open_nodes( + let mut unavailable_leaders: HashSet = HashSet::new(); + assert!(state_guard.routing_table.has_any_routing_candidate( "test-index", "test-source", &ingester_pool, - &HashSet::new(), + &mut unavailable_leaders, )); } @@ -1979,11 +1976,12 @@ mod tests { // The stale routing entry must no longer look routable — otherwise retries would // keep hammering the dead shard and surface as a 503. let state_guard = router.state.lock().await; - assert!(!state_guard.routing_table.has_open_nodes( + let mut unavailable_leaders: HashSet = HashSet::new(); + assert!(!state_guard.routing_table.has_any_routing_candidate( "test-index", "test-source", &ingester_pool, - &HashSet::new(), + &mut unavailable_leaders, )); } @@ -2063,11 +2061,12 @@ mod tests { .await; let state_guard = router.state.lock().await; - assert!(state_guard.routing_table.has_open_nodes( + let mut unavailable_leaders: HashSet = HashSet::new(); + assert!(state_guard.routing_table.has_any_routing_candidate( "test-index", "test-source", &ingester_pool, - &HashSet::new(), + &mut unavailable_leaders, )); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 973de0e582a..21fd6cd6691 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -273,6 +273,27 @@ impl RoutingTable { entry.nodes.insert(node_id, ingester_node); } + /// Zeros out the open shard count for `node_id` on the (index, source) entry while preserving + /// its capacity score. Called when a persist response reports that the ingester no longer + /// holds a shard for this (index_uid, source_id), so the entry stops being picked until a + /// fresh routing update or control-plane response repopulates it. + /// + /// No-op when the entry is missing, the node is absent from the entry, or the entry is at a + /// different incarnation than `index_uid` — a narrowing signal must never roll the table back + /// or shadow a fresher state. + pub fn mark_node_no_shards(&mut self, node_id: &NodeId, index_uid: &IndexUid, source_id: &str) { + let key = (index_uid.index_id.to_string(), source_id.to_string()); + let Some(entry) = self.table.get_mut(&key) else { + return; + }; + if entry.index_uid != *index_uid { + return; + } + if let Some(node) = entry.nodes.get_mut(node_id) { + node.open_shard_count = 0; + } + } + /// Merges routing updates from a GetOrCreateOpenShards control plane response into the /// table. For existing nodes, updates their open shard count, including if the count is 0, from /// the CP response while preserving capacity scores if they already exist. @@ -856,4 +877,70 @@ mod tests { assert!(!entry.nodes.contains_key("node-3")); assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2)); } + + #[test] + fn test_mark_node_no_shards() { + let mut table = RoutingTable::default(); + let index_uid = IndexUid::for_test("test-index", 1); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Missing entry: no-op, no panic, nothing inserted. + table.mark_node_no_shards(&"node-1".into(), &index_uid, "test-source"); + assert!(table.table.get(&key).is_none()); + + // Seed an entry with two nodes carrying real capacity scores. + table.apply_capacity_update( + "node-1".into(), + index_uid.clone(), + "test-source".into(), + 8, + 3, + ); + table.apply_capacity_update( + "node-2".into(), + index_uid.clone(), + "test-source".into(), + 6, + 2, + ); + + // Missing node within the entry: no-op. + table.mark_node_no_shards(&"unknown".into(), &index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 3); + assert_eq!(entry.nodes.get("node-2").unwrap().open_shard_count, 2); + + // Matching incarnation: zero only the open shard count, capacity score is preserved. + table.mark_node_no_shards(&"node-1".into(), &index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + let node_1 = entry.nodes.get("node-1").unwrap(); + assert_eq!(node_1.open_shard_count, 0); + assert_eq!(node_1.capacity_score, 8); + // Sibling node untouched. + let node_2 = entry.nodes.get("node-2").unwrap(); + assert_eq!(node_2.open_shard_count, 2); + assert_eq!(node_2.capacity_score, 6); + + // Older incarnation argument: no-op (must not roll the entry back). + let stale_index_uid = IndexUid::for_test("test-index", 0); + table.mark_node_no_shards(&"node-2".into(), &stale_index_uid, "test-source"); + assert_eq!( + table + .table + .get(&key) + .unwrap() + .nodes + .get("node-2") + .unwrap() + .open_shard_count, + 2 + ); + + // Newer incarnation argument: no-op (don't shadow a fresher entry we don't know yet). + let newer_index_uid = IndexUid::for_test("test-index", 2); + table.mark_node_no_shards(&"node-2".into(), &newer_index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.index_uid, index_uid); + assert_eq!(entry.nodes.get("node-2").unwrap().open_shard_count, 2); + } } From 53836ed968b8f9738489da2466569ee744396cd9 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 20:02:51 +0000 Subject: [PATCH 4/4] fix: mirror apply_capacity_update incarnation checks in mark_node_no_shards MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR review: replace the != short-circuit with the same Less / Equal / Greater cmp match used by apply_capacity_update and merge_from_shards. A stale signal (entry newer than the failure's index_uid) is still ignored; a signal for a newer incarnation now advances the entry, drops stale nodes, and forces a CP re-seed — consistent with how the rest of the routing table handles monotonic incarnations. --- .../src/ingest_v2/routing_table.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 21fd6cd6691..0aed8b33663 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -278,16 +278,25 @@ impl RoutingTable { /// holds a shard for this (index_uid, source_id), so the entry stops being picked until a /// fresh routing update or control-plane response repopulates it. /// - /// No-op when the entry is missing, the node is absent from the entry, or the entry is at a - /// different incarnation than `index_uid` — a narrowing signal must never roll the table back - /// or shadow a fresher state. + /// Mirrors the incarnation handling of [`Self::apply_capacity_update`]: a stale signal + /// (entry newer than `index_uid`) is ignored, and a signal for a newer incarnation advances + /// the entry and clears stale nodes so the next attempt re-queries the control plane. pub fn mark_node_no_shards(&mut self, node_id: &NodeId, index_uid: &IndexUid, source_id: &str) { let key = (index_uid.index_id.to_string(), source_id.to_string()); let Some(entry) = self.table.get_mut(&key) else { return; }; - if entry.index_uid != *index_uid { - return; + match entry.index_uid.cmp(index_uid) { + // The entry is stale relative to the signal: advance it, drop stale nodes, and force + // a control-plane re-seed on the next attempt. + Ordering::Less => { + entry.index_uid = index_uid.clone(); + entry.nodes.clear(); + entry.seeded_from_cp = false; + } + // The signal is stale relative to the entry: leave the fresher entry alone. + Ordering::Greater => return, + Ordering::Equal => {} } if let Some(node) = entry.nodes.get_mut(node_id) { node.open_shard_count = 0; @@ -936,11 +945,14 @@ mod tests { 2 ); - // Newer incarnation argument: no-op (don't shadow a fresher entry we don't know yet). + // Newer incarnation argument: advance the entry, drop stale nodes, and force a CP + // re-seed (mirrors apply_capacity_update's Less arm). No node is inserted — the next + // CP query is responsible for repopulating the entry. let newer_index_uid = IndexUid::for_test("test-index", 2); table.mark_node_no_shards(&"node-2".into(), &newer_index_uid, "test-source"); let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.index_uid, index_uid); - assert_eq!(entry.nodes.get("node-2").unwrap().open_shard_count, 2); + assert_eq!(entry.index_uid, newer_index_uid); + assert!(entry.nodes.is_empty()); + assert!(!entry.seeded_from_cp); } }