Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,49 @@ pub fn get_bool_from_env(key: &str, default_value: bool) -> bool {
}
}

/// Reads and parses an environment variable exactly once, caching the result for the lifetime of

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, we should have an `env.rs` file in quickwit-common to gather all of these helpers.

Also note that by implementing your macro for only a couple of calls, you are creating a fracture in the way we deal with env variables.

Some code uses a macro, some does not.

/// the process via a per-call-site [`std::sync::LazyLock`].
///
/// Prefer this over [`get_from_env`] on paths that may run repeatedly (e.g. constructors that are
/// re-invoked): it avoids re-reading and, more importantly, re-logging the same variable on every
/// call.
///
/// The type is required because the backing `static` needs a concrete type. The key and default
/// must be `const` expressions or literals — a `static` initializer cannot capture locals.
///
/// ```no_run
/// # use quickwit_common::get_from_env_cached;
/// let max_concurrency: usize = get_from_env_cached!(usize, "QW_S3_MAX_CONCURRENCY", 10_000, false);
/// ```
#[macro_export]
macro_rules! get_from_env_cached {
    ($ty:ty, $key:expr, $default:expr, $sensitive:expr $(,)?) => {{
        // Each expansion declares its own `static`, so the cache is per call site: the
        // initializer (and therefore `get_from_env`) runs at most once per call site, on
        // first access.
        static CELL: ::std::sync::LazyLock<$ty> = ::std::sync::LazyLock::new(|| {
            $crate::get_from_env::<$ty>($key, $default, $sensitive)
        });
        // Return an owned `$ty`, like `get_from_env` does. The fully qualified `Clone::clone`
        // is just a copy for `Copy` types and needs no lint allowance at the call site.
        <$ty as ::std::clone::Clone>::clone(::std::sync::LazyLock::force(&CELL))
    }};
}

/// Boolean counterpart of [`get_from_env_cached`], using the same lenient parsing as
/// [`get_bool_from_env`]. See [`get_from_env_cached`] for the caching semantics and constraints.
///
/// ```no_run
/// # use quickwit_common::get_bool_from_env_cached;
/// let cors_debug: bool = get_bool_from_env_cached!("QW_ENABLE_CORS_DEBUG", false);
/// ```
#[macro_export]
macro_rules! get_bool_from_env_cached {
    ($key:expr, $default:expr $(,)?) => {{
        // Each expansion declares its own `static`, so `get_bool_from_env` runs at most once
        // per call site, on first access.
        static FLAG: ::std::sync::LazyLock<bool> =
            ::std::sync::LazyLock::new(|| $crate::get_bool_from_env($key, $default));
        // `bool` is `Copy`: dereference the cached value and return it by value.
        *::std::sync::LazyLock::force(&FLAG)
    }};
}

