Skip to content

Commit 5e9a67c

Browse files
committed
Fix leaf cancellation propagation through tokio tasks
1 parent 80c53c2 commit 5e9a67c

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

quickwit/quickwit-search/src/leaf.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::time::{Duration, Instant};
2121

2222
use anyhow::Context;
2323
use bytesize::ByteSize;
24+
use futures::TryFutureExt;
2425
use futures::future::try_join_all;
2526
use quickwit_common::pretty::PrettySample;
2627
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
@@ -1167,6 +1168,7 @@ pub async fn multi_index_leaf_search(
11671168
leaf_search_request: LeafSearchRequest,
11681169
storage_resolver: &StorageResolver,
11691170
is_broad_search: bool,
1171+
timeout_deadline: tokio::time::Instant,
11701172
) -> Result<LeafSearchResponse, SearchError> {
11711173
let search_request: Arc<SearchRequest> = leaf_search_request
11721174
.search_request
@@ -1216,7 +1218,7 @@ pub async fn multi_index_leaf_search(
12161218
let searcher_context = searcher_context.clone();
12171219
let search_request = search_request.clone();
12181220
let aggregation_limits = aggregation_limits.clone();
1219-
async move {
1221+
let instrumented_future = async move {
12201222
let storage = storage_resolver.resolve(&index_uri).await?;
12211223
single_doc_mapping_leaf_search(
12221224
searcher_context,
@@ -1226,21 +1228,18 @@ pub async fn multi_index_leaf_search(
12261228
doc_mapper,
12271229
aggregation_limits,
12281230
is_broad_search,
1231+
timeout_deadline,
12291232
)
12301233
.await
1231-
}
1232-
.in_current_span()
1234+
};
1235+
async move { tokio::time::timeout_at(timeout_deadline, instrumented_future).await? }
1236+
.in_current_span()
12331237
});
12341238
leaf_request_tasks.push(leaf_request_future);
12351239
}
12361240

1237-
let timeout = if is_broad_search {
1238-
searcher_context.searcher_config.secondary_request_timeout()
1239-
} else {
1240-
searcher_context.searcher_config.request_timeout()
1241-
};
12421241
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> =
1243-
tokio::time::timeout(timeout, try_join_all(leaf_request_tasks)).await??;
1242+
try_join_all(leaf_request_tasks).await?;
12441243
let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?;
12451244
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
12461245
for result in leaf_responses {
@@ -1294,6 +1293,7 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) {
12941293
/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in
12951294
/// charge to consolidate, identify the actual final top hits to display, and
12961295
/// fetch the actual documents to convert the partial hits into actual Hits.
1296+
#[allow(clippy::too_many_arguments)]
12971297
#[instrument(skip_all, fields(index = ?PrettySample::new(&request.index_id_patterns, 5)))]
12981298
pub async fn single_doc_mapping_leaf_search(
12991299
searcher_context: Arc<SearcherContext>,
@@ -1303,6 +1303,7 @@ pub async fn single_doc_mapping_leaf_search(
13031303
doc_mapper: Arc<DocMapper>,
13041304
aggregations_limits: AggregationLimitsGuard,
13051305
is_broad_search: bool,
1306+
timeout_deadline: tokio::time::Instant,
13061307
) -> Result<LeafSearchResponse, SearchError> {
13071308
let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
13081309
let num_splits = splits.len();
@@ -1362,18 +1363,22 @@ pub async fn single_doc_mapping_leaf_search(
13621363
leaf_search_single_split_join_handles.push((
13631364
split.split_id.clone(),
13641365
tokio::spawn(
1365-
leaf_search_single_split_wrapper(
1366-
request,
1367-
searcher_context.clone(),
1368-
index_storage.clone(),
1369-
doc_mapper.clone(),
1370-
split,
1371-
split_filter.clone(),
1372-
incremental_merge_collector.clone(),
1373-
leaf_split_search_permit,
1374-
aggregations_limits.clone(),
1375-
is_broad_search,
1366+
tokio::time::timeout_at(
1367+
timeout_deadline,
1368+
leaf_search_single_split_wrapper(
1369+
request,
1370+
searcher_context.clone(),
1371+
index_storage.clone(),
1372+
doc_mapper.clone(),
1373+
split,
1374+
split_filter.clone(),
1375+
incremental_merge_collector.clone(),
1376+
leaf_split_search_permit,
1377+
aggregations_limits.clone(),
1378+
is_broad_search,
1379+
),
13761380
)
1381+
.unwrap_or_else(|_| ())
13771382
.in_current_span(),
13781383
),
13791384
));

quickwit/quickwit-search/src/service.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,20 +196,28 @@ impl SearchService for SearchServiceImpl {
196196
} else {
197197
false
198198
};
199+
let timeout = if is_broad_search {
200+
self.searcher_context
201+
.searcher_config
202+
.secondary_request_timeout()
203+
} else {
204+
self.searcher_context.searcher_config.request_timeout()
205+
};
199206

200-
LeafSearchMetricsFuture {
207+
let tracked_multi_index_leaf_search_future = LeafSearchMetricsFuture {
201208
tracked: multi_index_leaf_search(
202209
self.searcher_context.clone(),
203210
leaf_search_request,
204211
&self.storage_resolver,
205212
is_broad_search,
213+
tokio::time::Instant::now() + timeout,
206214
),
207215
start: Instant::now(),
208216
targeted_splits: num_splits,
209217
status: None,
210218
is_broad_search,
211-
}
212-
.await
219+
};
220+
tokio::time::timeout(timeout, tracked_multi_index_leaf_search_future).await?
213221
}
214222

215223
async fn fetch_docs(

quickwit/quickwit-search/src/tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::{BTreeMap, BTreeSet};
16+
use std::time::Duration;
1617

1718
use assert_json_diff::{assert_json_eq, assert_json_include};
1819
use quickwit_config::SearcherConfig;
@@ -1059,6 +1060,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
10591060
test_sandbox.doc_mapper(),
10601061
agg_limits,
10611062
false,
1063+
tokio::time::Instant::now() + Duration::from_secs(60),
10621064
)
10631065
.await
10641066
.unwrap();

0 commit comments

Comments
 (0)