Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 225 additions & 16 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use quickwit_proto::ingest::router::{
IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService,
};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause};
use quickwit_proto::types::{NodeId, SubrequestId};
use quickwit_proto::types::{IndexUid, NodeId, SourceId, SubrequestId};
use serde_json::{Value as JsonValue, json};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::error::Elapsed;
Expand Down Expand Up @@ -286,6 +286,7 @@ impl IngestRouter {
match persist_result {
Ok(persist_response) => {
let leader_id = NodeId::from_str(&persist_response.leader_id);
let mut no_shards_entries: Vec<(IndexUid, SourceId)> = Vec::new();

for persist_success in persist_response.successes {
workbench.record_persist_success(persist_success);
Expand All @@ -295,9 +296,18 @@ impl IngestRouter {

match persist_failure.reason() {
PersistFailureReason::NoShardsAvailable => {
// For non-critical failures, we don't mark the nodes unavailable;
// a routing update is piggybacked on PersistResponses, so shard
// counts and capacity scores will be fresh on the next try.
// The ingester reported no shard for this (index_uid,
// source_id). The piggybacked routing update only covers
// sources the ingester still holds, so a removed source
// (e.g. after an index was deleted and recreated) would
// leave this node's entry stale and trap retries on a
// dead shard. Mark the node as having zero open shards
// for this entry; when no nodes remain, the next attempt
// re-queries the control plane.
no_shards_entries.push((
persist_failure.index_uid().clone(),
persist_failure.source_id.clone(),
));
}
PersistFailureReason::NodeUnavailable
| PersistFailureReason::WalFull
Expand All @@ -308,22 +318,35 @@ impl IngestRouter {
}
}

if let Some(routing_update) = persist_response.routing_update {
if !no_shards_entries.is_empty() || persist_response.routing_update.is_some() {
// Since we just talked to the node, we take advantage and use the
// opportunity to get a fresh routing update.
// opportunity to get a fresh routing update. Both the zero-out and the
// piggybacked update run under the same lock so the rate-limited
// subcase of NoShardsAvailable — where the shard still exists — is
// immediately restored by the routing update that follows (the ingester
// only returns routing_update=None on the NodeUnavailable fast path).
let mut state_guard = self.state.lock().await;
for shard_update in routing_update.source_shard_updates {
state_guard.routing_table.apply_capacity_update(
leader_id.clone(),
shard_update.index_uid().clone(),
shard_update.source_id,
routing_update.capacity_score as usize,
shard_update.open_shard_count as usize,
);

for (index_uid, source_id) in &no_shards_entries {
state_guard
.routing_table
.mark_node_no_shards(&leader_id, index_uid, source_id);
}
drop(state_guard);

workbench.closed_shards.extend(routing_update.closed_shards);
if let Some(routing_update) = persist_response.routing_update {
for shard_update in routing_update.source_shard_updates {
state_guard.routing_table.apply_capacity_update(
leader_id.clone(),
shard_update.index_uid().clone(),
shard_update.source_id,
routing_update.capacity_score as usize,
shard_update.open_shard_count as usize,
);
}
drop(state_guard);

workbench.closed_shards.extend(routing_update.closed_shards);
}
}
}
Err(persist_error) => {
Expand Down Expand Up @@ -1860,4 +1883,190 @@ mod tests {
.unwrap();
assert_eq!(node.node_id, NodeId::from_str("test-ingester-0"));
}

#[tokio::test]
async fn test_no_shards_available_clears_stale_routing_entry() {
    // Regression test for https://github.com/quickwit-oss/quickwit/issues/6324.
    //
    // Scenario: the routing table still lists an open shard on `test-ingester-0`, but the
    // ingester answers the persist request with `NoShardsAvailable` and a routing update
    // that does not mention the source at all. After processing that response, the router
    // must stop treating the entry as a routing candidate.
    let ingester_pool = IngesterPool::default();
    let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
    let router = IngestRouter::new(
        "test-router".into(),
        control_plane,
        ingester_pool.clone(),
        1,
        EventBroker::default(),
        Some("test-az".to_string()),
    );

    // Seed the routing table with a single open shard led by `test-ingester-0`. The lock
    // guard is a temporary and is released at the end of the statement.
    let stale_index_uid = IndexUid::for_test("test-index", 0);
    let stale_shard = Shard {
        index_uid: Some(stale_index_uid.clone()),
        source_id: "test-source".to_string(),
        shard_id: Some(ShardId::from(1)),
        shard_state: ShardState::Open as i32,
        leader_id: "test-ingester-0".to_string(),
        ..Default::default()
    };
    router.state.lock().await.routing_table.merge_from_shards(
        stale_index_uid.clone(),
        "test-source".to_string(),
        vec![stale_shard],
    );
    ingester_pool.insert(
        "test-ingester-0".into(),
        IngesterPoolEntry::mocked_ingester(),
    );

    // Precondition: before the persist response is processed, the entry is routable.
    {
        let state_guard = router.state.lock().await;
        let mut unavailable_leaders: HashSet<NodeId> = HashSet::new();
        let routable_before = state_guard.routing_table.has_any_routing_candidate(
            "test-index",
            "test-source",
            &ingester_pool,
            &mut unavailable_leaders,
        );
        assert!(routable_before);
    }

    let subrequest = IngestSubrequest {
        subrequest_id: 0,
        index_id: "test-index".to_string(),
        source_id: "test-source".to_string(),
        ..Default::default()
    };
    let mut workbench = IngestWorkbench::new(vec![subrequest], 2);

    // A single canned persist outcome: one `NoShardsAvailable` failure for the seeded
    // (index_uid, source_id) and a routing update with no per-source shard updates.
    let failure_index_uid = stale_index_uid.clone();
    let persist_futures = FuturesUnordered::new();
    persist_futures.push(async move {
        let no_shards_failure = PersistFailure {
            subrequest_id: 0,
            index_uid: Some(failure_index_uid),
            source_id: "test-source".to_string(),
            reason: PersistFailureReason::NoShardsAvailable as i32,
        };
        let persist_response = PersistResponse {
            leader_id: "test-ingester-0".to_string(),
            successes: Vec::new(),
            failures: vec![no_shards_failure],
            routing_update: Some(RoutingUpdate {
                capacity_score: 6,
                source_shard_updates: Vec::new(),
                ..Default::default()
            }),
        };
        let summary = PersistRequestSummary {
            leader_id: "test-ingester-0".into(),
            subrequest_ids: vec![0],
        };
        (summary, Ok::<_, IngestV2Error>(persist_response))
    });
    router
        .process_persist_results(&mut workbench, persist_futures)
        .await;

    // Postcondition: the stale entry must no longer be offered as a routing candidate.
    let state_guard = router.state.lock().await;
    let mut unavailable_leaders: HashSet<NodeId> = HashSet::new();
    let routable_after = state_guard.routing_table.has_any_routing_candidate(
        "test-index",
        "test-source",
        &ingester_pool,
        &mut unavailable_leaders,
    );
    assert!(!routable_after);
}

#[tokio::test]
async fn test_no_shards_available_preserves_entry_when_routing_update_refreshes_it() {
    // The persist response carries a `NoShardsAvailable` failure *and* a routing update
    // reporting one open shard for the same (index_uid, source_id). The routing update must
    // win, so the node is still a routing candidate once the response has been processed.
    let ingester_pool = IngesterPool::default();
    let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
    let router = IngestRouter::new(
        "test-router".into(),
        control_plane,
        ingester_pool.clone(),
        1,
        EventBroker::default(),
        Some("test-az".to_string()),
    );

    // Seed the routing table with one open shard led by `test-ingester-0`. The lock guard is
    // a temporary and is released at the end of the statement.
    let index_uid = IndexUid::for_test("test-index", 0);
    let open_shard = Shard {
        index_uid: Some(index_uid.clone()),
        source_id: "test-source".to_string(),
        shard_id: Some(ShardId::from(1)),
        shard_state: ShardState::Open as i32,
        leader_id: "test-ingester-0".to_string(),
        ..Default::default()
    };
    router.state.lock().await.routing_table.merge_from_shards(
        index_uid.clone(),
        "test-source".to_string(),
        vec![open_shard],
    );
    ingester_pool.insert(
        "test-ingester-0".into(),
        IngesterPoolEntry::mocked_ingester(),
    );

    let subrequest = IngestSubrequest {
        subrequest_id: 0,
        index_id: "test-index".to_string(),
        source_id: "test-source".to_string(),
        ..Default::default()
    };
    let mut workbench = IngestWorkbench::new(vec![subrequest], 2);

    // A single canned persist outcome: the failure and the refreshing routing update both
    // refer to the same index incarnation and source.
    let response_index_uid = index_uid.clone();
    let persist_futures = FuturesUnordered::new();
    persist_futures.push(async move {
        let no_shards_failure = PersistFailure {
            subrequest_id: 0,
            index_uid: Some(response_index_uid.clone()),
            source_id: "test-source".to_string(),
            reason: PersistFailureReason::NoShardsAvailable as i32,
        };
        let refreshed_source = SourceShardUpdate {
            index_uid: Some(response_index_uid),
            source_id: "test-source".to_string(),
            open_shard_count: 1,
        };
        let persist_response = PersistResponse {
            leader_id: "test-ingester-0".to_string(),
            successes: Vec::new(),
            failures: vec![no_shards_failure],
            routing_update: Some(RoutingUpdate {
                capacity_score: 6,
                source_shard_updates: vec![refreshed_source],
                ..Default::default()
            }),
        };
        let summary = PersistRequestSummary {
            leader_id: "test-ingester-0".into(),
            subrequest_ids: vec![0],
        };
        (summary, Ok::<_, IngestV2Error>(persist_response))
    });
    router
        .process_persist_results(&mut workbench, persist_futures)
        .await;

    // Postcondition: the node is still routable for this (index, source).
    let state_guard = router.state.lock().await;
    let mut unavailable_leaders: HashSet<NodeId> = HashSet::new();
    let still_routable = state_guard.routing_table.has_any_routing_candidate(
        "test-index",
        "test-source",
        &ingester_pool,
        &mut unavailable_leaders,
    );
    assert!(still_routable);
}
}
99 changes: 99 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,36 @@ impl RoutingTable {
entry.nodes.insert(node_id, ingester_node);
}

/// Records that `node_id` holds no open shard for the entry keyed by
/// (`index_uid.index_id`, `source_id`), leaving the node's capacity score untouched.
///
/// The outcome depends on how the entry's stored index UID compares to `index_uid`:
///
/// - no entry for the key: nothing happens and nothing is inserted;
/// - entry is newer than `index_uid`: the signal is stale and the entry is left as is;
/// - entry is older than `index_uid`: the entry adopts `index_uid`, all of its nodes are
///   removed, and `seeded_from_cp` is reset to `false`;
/// - same incarnation: the open shard count of `node_id` is set to 0 if the node is present
///   in the entry; an unknown node is ignored.
pub fn mark_node_no_shards(&mut self, node_id: &NodeId, index_uid: &IndexUid, source_id: &str) {
    let key = (index_uid.index_id.to_string(), source_id.to_string());

    if let Some(entry) = self.table.get_mut(&key) {
        match entry.index_uid.cmp(index_uid) {
            // The entry already tracks a newer incarnation: ignore the outdated signal.
            Ordering::Greater => {}
            // The entry tracks an older incarnation: move it forward and empty it. There is
            // no node left to zero out afterwards, so this arm is complete on its own.
            Ordering::Less => {
                entry.index_uid = index_uid.clone();
                entry.nodes.clear();
                entry.seeded_from_cp = false;
            }
            // Same incarnation: zero the shard count for this node only.
            Ordering::Equal => {
                if let Some(node) = entry.nodes.get_mut(node_id) {
                    node.open_shard_count = 0;
                }
            }
        }
    }
}

/// Merges routing updates from a GetOrCreateOpenShards control plane response into the
/// table. For existing nodes, updates their open shard count, including if the count is 0, from
/// the CP response while preserving capacity scores if they already exist.
Expand Down Expand Up @@ -856,4 +886,73 @@ mod tests {
assert!(!entry.nodes.contains_key("node-3"));
assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2));
}

#[test]
fn test_mark_node_no_shards() {
    let key = ("test-index".to_string(), "test-source".to_string());
    let current_uid = IndexUid::for_test("test-index", 1);
    let mut routing_table = RoutingTable::default();

    // No entry for the key yet: the call must neither panic nor create one.
    routing_table.mark_node_no_shards(&"node-1".into(), &current_uid, "test-source");
    assert!(routing_table.table.get(&key).is_none());

    // Populate the entry with two nodes as (node, capacity score, open shard count).
    for (node, capacity_score, open_shard_count) in [("node-1", 8, 3), ("node-2", 6, 2)] {
        routing_table.apply_capacity_update(
            node.into(),
            current_uid.clone(),
            "test-source".into(),
            capacity_score,
            open_shard_count,
        );
    }

    // A node that is not part of the entry: both known nodes keep their counts.
    routing_table.mark_node_no_shards(&"unknown".into(), &current_uid, "test-source");
    {
        let entry = routing_table.table.get(&key).unwrap();
        assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 3);
        assert_eq!(entry.nodes.get("node-2").unwrap().open_shard_count, 2);
    }

    // Same incarnation: only node-1's open shard count drops to zero. Its capacity score
    // and the sibling node are left untouched.
    routing_table.mark_node_no_shards(&"node-1".into(), &current_uid, "test-source");
    {
        let entry = routing_table.table.get(&key).unwrap();
        let first_node = entry.nodes.get("node-1").unwrap();
        assert_eq!(first_node.open_shard_count, 0);
        assert_eq!(first_node.capacity_score, 8);
        let second_node = entry.nodes.get("node-2").unwrap();
        assert_eq!(second_node.open_shard_count, 2);
        assert_eq!(second_node.capacity_score, 6);
    }

    // Signal for an older incarnation: ignored, node-2 keeps its open shards.
    let older_uid = IndexUid::for_test("test-index", 0);
    routing_table.mark_node_no_shards(&"node-2".into(), &older_uid, "test-source");
    let second_node_count = routing_table
        .table
        .get(&key)
        .unwrap()
        .nodes
        .get("node-2")
        .unwrap()
        .open_shard_count;
    assert_eq!(second_node_count, 2);

    // Signal for a newer incarnation: the entry adopts the new index UID, loses all of its
    // nodes, and is flagged as not seeded from the control plane. No node is inserted.
    let newer_uid = IndexUid::for_test("test-index", 2);
    routing_table.mark_node_no_shards(&"node-2".into(), &newer_uid, "test-source");
    let entry = routing_table.table.get(&key).unwrap();
    assert_eq!(entry.index_uid, newer_uid);
    assert!(entry.nodes.is_empty());
    assert!(!entry.seeded_from_cp);
}
}