pub fn truncate_str(text: &str, max_len: usize) -> &str {
if max_len > text.len() {
return text;
Expand Down
8 changes: 3 additions & 5 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

pub use prometheus::{exponential_buckets, linear_buckets};
use quickwit_metrics::{Gauge, LazyCounter, LazyGauge, gauge, lazy_counter, lazy_gauge};

pub fn index_label(index_id: &str) -> &str {
static PER_INDEX_METRICS_ENABLED: LazyLock<bool> =
LazyLock::new(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
let per_index_metrics_enabled =
!crate::get_bool_from_env_cached!("QW_DISABLE_PER_INDEX_METRICS", false);

if *PER_INDEX_METRICS_ENABLED {
if per_index_metrics_enabled {
index_id
} else {
"__any__"
Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-common/src/rate_limited_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use coarsetime::{Duration, Instant};
pub enum ShouldLog {
/// Emit the log normally, within the rate limit.
Yes,
/// Emit the log; `N` similar messages were suppressed since the last emission.
/// Emit the log, annotated with a `num_suppressed = N` field recording how many similar
/// messages were suppressed since the last emission of this call site.
YesAfterSuppression(u32),
/// Suppressed — do not emit.
No,
Expand Down Expand Up @@ -165,8 +166,9 @@ macro_rules! rate_limited_tracing {
::tracing::$log_fn!($($args)*);
}
$crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => {
::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute");
::tracing::$log_fn!($($args)*);
// Attach the count of messages suppressed since this call site last emitted as a
// field on the emitted line, rather than as a separate preceding log line.
::tracing::$log_fn!(num_suppressed = skipped, $($args)*);
}
}
}};
Expand Down
9 changes: 2 additions & 7 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::LazyLock;

use anyhow::{Context, bail, ensure};
use json_comments::StripComments;
use quickwit_common::get_bool_from_env;
use quickwit_common::net::is_valid_hostname;
use quickwit_common::uri::Uri;
use quickwit_proto::types::NodeIdRef;
Expand Down Expand Up @@ -88,16 +87,12 @@ pub use crate::storage_config::{

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: LazyLock<bool> =
LazyLock::new(|| get_bool_from_env("QW_ENABLE_INGEST_V2", true));
*ENABLE_INGEST_V2
quickwit_common::get_bool_from_env_cached!("QW_ENABLE_INGEST_V2", true)
}

/// Returns true if the ingest API v1 is disabled.
pub fn disable_ingest_v1() -> bool {
static DISABLE_INGEST_V1: LazyLock<bool> =
LazyLock::new(|| get_bool_from_env("QW_DISABLE_INGEST_V1", false));
*DISABLE_INGEST_V1
quickwit_common::get_bool_from_env_cached!("QW_DISABLE_INGEST_V1", false)
}

#[derive(utoipa::OpenApi)]
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl IndexerConfig {
}
#[cfg(not(any(test, feature = "testsuite")))]
{
quickwit_common::get_bool_from_env("QW_ENABLE_OTLP_ENDPOINT", true)
quickwit_common::get_bool_from_env_cached!("QW_ENABLE_OTLP_ENDPOINT", true)
}
}

Expand Down Expand Up @@ -770,7 +770,7 @@ impl JaegerConfig {
}
#[cfg(not(any(test, feature = "testsuite")))]
{
quickwit_common::get_bool_from_env("QW_ENABLE_JAEGER_ENDPOINT", true)
quickwit_common::get_bool_from_env_cached!("QW_ENABLE_JAEGER_ENDPOINT", true)
}
}

Expand Down
16 changes: 7 additions & 9 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,13 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
}

fn get_default_load_per_shard() -> NonZeroU32 {
static DEFAULT_LOAD_PER_SHARD: LazyLock<NonZeroU32> = LazyLock::new(|| {
let default_load_per_shard = quickwit_common::get_from_env(
"QW_DEFAULT_LOAD_PER_SHARD",
PIPELINE_FULL_CAPACITY.cpu_millis() / 4,
false,
);
NonZeroU32::new(default_load_per_shard).unwrap()
});
*DEFAULT_LOAD_PER_SHARD
let default_load_per_shard = quickwit_common::get_from_env_cached!(
u32,
"QW_DEFAULT_LOAD_PER_SHARD",
PIPELINE_FULL_CAPACITY.cpu_millis() / 4,
false
);
NonZeroU32::new(default_load_per_shard).unwrap()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I don't see the benefit of a macro here. Are we trying to save one LOC?

Less nitpick: if introducing a macro is something we want to do, it should be a separate PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main benefit is to avoid this issue in the future by adding a central way that includes caching.
A different approach would be to have a central env hashmap in a `OnceLock`, holding all the env variables in the repo. Which one do you think is better?

I can put it in a different PR

}

fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use tantivy::tokenizer::TokenizerManager;
use tantivy::{DateTime, IndexBuilder, IndexSettings};
use tokio::runtime::Handle;
use tokio::sync::Semaphore;
use tracing::{Span, info, info_span, warn};
use tracing::{Span, debug, info_span, warn};
use ulid::Ulid;

use super::IndexSerializer;
Expand Down Expand Up @@ -133,7 +133,8 @@ impl IndexerState {
index_builder,
io_controls,
)?;
info!(
quickwit_common::rate_limited_info!(
limit_per_min = 1,
split_id=%indexed_split.split_id(),
partition_id=%partition_id,
"new-split"
Expand Down Expand Up @@ -666,7 +667,7 @@ impl Indexer {
}
let num_splits = splits.len() as u64;
let split_ids = splits.iter().map(|split| split.split_id()).join(",");
info!(
debug!(
index=%self.indexer_state.pipeline_id.index_uid,
source=self.indexer_state.pipeline_id.source_id.as_str(),
pipeline_uid=%self.indexer_state.pipeline_id.pipeline_uid,
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl IndexingPipeline {
let index_id = &self.params.pipeline_id.index_uid.index_id;
let source_id = &self.params.pipeline_id.source_id;

info!(
debug!(
index_id,
source_id,
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
Expand Down Expand Up @@ -499,7 +499,8 @@ impl Handler<AssignShards> for IndexingPipeline {
// If the pipeline is running, we forward the message to its source.
// If it is not, it will be respawned soon, and the shards will be assigned afterward.
if let Some(handles) = &self.handles_opt {
info!(
quickwit_common::rate_limited_info!(
limit_per_min = 1,
shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline"
);
Expand Down
13 changes: 12 additions & 1 deletion quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,18 @@ impl Handler<SplitsUpdate> for Publisher {
);
return Ok(());
}
info!("publish-new-splits");
let num_docs: usize = new_splits.iter().map(|split| split.num_docs).sum();
// `footer_offsets.end` is the on-disk size of the split file in bytes.
let split_size_bytes: u64 = new_splits
.iter()
.map(|split| split.footer_offsets.end)
.sum();
info!(
num_splits = new_splits.len(),
num_docs,
split_size = %bytesize::ByteSize(split_size_bytes),
"publish-new-splits"
);
suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await;

if !new_splits.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::DocMappingUid;
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::{info, warn};
use tracing::{debug, warn};

use super::merge_scheduler_service::schedule_merge;
use super::{MergeSchedulerService, MergeSplitDownloader};
Expand Down Expand Up @@ -331,7 +331,7 @@ impl MergePlanner {
// index as well.
let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
for merge_operation in merge_ops {
info!(merge_operation=?merge_operation, "schedule merge operation");
debug!(merge_operation=?merge_operation, "schedule merge operation");
let tracked_merge_operation = self
.ongoing_merge_operations_inventory
.track(merge_operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use quickwit_common::io::IoControls;
use quickwit_common::temp_dir::{self, TempDirectory};
use quickwit_metastore::SplitMetadata;
use tantivy::Directory;
use tracing::{debug, info, instrument};
use tracing::{debug, instrument};

use super::MergeExecutor;
use crate::merge_policy::MergeTask;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl Handler<MergeTask> for MergeSplitDownloader {
.join("merge")
.tempdir_in(self.scratch_directory.path())
.map_err(|error| anyhow::anyhow!(error))?;
info!(dir=%merge_scratch_directory.path().display(), "download-merge-splits");
debug!(dir=%merge_scratch_directory.path().display(), "download-merge-splits");
let downloaded_splits_directory = temp_dir::Builder::default()
.join("downloaded-splits")
.tempdir_in(merge_scratch_directory.path())
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl IngestSource {
});
return Ok(());
}
info!(
debug!(
index_uid=%self.client_id.source_uid.index_uid,
pipeline_uid=%self.client_id.pipeline_uid,
"resetting indexing pipeline"
Expand Down Expand Up @@ -531,7 +531,7 @@ impl Source for IngestSource {
.collect();

assert!(!added_shard_ids.is_empty());
info!(added_shards=?added_shard_ids, "adding shards assignment");
debug!(added_shards=?added_shard_ids, "adding shards assignment");

let acquire_shards_request = AcquireShardsRequest {
index_uid: Some(self.client_id.source_uid.index_uid.clone()),
Expand Down
7 changes: 2 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock, Weak};
use std::sync::{Arc, Weak};

use quickwit_common::rate_limited_error;
use quickwit_common::thread_pool::run_cpu_intensive;
Expand Down Expand Up @@ -90,10 +90,7 @@ pub(super) async fn validate_doc_batch(
}

fn is_document_validation_enabled() -> bool {
static IS_DOCUMENT_VALIDATION_ENABLED: LazyLock<bool> = LazyLock::new(|| {
!quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false)
});
*IS_DOCUMENT_VALIDATION_ENABLED
!quickwit_common::get_bool_from_env_cached!("QW_DISABLE_DOCUMENT_VALIDATION", false)
}

#[instrument(name = "ingester.validate_doc_batch", skip_all, fields(num_docs = doc_batch.num_docs(), num_bytes = doc_batch.num_bytes()))]
Expand Down
12 changes: 7 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::path::Path;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
Expand Down Expand Up @@ -87,10 +87,12 @@ pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature =
const DEFAULT_BATCH_NUM_BYTES: usize = 1024 * 1024; // 1 MiB

fn get_batch_num_bytes() -> usize {
static BATCH_NUM_BYTES_CELL: LazyLock<usize> = LazyLock::new(|| {
quickwit_common::get_from_env("QW_INGEST_BATCH_NUM_BYTES", DEFAULT_BATCH_NUM_BYTES, false)
});
*BATCH_NUM_BYTES_CELL
quickwit_common::get_from_env_cached!(
usize,
"QW_INGEST_BATCH_NUM_BYTES",
DEFAULT_BATCH_NUM_BYTES,
false
)
}

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ impl FullyLockedIngesterState<'_> {
}
}
}
info!(
quickwit_common::rate_limited_info!(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rate limited cannot be the right move here!

Generally speaking, rate limiting is rarely a good move for info statements.
If we don't need it (because it is useless or redundant) let's remove it.

Those info statements are useful for investigating, when something goes wrong, what chain of events led to the problem.
If you rate limit, we cannot draw any conclusions from the absence of a log statement. For instance, we cannot know that indexing did not reach a given step for a given split.

limit_per_min = 1,
"truncated shard `{queue_id}` at {truncate_up_to_position_inclusive} initiated via \
`{initiator}`"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::StreamExt;
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::uri::Uri;
use quickwit_common::{ServiceStream, get_bool_from_env, rate_limited_error};
use quickwit_common::{ServiceStream, get_bool_from_env_cached, rate_limited_error};
use quickwit_config::{
IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, validate_index_id_pattern,
};
Expand Down Expand Up @@ -122,9 +122,13 @@ impl PostgresqlMetastore {
.max_connection_lifetime_opt()
.expect("PostgreSQL metastore config should have been validated");

let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false);
let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false);
let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false);
// These environment variables are process-global and don't change over a process's
// lifetime, so they're read (and logged) once rather than on every metastore
// construction, which can recur frequently.
let read_only = get_bool_from_env_cached!(QW_POSTGRES_READ_ONLY_ENV_KEY, false);
let skip_migrations = get_bool_from_env_cached!(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false);
let skip_locking =
get_bool_from_env_cached!(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false);

let connection_pool = establish_connection(
connection_uri,
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,8 @@ async fn run_offloaded_search_tasks(
Err(err) => {
// Transport-level failure: the Lambda invocation itself failed.
// Mark all splits in this batch as failed.
error!(
quickwit_common::rate_limited_error!(
limit_per_min = 1,
error = %err,
num_splits = batch_split_ids.len(),
"lambda invocation failed for batch"
Expand Down Expand Up @@ -1889,7 +1890,7 @@ pub async fn single_doc_mapping_leaf_search(
) -> Result<LeafSearchResponse, SearchError> {
let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
let num_splits = splits.len();
info!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5));
debug!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5));

// We simplify the request as much as possible.
let split_filter: CanSplitDoBetter =
Expand Down Expand Up @@ -2076,7 +2077,7 @@ async fn run_local_search_tasks(
});
}

info!(split_outcome_counters=%leaf_search_context.split_outcome_counters, "leaf split search finished");
debug!(split_outcome_counters=%leaf_search_context.split_outcome_counters, "leaf split search finished");
}

/// We identify the splits that are in the cache and append them to the incremental merge collector.
Expand Down
Loading
Loading