diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 50dcdeec846..aeb5f92051b 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -156,6 +156,49 @@ pub fn get_bool_from_env(key: &str, default_value: bool) -> bool { } } +/// Reads and parses an environment variable exactly once, caching the result for the lifetime of +/// the process via a per-call-site [`std::sync::LazyLock`]. +/// +/// Prefer this over [`get_from_env`] on paths that may run repeatedly (e.g. constructors that are +/// re-invoked): it avoids re-reading and, more importantly, re-logging the same variable on every +/// call. +/// +/// The type is required because the backing `static` needs a concrete type. The key and default +/// must be `const` expressions or literals — a `static` initializer cannot capture locals. +/// +/// ```no_run +/// # use quickwit_common::get_from_env_cached; +/// let max_concurrency: usize = get_from_env_cached!(usize, "QW_S3_MAX_CONCURRENCY", 10_000, false); +/// ``` +#[macro_export] +macro_rules! get_from_env_cached { + ($ty:ty, $key:expr, $default:expr, $sensitive:expr $(,)?) => {{ + static CACHED: ::std::sync::LazyLock<$ty> = + ::std::sync::LazyLock::new(|| $crate::get_from_env::<$ty>($key, $default, $sensitive)); + // `LazyLock` derefs to `T`; clone so callers receive an owned value, matching + // `get_from_env`'s return type (a no-op copy for the common `Copy` cases). + #[allow(clippy::clone_on_copy)] + let value = (*CACHED).clone(); + value + }}; +} + +/// Boolean counterpart of [`get_from_env_cached`], using the same lenient parsing as +/// [`get_bool_from_env`]. See that macro for the caching semantics and constraints. +/// +/// ```no_run +/// # use quickwit_common::get_bool_from_env_cached; +/// let cors_debug: bool = get_bool_from_env_cached!("QW_ENABLE_CORS_DEBUG", false); +/// ``` +#[macro_export] +macro_rules! get_bool_from_env_cached { + ($key:expr, $default:expr $(,)?) => {{ + static CACHED: ::std::sync::LazyLock = + ::std::sync::LazyLock::new(|| $crate::get_bool_from_env($key, $default)); + *CACHED + }}; +} + pub fn truncate_str(text: &str, max_len: usize) -> &str { if max_len > text.len() { return text; diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index db5eccfc6da..12ea89cff96 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -12,16 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::LazyLock; - pub use prometheus::{exponential_buckets, linear_buckets}; use quickwit_metrics::{Gauge, LazyCounter, LazyGauge, gauge, lazy_counter, lazy_gauge}; pub fn index_label(index_id: &str) -> &str { - static PER_INDEX_METRICS_ENABLED: LazyLock = - LazyLock::new(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false)); + let per_index_metrics_enabled = + !crate::get_bool_from_env_cached!("QW_DISABLE_PER_INDEX_METRICS", false); - if *PER_INDEX_METRICS_ENABLED { + if per_index_metrics_enabled { index_id } else { "__any__" diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 03e76fbb331..97c2b5ca39b 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -23,7 +23,8 @@ use coarsetime::{Duration, Instant}; pub enum ShouldLog { /// Emit the log normally, within the rate limit. Yes, - /// Emit the log; `N` similar messages were suppressed since the last emission. + /// Emit the log, annotated with a `num_suppressed = N` field recording how many similar + /// messages were suppressed since the last emission of this call site. YesAfterSuppression(u32), /// Suppressed — do not emit. No, @@ -165,8 +166,9 @@ macro_rules! rate_limited_tracing { ::tracing::$log_fn!($($args)*); } $crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => { - ::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute"); - ::tracing::$log_fn!($($args)*); + // Attach the count of messages suppressed since this call site last emitted as a + // field on the emitted line, rather than as a separate preceding log line. + ::tracing::$log_fn!(num_suppressed = skipped, $($args)*); } } }}; diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 8f2bfc46a4a..43693ba1f66 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -20,7 +20,6 @@ use std::sync::LazyLock; use anyhow::{Context, bail, ensure}; use json_comments::StripComments; -use quickwit_common::get_bool_from_env; use quickwit_common::net::is_valid_hostname; use quickwit_common::uri::Uri; use quickwit_proto::types::NodeIdRef; @@ -88,16 +87,12 @@ pub use crate::storage_config::{ /// Returns true if the ingest API v2 is enabled. pub fn enable_ingest_v2() -> bool { - static ENABLE_INGEST_V2: LazyLock = - LazyLock::new(|| get_bool_from_env("QW_ENABLE_INGEST_V2", true)); - *ENABLE_INGEST_V2 + quickwit_common::get_bool_from_env_cached!("QW_ENABLE_INGEST_V2", true) } /// Returns true if the ingest API v1 is disabled. pub fn disable_ingest_v1() -> bool { - static DISABLE_INGEST_V1: LazyLock = - LazyLock::new(|| get_bool_from_env("QW_DISABLE_INGEST_V1", false)); - *DISABLE_INGEST_V1 + quickwit_common::get_bool_from_env_cached!("QW_DISABLE_INGEST_V1", false) } #[derive(utoipa::OpenApi)] diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 4a0114d33e0..bfa29b2a222 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -214,7 +214,7 @@ impl IndexerConfig { } #[cfg(not(any(test, feature = "testsuite")))] { - quickwit_common::get_bool_from_env("QW_ENABLE_OTLP_ENDPOINT", true) + quickwit_common::get_bool_from_env_cached!("QW_ENABLE_OTLP_ENDPOINT", true) } } @@ -770,7 +770,7 @@ impl JaegerConfig { } #[cfg(not(any(test, feature = "testsuite")))] { - quickwit_common::get_bool_from_env("QW_ENABLE_JAEGER_ENDPOINT", true) + quickwit_common::get_bool_from_env_cached!("QW_ENABLE_JAEGER_ENDPOINT", true) } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index f67a5462264..097ec4469ca 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -189,15 +189,13 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 { } fn get_default_load_per_shard() -> NonZeroU32 { - static DEFAULT_LOAD_PER_SHARD: LazyLock = LazyLock::new(|| { - let default_load_per_shard = quickwit_common::get_from_env( - "QW_DEFAULT_LOAD_PER_SHARD", - PIPELINE_FULL_CAPACITY.cpu_millis() / 4, - false, - ); - NonZeroU32::new(default_load_per_shard).unwrap() - }); - *DEFAULT_LOAD_PER_SHARD + let default_load_per_shard = quickwit_common::get_from_env_cached!( + u32, + "QW_DEFAULT_LOAD_PER_SHARD", + PIPELINE_FULL_CAPACITY.cpu_millis() / 4, + false + ); + NonZeroU32::new(default_load_per_shard).unwrap() } fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 52941167016..6ff733f617a 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -47,7 +47,7 @@ use tantivy::tokenizer::TokenizerManager; use tantivy::{DateTime, IndexBuilder, IndexSettings}; use tokio::runtime::Handle; use tokio::sync::Semaphore; -use tracing::{Span, info, info_span, warn}; +use tracing::{Span, debug, info_span, warn}; use ulid::Ulid; use super::IndexSerializer; @@ -133,7 +133,7 @@ impl IndexerState { index_builder, io_controls, )?; - info!( + debug!( split_id=%indexed_split.split_id(), partition_id=%partition_id, "new-split" @@ -666,7 +666,7 @@ impl Indexer { } let num_splits = splits.len() as u64; let split_ids: String = splits.iter().map(|split| split.split_id()).join(","); - info!( + debug!( index=%self.indexer_state.pipeline_id.index_uid, source=self.indexer_state.pipeline_id.source_id.as_str(), pipeline_uid=%self.indexer_state.pipeline_id.pipeline_uid, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 9d6287245ac..efa987309e8 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -288,7 +288,7 @@ impl IndexingPipeline { let index_id = &self.params.pipeline_id.index_uid.index_id; let source_id = &self.params.pipeline_id.source_id; - info!( + debug!( index_id, source_id, pipeline_uid=%self.params.pipeline_id.pipeline_uid, diff --git a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs index f4e6e656f9f..0a84d71b427 100644 --- a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs +++ b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs @@ -98,7 +98,16 @@ impl Handler for Publisher { ); return Ok(()); } - info!("publish-new-splits"); + let num_docs: usize = new_splits.iter().map(|split| split.num_docs).sum(); + // `footer_offsets.end` is the on-disk size of the split file in bytes. + let split_size_bytes: u64 = new_splits + .iter() + .map(|split| split.footer_offsets.end) + .sum(); + info!( + num_splits = new_splits.len(), + num_docs, split_size_bytes, "publish-new-splits" + ); suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await; if !new_splits.is_empty() { diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 0d532efbf63..0b9766d5a2a 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -23,7 +23,7 @@ use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::types::{DocMappingUid, SplitId}; use tantivy::Inventory; use time::OffsetDateTime; -use tracing::{info, warn}; +use tracing::{debug, warn}; use super::merge_scheduler_service::schedule_merge; use super::{MergeSchedulerService, MergeSplitDownloader}; @@ -331,7 +331,7 @@ impl MergePlanner { // index as well. let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?; for merge_operation in merge_ops { - info!(merge_operation=?merge_operation, "schedule merge operation"); + debug!(merge_operation=?merge_operation, "schedule merge operation"); let tracked_merge_operation = self .ongoing_merge_operations_inventory .track(merge_operation); diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 11613ab84d7..67346a41f9e 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -20,7 +20,7 @@ use quickwit_common::io::IoControls; use quickwit_common::temp_dir::{self, TempDirectory}; use quickwit_metastore::SplitMetadata; use tantivy::Directory; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument}; use super::MergeExecutor; use crate::merge_policy::MergeTask; @@ -66,7 +66,7 @@ impl Handler for MergeSplitDownloader { .join("merge") .tempdir_in(self.scratch_directory.path()) .map_err(|error| anyhow::anyhow!(error))?; - info!(dir=%merge_scratch_directory.path().display(), "download-merge-splits"); + debug!(dir=%merge_scratch_directory.path().display(), "download-merge-splits"); let downloaded_splits_directory = temp_dir::Builder::default() .join("downloaded-splits") .tempdir_in(merge_scratch_directory.path()) diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index cbf3d70d976..74a4e4fdc65 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -430,7 +430,7 @@ impl IngestSource { }); return Ok(()); } - info!( + debug!( index_uid=%self.client_id.source_uid.index_uid, pipeline_uid=%self.client_id.pipeline_uid, "resetting indexing pipeline" diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs index e221602bce6..069dfd22ced 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs @@ -14,7 +14,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, LazyLock, Weak}; +use std::sync::{Arc, Weak}; use quickwit_common::rate_limited_error; use quickwit_common::thread_pool::run_cpu_intensive; @@ -90,10 +90,7 @@ pub(super) async fn validate_doc_batch( } fn is_document_validation_enabled() -> bool { - static IS_DOCUMENT_VALIDATION_ENABLED: LazyLock = LazyLock::new(|| { - !quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false) - }); - *IS_DOCUMENT_VALIDATION_ENABLED + !quickwit_common::get_bool_from_env_cached!("QW_DISABLE_DOCUMENT_VALIDATION", false) } #[instrument(name = "ingester.validate_doc_batch", skip_all, fields(num_docs = doc_batch.num_docs(), num_bytes = doc_batch.num_bytes()))] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index b1de41d3163..203d1c8f8d3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::path::Path; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; @@ -87,10 +87,12 @@ pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = const DEFAULT_BATCH_NUM_BYTES: usize = 1024 * 1024; // 1 MiB fn get_batch_num_bytes() -> usize { - static BATCH_NUM_BYTES_CELL: LazyLock = LazyLock::new(|| { - quickwit_common::get_from_env("QW_INGEST_BATCH_NUM_BYTES", DEFAULT_BATCH_NUM_BYTES, false) - }); - *BATCH_NUM_BYTES_CELL + quickwit_common::get_from_env_cached!( + usize, + "QW_INGEST_BATCH_NUM_BYTES", + DEFAULT_BATCH_NUM_BYTES, + false + ) } #[derive(Clone)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index de080bfcc4e..70399c5cf23 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -23,7 +23,7 @@ use futures::StreamExt; use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; -use quickwit_common::{ServiceStream, get_bool_from_env, rate_limited_error}; +use quickwit_common::{ServiceStream, get_bool_from_env_cached, rate_limited_error}; use quickwit_config::{ IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, validate_index_id_pattern, }; @@ -122,9 +122,13 @@ impl PostgresqlMetastore { .max_connection_lifetime_opt() .expect("PostgreSQL metastore config should have been validated"); - let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); - let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); - let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); + // These environment variables are process-global and don't change over a process's + // lifetime, so they're read (and logged) once rather than on every metastore + // construction, which can recur frequently. + let read_only = get_bool_from_env_cached!(QW_POSTGRES_READ_ONLY_ENV_KEY, false); + let skip_migrations = get_bool_from_env_cached!(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); + let skip_locking = + get_bool_from_env_cached!(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); let connection_pool = establish_connection( connection_uri, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 121e2d7df02..d230c05f476 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -1893,7 +1893,7 @@ pub async fn single_doc_mapping_leaf_search( ) -> Result { let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum(); let num_splits = splits.len(); - info!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5)); + debug!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5)); // We simplify the request as much as possible. let split_filter: CanSplitDoBetter = @@ -2080,7 +2080,7 @@ async fn run_local_search_tasks( }); } - info!(split_outcome_counters=%leaf_search_context.split_outcome_counters, "leaf split search finished"); + debug!(split_outcome_counters=%leaf_search_context.split_outcome_counters, "leaf split search finished"); } /// We identify the splits that are in the cache and append them to the incremental merge collector. diff --git a/quickwit/quickwit-search/src/list_fields/mod.rs b/quickwit/quickwit-search/src/list_fields/mod.rs index 9cc81957c95..f53f7995d69 100644 --- a/quickwit/quickwit-search/src/list_fields/mod.rs +++ b/quickwit/quickwit-search/src/list_fields/mod.rs @@ -18,7 +18,6 @@ mod patterns; mod root; use std::cmp::Ordering; -use std::sync::LazyLock; pub use cache::ListFieldsCache; use itertools::Itertools; @@ -37,8 +36,9 @@ pub use crate::list_fields::root::root_list_fields; /// a JSON type with random field names. This leads to huge memory consumption /// when building the response. This is a workaround until a way is found to /// prune the long tail of rare fields. -static FIELD_LIST_SIZE_LIMIT: LazyLock = - LazyLock::new(|| quickwit_common::get_from_env("QW_FIELD_LIST_SIZE_LIMIT", 100_000, false)); +fn field_list_size_limit() -> usize { + quickwit_common::get_from_env_cached!(usize, "QW_FIELD_LIST_SIZE_LIMIT", 100_000, false) +} // Sorts and deduplicates the list of fields. // @@ -83,10 +83,10 @@ fn merge_entries(entry_groups: Vec>) -> crate::Result= *FIELD_LIST_SIZE_LIMIT { + if entries.len() >= field_list_size_limit() { return Err(SearchError::Internal(format!( "list fields response exceeded {} fields", - *FIELD_LIST_SIZE_LIMIT + field_list_size_limit() ))); } current_group.push(entry); diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index de9e816cf19..dc1bd53c120 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; use std::net::SocketAddr; -use std::sync::LazyLock; use std::time::Duration; use anyhow::bail; @@ -25,7 +24,7 @@ use async_trait::async_trait; use futures::future::join_all; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash}; -use quickwit_common::{SocketAddrLegacyHash, get_bool_from_env}; +use quickwit_common::{SocketAddrLegacyHash, get_bool_from_env_cached}; use quickwit_metrics::counter; use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use tracing::{info, warn}; @@ -104,8 +103,9 @@ impl fmt::Debug for SearchJobPlacer { } } -static LOAD_ESTIMATION_DISABLED: LazyLock = - LazyLock::new(|| get_bool_from_env("QW_DISABLE_LOAD_ESTIMATION", false)); +fn load_estimation_disabled() -> bool { + get_bool_from_env_cached!("QW_DISABLE_LOAD_ESTIMATION", false) +} impl SearchJobPlacer { /// Returns an [`SearchJobPlacer`] from a search service client pool. @@ -211,7 +211,7 @@ impl SearchJobPlacer { }) .collect(); - if load_aware && !*LOAD_ESTIMATION_DISABLED { + if load_aware && !load_estimation_disabled() { // Seed each candidate node with its current load so the placer avoids // routing work to already-loaded nodes. If a node fails to report its // load (error or timeout), `load` stays `None`: we still route work diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 39bfb0e2580..a17b07dbba6 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -577,7 +577,7 @@ fn get_status_with_error(rejection: Rejection) -> Result CorsLayer { - let debug_mode = quickwit_common::get_bool_from_env("QW_ENABLE_CORS_DEBUG", false); + let debug_mode = quickwit_common::get_bool_from_env_cached!("QW_ENABLE_CORS_DEBUG", false); if debug_mode { info!("CORS debug mode is enabled, localhost and 127.0.0.1 origins will be allowed"); return CorsLayer::new()