From 9fd743a2fb2e2c6aaf991bcca620cf942ad0aea9 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 14 Nov 2025 09:29:49 +0100 Subject: [PATCH 01/39] ref(processing): Use new transactions processor --- relay-server/src/processing/common.rs | 2 + relay-server/src/processing/mod.rs | 5 +- .../src/processing/transactions/mod.rs | 16 + relay-server/src/services/processor.rs | 300 ++---------------- .../services/processor/dynamic_sampling.rs | 68 +--- relay-server/src/services/processor/event.rs | 30 +- .../src/services/processor/profile.rs | 114 +------ relay-server/src/services/processor/report.rs | 23 +- 8 files changed, 64 insertions(+), 494 deletions(-) diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs index 868346a39b0..28d9d69e442 100644 --- a/relay-server/src/processing/common.rs +++ b/relay-server/src/processing/common.rs @@ -6,6 +6,7 @@ use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; use crate::processing::trace_metrics::TraceMetricsProcessor; +use crate::processing::transactions::TransactionProcessor; use crate::processing::{Forward, Processor}; macro_rules! outputs { @@ -57,4 +58,5 @@ outputs!( TraceMetrics => TraceMetricsProcessor, Spans => SpansProcessor, Sessions => SessionsProcessor, + Transactions => TransactionProcessor, ); diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index d42569a6c02..d194986454b 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -78,7 +78,10 @@ pub struct Context<'a> { /// /// The caller needs to ensure the rate limits are not yet expired. pub rate_limits: &'a RateLimits, - /// Reservoir counters for "get more samples" functionality. + + /// Counters used for getting more samples for a project on-demand. + /// + /// Reservoir counters are a legacy feature and will be removed in the near future. pub reservoir_counters: &'a ReservoirCounters, } diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index cb53b21c401..04f37cf0e30 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -97,6 +97,22 @@ pub struct TransactionProcessor { quotas_client: Option, } +impl TransactionProcessor { + /// Creates a new transaction processor. 
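+    ///
+    /// The `limiter` enforces rate limits and quotas on processed transactions, and
+    /// `geoip_lookup` is used during event normalization. With the `processing` feature,
+    /// `quotas_client` supplies the Redis client backing reservoir sampling counters.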
+ pub fn new( + limiter: Arc, + geoip_lookup: GeoIpLookup, + #[cfg(feature = "processing")] quotas_client: Option, + ) -> Self { + Self { + limiter, + geoip_lookup, + #[cfg(feature = "processing")] + quotas_client, + } + } +} + impl Processor for TransactionProcessor { type UnitOfWork = SerializedTransaction; type Output = TransactionOutput; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 595f479c50a..eb6927ca86c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -20,7 +20,7 @@ use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token}; use relay_common::time::UnixTimestamp; use relay_config::{Config, HttpEncoding, RelayMode}; -use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig}; +use relay_dynamic_config::{Feature, GlobalConfig}; use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup}; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::{ @@ -31,7 +31,7 @@ use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNames use relay_pii::PiiConfigError; use relay_protocol::Annotated; use relay_quotas::{DataCategory, Quota, RateLimits, Scoping}; -use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; +use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision}; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; @@ -52,7 +52,7 @@ use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; use crate::processing::trace_metrics::TraceMetricsProcessor; -use crate::processing::transactions::extraction::ExtractMetricsContext; +use crate::processing::transactions::TransactionProcessor; use crate::processing::utils::event::{ EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category, event_type, @@ -68,7 +68,7 @@ use crate::services::upstream::{ SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError, }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; -use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult}; +use crate::utils::{self, CheckLimits, EnvelopeLimiter}; use crate::{http, processing}; use relay_threading::AsyncPool; #[cfg(feature = "processing")] @@ -84,7 +84,7 @@ use { }, relay_dynamic_config::CardinalityLimiterMode, relay_quotas::{RateLimitingError, RedisRateLimiter}, - relay_redis::{AsyncRedisClient, RedisClients}, + relay_redis::RedisClients, std::time::Instant, symbolic_unreal::{Unreal4Error, Unreal4ErrorKind}, }; @@ -1130,8 +1130,6 @@ struct InnerProcessor { global_config: GlobalConfigHandle, project_cache: ProjectCacheHandle, cogs: Cogs, - #[cfg(feature = "processing")] - quotas_client: Option, addrs: Addrs, #[cfg(feature = "processing")] rate_limiter: Option>>, @@ -1148,6 +1146,7 @@ struct Processing { spans: SpansProcessor, check_ins: CheckInsProcessor, sessions: SessionsProcessor, + transactions: TransactionProcessor, } impl EnvelopeProcessorService { @@ -1212,8 +1211,6 @@ impl EnvelopeProcessorService { project_cache, cogs, #[cfg(feature = "processing")] - quotas_client: quotas.clone(), - #[cfg(feature = "processing")] rate_limiter, addrs, #[cfg(feature = "processing")] @@ -1234,7 +1231,12 @@ impl EnvelopeProcessorService { trace_metrics: 
TraceMetricsProcessor::new(Arc::clone("a_limiter)), spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), - sessions: SessionsProcessor::new(quota_limiter), + sessions: SessionsProcessor::new(Arc::clone("a_limiter)), + transactions: TransactionProcessor::new( + Arc::clone("a_limiter), + geoip_lookup.clone(), + quotas.clone(), + ), }, geoip_lookup, config, @@ -1308,13 +1310,12 @@ impl EnvelopeProcessorService { nnswitch::expand(managed_envelope)?; }); - let extraction_result = event::extract( + let mut event = event::extract( managed_envelope, &mut metrics, event_fully_normalized, &self.inner.config, )?; - let mut event = extraction_result.event; if_processing!(self.inner.config, { if let Some(inner_event_fully_normalized) = @@ -1411,260 +1412,6 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } - /// Processes only transactions and transaction-related items. - #[allow(unused_assignments)] - async fn process_transactions( - &self, - managed_envelope: &mut TypedEnvelope, - cogs: &mut Token, - project_id: ProjectId, - mut ctx: processing::Context<'_>, - ) -> Result, ProcessingError> { - let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope()); - let mut event_metrics_extracted = EventMetricsExtracted(false); - let mut spans_extracted = SpansExtracted(false); - let mut metrics = Metrics::default(); - let mut extracted_metrics = ProcessingExtractedMetrics::new(); - - // We extract the main event from the envelope. - let extraction_result = event::extract( - managed_envelope, - &mut metrics, - event_fully_normalized, - &self.inner.config, - )?; - - // If metrics were extracted we mark that. - if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted { - event_metrics_extracted = inner_event_metrics_extracted; - } - if let Some(inner_spans_extracted) = extraction_result.spans_extracted { - spans_extracted = inner_spans_extracted; - }; - - // We take the main event out of the result. - let mut event = extraction_result.event; - - let profile_id = profile::filter( - managed_envelope, - &event, - ctx.config, - project_id, - ctx.project_info, - ); - processing::transactions::profile::transfer_id(&mut event, profile_id); - processing::transactions::profile::remove_context_if_rate_limited( - &mut event, - managed_envelope.scoping(), - ctx, - ); - - ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc( - managed_envelope, - &mut event, - ctx.project_info, - ctx.sampling_project_info, - ); - - let attachments = managed_envelope - .envelope() - .items() - .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment)); - processing::utils::event::finalize( - managed_envelope.envelope().headers(), - &mut event, - attachments, - &mut metrics, - &self.inner.config, - )?; - - event_fully_normalized = processing::utils::event::normalize( - managed_envelope.envelope().headers(), - &mut event, - event_fully_normalized, - project_id, - ctx, - &self.inner.geoip_lookup, - )?; - - let filter_run = - processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx) - .map_err(|err| { - managed_envelope.reject(Outcome::Filtered(err.clone())); - ProcessingError::EventFiltered(err) - })?; - - // Always run dynamic sampling on processing Relays, - // but delay decision until inbound filters have been fully processed. - // Also, we require transaction metrics to be enabled before sampling. 
- let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok) - || self.inner.config.processing_enabled()) - && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()); - - let sampling_result = match run_dynamic_sampling { - true => { - #[allow(unused_mut)] - let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters)); - #[cfg(feature = "processing")] - if let Some(quotas_client) = self.inner.quotas_client.as_ref() { - reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client); - } - processing::utils::dynamic_sampling::run( - managed_envelope.envelope().headers().dsc(), - &event, - &ctx, - Some(&reservoir), - ) - .await - } - false => SamplingResult::Pending, - }; - - relay_statsd::metric!( - counter(RelayCounters::SamplingDecision) += 1, - decision = sampling_result.decision().as_str(), - item = "transaction" - ); - - #[cfg(feature = "processing")] - let server_sample_rate = sampling_result.sample_rate(); - - if let Some(outcome) = sampling_result.into_dropped_outcome() { - // Process profiles before dropping the transaction, if necessary. - // Before metric extraction to make sure the profile count is reflected correctly. - profile::process( - managed_envelope, - &mut event, - ctx.global_config, - ctx.config, - ctx.project_info, - ); - // Extract metrics here, we're about to drop the event/transaction. - event_metrics_extracted = processing::transactions::extraction::extract_metrics( - &mut event, - &mut extracted_metrics, - ExtractMetricsContext { - dsc: managed_envelope.envelope().dsc(), - project_id, - ctx, - sampling_decision: SamplingDecision::Drop, - metrics_extracted: event_metrics_extracted.0, - spans_extracted: spans_extracted.0, - }, - )?; - - dynamic_sampling::drop_unsampled_items( - managed_envelope, - event, - outcome, - spans_extracted, - ); - - // At this point we have: - // - An empty envelope. - // - An envelope containing only processed profiles. - // We need to make sure there are enough quotas for these profiles. - event = self - .enforce_quotas( - managed_envelope, - Annotated::empty(), - &mut extracted_metrics, - ctx, - ) - .await?; - - return Ok(Some(extracted_metrics)); - } - - let _post_ds = cogs.start_category("post_ds"); - - // Need to scrub the transaction before extracting spans. - // - // Unconditionally scrub to make sure PII is removed as early as possible. - processing::utils::event::scrub(&mut event, ctx.project_info)?; - - let attachments = managed_envelope - .envelope_mut() - .items_mut() - .filter(|i| i.ty() == &ItemType::Attachment); - processing::utils::attachments::scrub(attachments, ctx.project_info); - - if_processing!(self.inner.config, { - // Process profiles before extracting metrics, to make sure they are removed if they are invalid. - let profile_id = profile::process( - managed_envelope, - &mut event, - ctx.global_config, - ctx.config, - ctx.project_info, - ); - processing::transactions::profile::transfer_id(&mut event, profile_id); - processing::transactions::profile::scrub_profiler_id(&mut event); - - // Always extract metrics in processing Relays for sampled items. 
- event_metrics_extracted = processing::transactions::extraction::extract_metrics( - &mut event, - &mut extracted_metrics, - ExtractMetricsContext { - dsc: managed_envelope.envelope().dsc(), - project_id, - ctx, - sampling_decision: SamplingDecision::Keep, - metrics_extracted: event_metrics_extracted.0, - spans_extracted: spans_extracted.0, - }, - )?; - - if let Some(spans) = processing::transactions::spans::extract_from_event( - managed_envelope.envelope().dsc(), - &event, - ctx.global_config, - ctx.config, - server_sample_rate, - event_metrics_extracted, - spans_extracted, - ) { - spans_extracted = SpansExtracted(true); - for item in spans { - match item { - Ok(item) => managed_envelope.envelope_mut().add_item(item), - Err(()) => managed_envelope.track_outcome( - Outcome::Invalid(DiscardReason::InvalidSpan), - DataCategory::SpanIndexed, - 1, - ), - // TODO: also `DataCategory::Span`? - } - } - } - }); - - event = self - .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx) - .await?; - - // Event may have been dropped because of a quota and the envelope can be empty. - if event.value().is_some() { - event::serialize( - managed_envelope, - &mut event, - event_fully_normalized, - event_metrics_extracted, - spans_extracted, - )?; - } - - if self.inner.config.processing_enabled() && !event_fully_normalized.0 { - relay_log::error!( - tags.project = %project_id, - tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()), - "ingested event without normalizing" - ); - }; - - Ok(Some(extracted_metrics)) - } - async fn process_profile_chunks( &self, managed_envelope: &mut TypedEnvelope, @@ -1869,7 +1616,6 @@ impl EnvelopeProcessorService { async fn process_envelope( &self, - cogs: &mut Token, project_id: ProjectId, message: ProcessEnvelopeGrouped<'_>, ) -> Result { @@ -1938,7 +1684,12 @@ impl EnvelopeProcessorService { match group { ProcessingGroup::Error => run!(process_errors, project_id, ctx), ProcessingGroup::Transaction => { - run!(process_transactions, cogs, project_id, ctx) + self.process_with_processor( + &self.inner.processing.transactions, + managed_envelope, + ctx, + ) + .await } ProcessingGroup::Session => { self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx) @@ -2018,7 +1769,6 @@ impl EnvelopeProcessorService { /// to be dropped, this is `None`. 
async fn process<'a>( &self, - cogs: &mut Token, mut message: ProcessEnvelopeGrouped<'a>, ) -> Result>, ProcessingError> { let ProcessEnvelopeGrouped { @@ -2065,7 +1815,7 @@ impl EnvelopeProcessorService { } }); - let result = match self.process_envelope(cogs, project_id, message).await { + let result = match self.process_envelope(project_id, message).await { Ok(ProcessingResult::Envelope { mut managed_envelope, extracted_metrics, @@ -2167,7 +1917,7 @@ impl EnvelopeProcessorService { let result = metric!( timer(RelayTimers::EnvelopeProcessingTime), group = group.variant(), - { self.process(&mut cogs, message).await } + { self.process(message).await } ); match result { @@ -3648,9 +3398,7 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(mut new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else { panic!(); }; let new_envelope = new_envelope.envelope_mut(); @@ -3733,9 +3481,7 @@ mod tests { }; let processor = create_test_processor(Config::from_json_value(config).unwrap()).await; - let Ok(Some(Submit::Envelope(envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else { panic!(); }; let event = envelope diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index ae2adc5fe87..1c39e74e465 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -4,14 +4,9 @@ use relay_config::Config; use relay_dynamic_config::ErrorBoundary; use relay_event_schema::protocol::{Contexts, Event, TraceContext}; use relay_protocol::{Annotated, Empty}; -use relay_quotas::DataCategory; -use crate::envelope::ItemType; use crate::managed::TypedEnvelope; -use crate::services::outcome::Outcome; -use crate::services::processor::{ - EventProcessing, SpansExtracted, TransactionGroup, event_category, -}; +use crate::services::processor::EventProcessing; use crate::services::projects::project::ProjectInfo; use crate::utils::{self}; @@ -71,60 +66,6 @@ pub fn validate_and_set_dsc<'a, T>( None } -/// Apply the dynamic sampling decision from `compute_sampling_decision`. -pub fn drop_unsampled_items( - managed_envelope: &mut TypedEnvelope, - event: Annotated, - outcome: Outcome, - spans_extracted: SpansExtracted, -) { - // Remove all items from the envelope which need to be dropped due to dynamic sampling. - let dropped_items = managed_envelope - .envelope_mut() - // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and - // later processed in Sentry and potentially dropped there. - .take_items_by(|item| *item.ty() != ItemType::Profile); - - for item in dropped_items { - for (category, quantity) in item.quantities() { - // Dynamic sampling only drops indexed items. - // - // Only emit the base category, if the item does not have an indexed category. - if category.index_category().is_none() { - managed_envelope.track_outcome(outcome.clone(), category, quantity); - } - } - } - - // Mark all remaining items in the envelope as un-sampled. - for item in managed_envelope.envelope_mut().items_mut() { - item.set_sampled(false); - } - - // Another 'hack' to emit outcomes from the container item for the contained items (spans). 
- // - // The entire tracking outcomes for contained elements is not handled in a systematic way - // and whenever an event/transaction is discarded, contained elements are tracked in a 'best - // effort' basis (basically in all the cases where someone figured out this is a problem). - // - // This is yet another case, when the spans have not yet been separated from the transaction - // also emit dynamic sampling outcomes for the contained spans. - if !spans_extracted.0 { - let spans = event.value().and_then(|e| e.spans.value()); - let span_count = spans.map_or(0, |s| s.len()); - - // Track the amount of contained spans + 1 segment span (the transaction itself which would - // be converted to a span). - managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1); - } - - // All items have been dropped, now make sure the event is also handled and dropped. - if let Some(category) = event_category(&event) { - let category = category.index_category().unwrap_or(category); - managed_envelope.track_outcome(outcome, category, 1) - } -} - /// Runs dynamic sampling on an incoming error and tags it in case of successful sampling /// decision. /// @@ -178,14 +119,13 @@ mod tests { use std::collections::BTreeMap; use relay_base_schema::project::ProjectKey; - use relay_cogs::Token; use relay_event_schema::protocol::EventId; use relay_protocol::RuleCondition; use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue}; use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_system::Addr; - use crate::envelope::{ContentType, Envelope, Item}; + use crate::envelope::{ContentType, Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; use crate::processing; @@ -216,9 +156,7 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else { panic!(); }; diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index f3b0b5a7e2b..2d5109dd365 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -22,14 +22,6 @@ use crate::services::processor::{ use crate::statsd::{RelayCounters, RelayTimers}; use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter}; -/// Result of the extraction of the primary event payload from an envelope. -#[derive(Debug)] -pub struct ExtractionResult { - pub event: Annotated, - pub event_metrics_extracted: Option, - pub spans_extracted: Option, -} - /// Extracts the primary event payload from an envelope. /// /// The event is obtained from only one source in the following precedence: @@ -43,14 +35,13 @@ pub fn extract( metrics: &mut Metrics, event_fully_normalized: EventFullyNormalized, config: &Config, -) -> Result { +) -> Result, ProcessingError> { let envelope = managed_envelope.envelope_mut(); // Remove all items first, and then process them. After this function returns, only // attachments can remain in the envelope. The event will be added again at the end of // `process_event`. 
let event_item = envelope.take_item_by(|item| item.ty() == &ItemType::Event); - let transaction_item = envelope.take_item_by(|item| item.ty() == &ItemType::Transaction); let security_item = envelope.take_item_by(|item| item.ty() == &ItemType::Security); let raw_security_item = envelope.take_item_by(|item| item.ty() == &ItemType::RawSecurity); let user_report_v2_item = envelope.take_item_by(|item| item.ty() == &ItemType::UserReportV2); @@ -71,8 +62,6 @@ pub fn extract( let skip_normalization = config.processing_enabled() && event_fully_normalized.0; - let mut event_metrics_extracted = None; - let mut spans_extracted = None; let (event, event_len) = if let Some(item) = event_item.or(security_item) { relay_log::trace!("processing json event"); metric!(timer(RelayTimers::EventProcessingDeserialize), { @@ -86,17 +75,6 @@ pub fn extract( } (annotated_event, len) }) - } else if let Some(item) = transaction_item { - relay_log::trace!("processing json transaction"); - - event_metrics_extracted = Some(EventMetricsExtracted(item.metrics_extracted())); - spans_extracted = Some(SpansExtracted(item.spans_extracted())); - - metric!(timer(RelayTimers::EventProcessingDeserialize), { - // Transaction items can only contain transaction events. Force the event type to - // hint to normalization that we're dealing with a transaction now. - event_from_json_payload(item, Some(EventType::Transaction))? - }) } else if let Some(item) = user_report_v2_item { relay_log::trace!("processing user_report_v2"); event_from_json_payload(item, Some(EventType::UserReportV2))? @@ -130,11 +108,7 @@ pub fn extract( metrics.bytes_ingested_event = Annotated::new(event_len as u64); - Ok(ExtractionResult { - event, - event_metrics_extracted, - spans_extracted, - }) + Ok(event) } pub fn serialize( diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 67ffefca548..2455440e584 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -1,24 +1,20 @@ //! Profiles related processor code. -use relay_dynamic_config::{Feature, GlobalConfig}; -use std::net::IpAddr; +use relay_dynamic_config::Feature; use relay_base_schema::events::EventType; use relay_base_schema::project::ProjectId; use relay_config::Config; use relay_event_schema::protocol::Event; -use relay_filter::ProjectFiltersConfig; use relay_profiling::{ProfileError, ProfileId}; use relay_protocol::Annotated; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::ItemType; use crate::managed::{ItemAction, TypedEnvelope}; +use crate::processing::utils::event::event_type; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{TransactionGroup, event_type, should_filter}; +use crate::services::processor::should_filter; use crate::services::projects::project::ProjectInfo; -/// Filters out invalid and duplicate profiles. -/// -/// Returns the profile id of the single remaining profile, if there is one. pub fn filter( managed_envelope: &mut TypedEnvelope, event: &Annotated, @@ -63,92 +59,10 @@ pub fn filter( profile_id } -/// Processes profiles and set the profile ID in the profile context on the transaction if successful. 
-pub fn process( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - global_config: &GlobalConfig, - config: &Config, - project_info: &ProjectInfo, -) -> Option { - let client_ip = managed_envelope.envelope().meta().client_addr(); - let filter_settings = &project_info.config.filter_settings; - - let profiling_enabled = project_info.has_feature(Feature::Profiling); - let mut profile_id = None; - - managed_envelope.retain_items(|item| match item.ty() { - ItemType::Profile => { - if !profiling_enabled { - return ItemAction::DropSilently; - } - - // There should always be an event/transaction available at this stage. - // It is required to expand the profile. If it's missing, drop the item. - let Some(event) = event.value() else { - return ItemAction::DropSilently; - }; - - match expand_profile( - item, - event, - config, - client_ip, - filter_settings, - global_config, - ) { - Ok(id) => { - profile_id = Some(id); - ItemAction::Keep - } - Err(outcome) => ItemAction::Drop(outcome), - } - } - _ => ItemAction::Keep, - }); - - profile_id -} - -/// Transfers transaction metadata to profile and check its size. -fn expand_profile( - item: &mut Item, - event: &Event, - config: &Config, - client_ip: Option, - filter_settings: &ProjectFiltersConfig, - global_config: &GlobalConfig, -) -> Result { - match relay_profiling::expand_profile( - &item.payload(), - event, - client_ip, - filter_settings, - global_config, - ) { - Ok((id, payload)) => { - if payload.len() <= config.max_profile_size() { - item.set_payload(ContentType::Json, payload); - Ok(id) - } else { - Err(Outcome::Invalid(DiscardReason::Profiling( - relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit), - ))) - } - } - Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => { - Err(Outcome::Filtered(filter_stat_key)) - } - Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling( - relay_profiling::discard_reason(err), - ))), - } -} - #[cfg(test)] #[cfg(feature = "processing")] mod tests { - use crate::envelope::Envelope; + use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; use crate::processing; @@ -157,7 +71,6 @@ mod tests { use crate::services::projects::project::ProjectInfo; use crate::testutils::create_test_processor; use insta::assert_debug_snapshot; - use relay_cogs::Token; use relay_dynamic_config::Feature; use relay_event_schema::protocol::{EventId, ProfileContext}; use relay_system::Addr; @@ -269,9 +182,7 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { panic!(); }; let new_envelope = new_envelope.envelope(); @@ -403,9 +314,7 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { panic!(); }; let new_envelope = new_envelope.envelope(); @@ -477,10 +386,7 @@ mod tests { }, }; - let envelope = processor - .process(&mut Token::noop(), message) - .await - .unwrap(); + let envelope = processor.process(message).await.unwrap(); assert!(envelope.is_none()); } @@ -552,9 +458,7 @@ mod tests { }, }; - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { + let Ok(Some(Submit::Envelope(new_envelope))) = 
processor.process(message).await else { panic!(); }; let new_envelope = new_envelope.envelope(); diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index c1b1145ae75..09bce8f03c0 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -266,7 +266,6 @@ fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result Date: Mon, 17 Nov 2025 09:36:54 +0100 Subject: [PATCH 02/39] fixes --- relay-server/src/processing/common.rs | 1 + .../src/processing/transactions/mod.rs | 31 ++++++++----------- relay-server/src/services/processor.rs | 1 + 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs index 28d9d69e442..60a8b66bc3e 100644 --- a/relay-server/src/processing/common.rs +++ b/relay-server/src/processing/common.rs @@ -13,6 +13,7 @@ macro_rules! outputs { ($($variant:ident => $ty:ty,)*) => { /// All known [`Processor`] outputs. #[derive(Debug)] + #[allow(clippy::large_enum_variant)] pub enum Outputs { $( $variant(<$ty as Processor>::Output) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 04f37cf0e30..2a9286e013b 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -235,31 +235,27 @@ pub struct SerializedTransaction { transaction: Item, attachments: Items, profile: Option, + spans: usize, } -impl SerializedTransaction { - fn items(&self) -> impl Iterator { +impl Counted for SerializedTransaction { + fn quantities(&self) -> Quantities { let Self { headers: _, transaction, attachments, profile, + spans, } = self; - std::iter::once(transaction) - .chain(attachments) - .chain(profile.iter()) - } -} - -impl Counted for SerializedTransaction { - fn quantities(&self) -> Quantities { - let mut quantities = BTreeMap::new(); - for item in self.items() { - for (category, size) in item.quantities() { - *quantities.entry(category).or_default() += size; - } - } - quantities.into_iter().collect() + let mut quantities = transaction.quantities(); + debug_assert!(!transaction.spans_extracted()); + quantities.extend(attachments.quantities()); + quantities.extend(profile.quantities()); + quantities.extend([ + (DataCategory::Span, *spans), + (DataCategory::SpanIndexed, *spans), + ]); + quantities } } @@ -572,7 +568,6 @@ impl Counted for IndexedTransaction { /// Output of the transaction processor. 
#[derive(Debug)] -#[allow(clippy::large_enum_variant)] pub enum TransactionOutput { Full(Managed>), Indexed(Managed>), diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index eb6927ca86c..6feee270d14 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1235,6 +1235,7 @@ impl EnvelopeProcessorService { transactions: TransactionProcessor::new( Arc::clone("a_limiter), geoip_lookup.clone(), + #[cfg(feature = "processing")] quotas.clone(), ), }, From cea5ff1aaf0a44de247824174810b70f31490ee6 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 17 Nov 2025 11:37:23 +0100 Subject: [PATCH 03/39] refactor profile test --- relay-server/src/envelope/item.rs | 21 +++ .../src/processing/transactions/mod.rs | 20 ++- .../src/services/processor/profile.rs | 162 ++++++------------ 3 files changed, 90 insertions(+), 113 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index b4858788484..6ecabbbba12 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -38,6 +38,7 @@ impl Item { other: BTreeMap::new(), metrics_extracted: false, spans_extracted: false, + span_count: 0, sampled: true, fully_normalized: false, profile_type: None, @@ -190,6 +191,11 @@ impl Item { } } + /// Returns the number of spans in `event.spans` if the item is an event item. + pub fn span_count(&self) -> u32 { + self.headers.span_count + } + /// Returns the content type of this item's payload. #[cfg_attr(not(feature = "processing"), allow(dead_code))] pub fn content_type(&self) -> Option<&ContentType> { @@ -834,6 +840,17 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, + /// The number of spans in the `event.spans` array. + /// + /// Should always be zero except for transaction items. + /// + /// When a transaction is dropped before spans were extracted from a transaction, + /// this number is used to emit correct outcomes for the spans category. + /// + /// This number does *not* count the transaction itself. + #[serde(default, skip_serializing_if = "is_zero")] + span_count: u32, + /// Whether the event has been _fully_ normalized. /// /// If the event has been partially normalized, this flag is false. 
By @@ -904,6 +921,10 @@ fn is_true(value: &bool) -> bool { *value } +fn is_zero(val: &u32) -> bool { + *val == 0 +} + #[cfg(test)] mod tests { use crate::integrations::OtelFormat; diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 2a9286e013b..cf56aa4da11 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -235,7 +235,6 @@ pub struct SerializedTransaction { transaction: Item, attachments: Items, profile: Option, - spans: usize, } impl Counted for SerializedTransaction { @@ -245,16 +244,18 @@ impl Counted for SerializedTransaction { transaction, attachments, profile, - spans, } = self; let mut quantities = transaction.quantities(); debug_assert!(!transaction.spans_extracted()); quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); + + let span_count = (transaction.span_count() + 1) as usize; quantities.extend([ - (DataCategory::Span, *spans), - (DataCategory::SpanIndexed, *spans), + (DataCategory::Span, span_count), + (DataCategory::SpanIndexed, span_count), ]); + quantities } } @@ -574,6 +575,17 @@ pub enum TransactionOutput { OnlyProfile(Managed), } +impl TransactionOutput { + #[cfg(test)] + pub fn event(self) -> Option> { + match self { + Self::Full(managed) => Some(managed.accept(|x| x).transaction.0), + Self::Indexed(managed) => Some(managed.accept(|x| x).transaction.0), + Self::OnlyProfile(managed) => None, + } + } +} + impl Forward for TransactionOutput { fn serialize_envelope( self, diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 2455440e584..9d5092e22ce 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -65,7 +65,7 @@ mod tests { use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; - use crate::processing; + use crate::processing::{self, Outputs}; use crate::services::processor::Submit; use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup}; use crate::services::projects::project::ProjectInfo; @@ -77,6 +77,39 @@ mod tests { use super::*; + async fn process_event( + envelope: Box, + config: Config, + project_info: &ProjectInfo, + ) -> Option> { + let processor = create_test_processor(config).await; + let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); + assert_eq!(envelopes.len(), 1); + let (group, envelope) = envelopes.pop().unwrap(); + + let envelope = ManagedEnvelope::new(envelope, Addr::dummy()); + + let message = ProcessEnvelopeGrouped { + group, + envelope, + ctx: processing::Context { + project_info, + ..processing::Context::for_test() + }, + }; + + let result = processor.process(message).await.unwrap()?; + + let Submit::Output { + output: Outputs::Transactions(t), + ctx: _, + } = result + else { + panic!(); + }; + Some(t.event().unwrap()) + } + #[tokio::test] async fn test_profile_id_transfered() { let config = Config::from_json_value(serde_json::json!({ @@ -86,7 +119,6 @@ mod tests { } })) .unwrap(); - let processor = create_test_processor(config).await; let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -167,37 +199,12 @@ mod tests { let mut project_info = ProjectInfo::default(); project_info.config.features.0.insert(Feature::Profiling); - let mut envelopes = ProcessingGroup::split_envelope(*envelope, 
&Default::default()); - assert_eq!(envelopes.len(), 1); - - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, Addr::dummy()); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - project_info: &project_info, - ..processing::Context::for_test() - }, - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { - panic!(); - }; - let new_envelope = new_envelope.envelope(); - - // Get the re-serialized context. - let item = new_envelope - .get_item_by(|item| item.ty() == &ItemType::Transaction) - .unwrap(); - let transaction = Annotated::::from_json_bytes(&item.payload()).unwrap(); - let context = transaction - .value() - .unwrap() - .context::() + let event = process_event(envelope, config, &project_info) + .await .unwrap(); + let context = event.value().unwrap().context::().unwrap(); + assert_debug_snapshot!(context, @r###" ProfileContext { profile_id: EventId( @@ -218,7 +225,6 @@ mod tests { } })) .unwrap(); - let processor = create_test_processor(config).await; let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -299,36 +305,10 @@ mod tests { let mut project_info = ProjectInfo::default(); project_info.config.features.0.insert(Feature::Profiling); - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, Addr::dummy()); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - project_info: &project_info, - ..processing::Context::for_test() - }, - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { - panic!(); - }; - let new_envelope = new_envelope.envelope(); - - // Get the re-serialized context. 
- let item = new_envelope - .get_item_by(|item| item.ty() == &ItemType::Transaction) - .unwrap(); - let transaction = Annotated::::from_json_bytes(&item.payload()).unwrap(); - let context = transaction - .value() - .unwrap() - .context::() + let event = process_event(envelope, config, &project_info) + .await .unwrap(); + let context = event.value().unwrap().context::().unwrap(); assert_debug_snapshot!(context, @r###" ProfileContext { @@ -341,9 +321,15 @@ mod tests { #[tokio::test] async fn filter_standalone_profile() { relay_log::init_test!(); + let config = Config::from_json_value(serde_json::json!({ + "processing": { + "enabled": true, + "kafka_config": [] + } + })) + .unwrap(); // Setup - let processor = create_test_processor(Default::default()).await; let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -371,23 +357,8 @@ mod tests { let mut project_info = ProjectInfo::default(); project_info.config.features.0.insert(Feature::Profiling); - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy()); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - project_info: &project_info, - ..processing::Context::for_test() - }, - }; - - let envelope = processor.process(message).await.unwrap(); - assert!(envelope.is_none()); + let event = process_event(envelope, config, &project_info).await; + assert!(event.is_none()); } #[tokio::test] @@ -400,7 +371,6 @@ mod tests { } })) .unwrap(); - let processor = create_test_processor(config).await; let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -443,36 +413,10 @@ mod tests { let mut project_info = ProjectInfo::default(); project_info.config.features.0.insert(Feature::Profiling); - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, Addr::dummy()); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - project_info: &project_info, - ..processing::Context::for_test() - }, - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else { - panic!(); - }; - let new_envelope = new_envelope.envelope(); - - // Get the re-serialized context. 
- let item = new_envelope - .get_item_by(|item| item.ty() == &ItemType::Transaction) - .unwrap(); - let transaction = Annotated::::from_json_bytes(&item.payload()).unwrap(); - let context = transaction - .value() - .unwrap() - .context::() + let event = process_event(envelope, config, &project_info) + .await .unwrap(); + let context = event.value().unwrap().context::().unwrap(); assert_debug_snapshot!(context, @r###" ProfileContext { From 989b03164eb44e6ba62596e00257dcc447b55ea8 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 17 Nov 2025 15:33:48 +0100 Subject: [PATCH 04/39] ref --- relay-server/src/processing/transactions/mod.rs | 14 +++++++++++++- .../src/processing/transactions/process.rs | 6 +++++- relay-server/src/services/processor/profile.rs | 2 ++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index cf56aa4da11..b2de6dee505 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -155,18 +155,23 @@ impl Processor for TransactionProcessor { let project_id = work.scoping().project_id; let mut metrics = Metrics::default(); - let mut work = process::parse(work)?; + relay_log::trace!("Epand transaction"); + let mut work = process::expand(work)?; + relay_log::trace!("Prepare transaction data"); process::prepare_data(&mut work, &mut ctx, &mut metrics)?; + relay_log::trace!("Normalize transaction"); let mut work = process::normalize(work, ctx, &self.geoip_lookup)?; + relay_log::trace!("Filter transaction"); let filters_status = process::run_inbound_filters(&work, ctx)?; #[cfg(feature = "processing")] let quotas_client = self.quotas_client.as_ref(); #[cfg(not(feature = "processing"))] let quotas_client = None; + relay_log::trace!("Sample transaction"); let sampling_result = process::run_dynamic_sampling(&work, ctx, filters_status, quotas_client).await; @@ -174,7 +179,9 @@ impl Processor for TransactionProcessor { let server_sample_rate = sampling_result.sample_rate(); if let Some(outcome) = sampling_result.into_dropped_outcome() { + relay_log::trace!("Process profile transaction"); let work = process::process_profile(work, ctx, SamplingDecision::Drop); + relay_log::trace!("Extract transaction metrics"); let (work, extracted_metrics) = process::extract_metrics(work, ctx, SamplingDecision::Drop)?; @@ -191,18 +198,23 @@ impl Processor for TransactionProcessor { } // Need to scrub the transaction before extracting spans. + relay_log::trace!("Scrubbing transaction"); work = process::scrub(work, ctx)?; #[cfg(feature = "processing")] if ctx.config.processing_enabled() { // Process profiles before extracting metrics, to make sure they are removed if they are invalid. 
+ relay_log::trace!("Process transaction profile"); let work = process::process_profile(work, ctx, SamplingDecision::Keep); + relay_log::trace!("Extract transaction metrics"); let (indexed, extracted_metrics) = process::extract_metrics(work, ctx, SamplingDecision::Keep)?; + relay_log::trace!("Extract spans"); let mut indexed = process::extract_spans(indexed, ctx, server_sample_rate); + relay_log::trace!("Enforce quotas"); self.limiter.enforce_quotas(&mut indexed, ctx).await?; if !indexed.flags.fully_normalized { diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 6c68fce9cbc..c063d011fd0 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -30,7 +30,7 @@ use crate::statsd::{RelayCounters, RelayTimers}; use crate::utils::SamplingResult; /// Parses the event payload. -pub fn parse( +pub fn expand( work: Managed, ) -> Result>, Rejected> { work.try_map(|work, _| { @@ -261,6 +261,10 @@ pub fn extract_metrics( .0; Ok::<_, Error>(ExpandedTransaction::::from(work)) })?; + relay_log::trace!( + "Did extract transaction metrics? {}", + &indexed.flags.metrics_extracted + ); let metrics = indexed.wrap(metrics.into_inner()); Ok((indexed, metrics)) } diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 9d5092e22ce..e806f4b1175 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -93,6 +93,7 @@ mod tests { group, envelope, ctx: processing::Context { + config: &processor.inner.config, project_info, ..processing::Context::for_test() }, @@ -112,6 +113,7 @@ mod tests { #[tokio::test] async fn test_profile_id_transfered() { + relay_log::init_test!(); let config = Config::from_json_value(serde_json::json!({ "processing": { "enabled": true, From b962ef81c11cdba6bb3f84a110f05363b3289a89 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 10:36:47 +0100 Subject: [PATCH 05/39] Log error on extraction failure --- relay-server/src/processing/transactions/extraction.rs | 4 +++- relay-server/src/processing/transactions/process.rs | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index 17fac07346e..55087e1ae27 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -59,10 +59,12 @@ pub fn extract_metrics( } = ctx; if metrics_extracted { + debug_assert!(false, "metrics extraction called twice"); return Ok(EventMetricsExtracted(metrics_extracted)); } let Some(event) = event.value_mut() else { - return Ok(EventMetricsExtracted(metrics_extracted)); + // Nothing to extract, but metrics extraction was called. + return Ok(EventMetricsExtracted(true)); }; // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index c063d011fd0..e1563131001 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -245,7 +245,6 @@ pub fn extract_metrics( let mut metrics = ProcessingExtractedMetrics::new(); let indexed = work.try_map(|mut work, record_keeper| { - // Extract metrics here, we're about to drop the event/transaction. 
work.flags.metrics_extracted = extraction::extract_metrics( &mut work.transaction.0, &mut metrics, @@ -261,10 +260,11 @@ pub fn extract_metrics( .0; Ok::<_, Error>(ExpandedTransaction::::from(work)) })?; - relay_log::trace!( - "Did extract transaction metrics? {}", - &indexed.flags.metrics_extracted - ); + + if !indexed.flags.metrics_extracted { + relay_log::error!("Failed to extract metrics. Check project config"); + } + let metrics = indexed.wrap(metrics.into_inner()); Ok((indexed, metrics)) } From f063ad665fd82b9fa8e2ec8b7e805246521ef5a1 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 10:59:50 +0100 Subject: [PATCH 06/39] bookkeeping in metrics extraction --- relay-server/src/managed/managed.rs | 4 ++-- .../src/processing/transactions/extraction.rs | 2 +- .../src/processing/transactions/mod.rs | 18 +++++++++++++----- .../src/processing/transactions/process.rs | 17 ++++++++++++++++- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs index 1b98dc24042..ab8cd90ea40 100644 --- a/relay-server/src/managed/managed.rs +++ b/relay-server/src/managed/managed.rs @@ -761,8 +761,8 @@ impl<'a> RecordKeeper<'a> { let mut sums = debug::Quantities::from(&original).0; for (category, offset) in &self.modifications { - let v = sums.entry(*category).or_default(); - match v.checked_add_signed(*offset) { + let v = dbg!(sums.entry(dbg!(*category)).or_default()); + match dbg!(v.checked_add_signed(*offset)) { Some(result) => *v = result, None => emit!( category, diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index 55087e1ae27..d00fd8b9f98 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -104,7 +104,7 @@ pub fn extract_metrics( } None => { relay_log::debug!("Legacy transaction metrics config is missing"); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(dbg!(metrics_extracted))); } }; diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index b2de6dee505..99762607f46 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -347,6 +347,17 @@ impl>> ExpandedTransaction { } } +impl>> ExpandedTransaction { + fn count_embedded_spans_and_self(&self) -> usize { + 1 + self + .transaction + .as_ref() + .value() + .and_then(|e| e.spans.value()) + .map_or(0, Vec::len) + } +} + impl>> Counted for ExpandedTransaction { fn quantities(&self) -> Quantities { let Self { @@ -361,12 +372,9 @@ impl>> Counted for ExpandedTransaction { let mut quantities = transaction.quantities(); if !flags.spans_extracted { // TODO: encode this flag into the type and remove `extracted_spans` from the "BeforeSpanExtraction" type. + // TODO: write span_count header in fast path. 
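+            // Until spans are extracted, the embedded spans (plus the transaction itself as
+            // one segment span) are still counted in the span data categories below.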
debug_assert!(extracted_spans.0.is_empty()); - let span_count = 1 + transaction - .as_ref() - .value() - .and_then(|e| e.spans.value()) - .map_or(0, Vec::len); + let span_count = self.count_embedded_spans_and_self(); quantities.push((DataCategory::SpanIndexed, span_count)); // TODO: instead of looking at the flag, depend on `T` if !flags.metrics_extracted { diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index e1563131001..1367448f439 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -1,3 +1,4 @@ +use std::isize; use std::sync::Arc; use relay_base_schema::events::EventType; @@ -254,10 +255,24 @@ pub fn extract_metrics( ctx, sampling_decision, metrics_extracted: work.flags.metrics_extracted, - spans_extracted: work.flags.spans_extracted, + spans_extracted: work.flags.spans_extracted, // TODO: what does fn do with this flag? }, )? .0; + + // TODO: remove `(SpanIndexed, 0)` from bookkeeping. + + // The extracted metrics now take over the "total" data categories. + record_keeper.modify_by(DataCategory::Transaction, -1); + record_keeper.modify_by( + DataCategory::Span, + -dbg!( + work.count_embedded_spans_and_self() + .try_into() + .unwrap_or(isize::MAX) + ), + ); + Ok::<_, Error>(ExpandedTransaction::::from(work)) })?; From add0dd86835426f2386039d10a59bce03c08fa15 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 11:33:23 +0100 Subject: [PATCH 07/39] fix except rate limiting --- .../src/processing/transactions/mod.rs | 191 +++++++----------- .../src/processing/transactions/process.rs | 79 ++++---- .../src/processing/transactions/profile.rs | 11 +- 3 files changed, 117 insertions(+), 164 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 99762607f46..bd56118d07c 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -1,5 +1,6 @@ #![expect(unused)] use std::collections::BTreeMap; +use std::marker::PhantomData; use std::sync::Arc; use relay_base_schema::events::EventType; @@ -19,6 +20,7 @@ use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType, Items}; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; +use crate::processing::spans::{Indexed, TotalAndIndexed}; use crate::processing::transactions::profile::{Profile, ProfileWithHeaders}; use crate::processing::utils::event::{ EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_type, @@ -138,7 +140,7 @@ impl Processor for TransactionProcessor { let work = SerializedTransaction { headers, - transaction, + event: transaction, attachments, profile, }; @@ -220,7 +222,7 @@ impl Processor for TransactionProcessor { if !indexed.flags.fully_normalized { relay_log::error!( tags.project = %project_id, - tags.ty = event_type(&indexed.transaction.0).map(|e| e.to_string()).unwrap_or("none".to_owned()), + tags.ty = event_type(&indexed.event.0).map(|e| e.to_string()).unwrap_or("none".to_owned()), "ingested event without normalizing" ); }; @@ -234,7 +236,7 @@ impl Processor for TransactionProcessor { self.limiter.enforce_quotas(&mut work, ctx).await?; Ok(Output { - main: Some(TransactionOutput::Full(work)), + main: Some(TransactionOutput::TotalAndIndexed(work)), metrics: None, }) } @@ -244,7 +246,7 @@ impl Processor for TransactionProcessor { #[derive(Debug)] pub 
struct SerializedTransaction { headers: EnvelopeHeaders, - transaction: Item, + event: Item, attachments: Items, profile: Option, } @@ -253,16 +255,16 @@ impl Counted for SerializedTransaction { fn quantities(&self) -> Quantities { let Self { headers: _, - transaction, + event, attachments, profile, } = self; - let mut quantities = transaction.quantities(); - debug_assert!(!transaction.spans_extracted()); + let mut quantities = event.quantities(); + debug_assert!(!event.spans_extracted()); quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); - let span_count = (transaction.span_count() + 1) as usize; + let span_count = (event.span_count() + 1) as usize; quantities.extend([ (DataCategory::Span, span_count), (DataCategory::SpanIndexed, span_count), @@ -277,41 +279,46 @@ impl Counted for SerializedTransaction { /// The type parameter indicates whether metrics were already extracted, which changes how /// we count the transaction (total vs indexed). #[derive(Debug)] -pub struct ExpandedTransaction { +pub struct ExpandedTransaction { headers: EnvelopeHeaders, - transaction: T, // might be empty + event: Annotated, flags: Flags, attachments: Items, profile: Option, extracted_spans: ExtractedSpans, + + #[expect(unused, reason = "marker field, only set never read")] + category: C, } -impl From> for ExpandedTransaction { - fn from(value: ExpandedTransaction) -> Self { +impl From> for ExpandedTransaction { + fn from(value: ExpandedTransaction) -> Self { let ExpandedTransaction { headers, - transaction, + event, flags, attachments, profile, extracted_spans, + category, } = value; Self { headers, - transaction: IndexedTransaction::from(transaction), + event, flags, attachments, profile, extracted_spans, + category: Indexed, } } } -impl>> ExpandedTransaction { +impl ExpandedTransaction { fn serialize_envelope(self) -> Result, serde_json::Error> { let Self { headers, - transaction, + event, flags: Flags { metrics_extracted, @@ -321,10 +328,9 @@ impl>> ExpandedTransaction { attachments, profile, extracted_spans, + category, } = self; - let event = transaction.into(); - let mut items = smallvec![]; if !event.is_empty() { let data = metric!(timer(RelayTimers::EventProcessingSerialization), { @@ -347,39 +353,69 @@ impl>> ExpandedTransaction { } } -impl>> ExpandedTransaction { +impl ExpandedTransaction { fn count_embedded_spans_and_self(&self) -> usize { 1 + self - .transaction - .as_ref() + .event .value() .and_then(|e| e.spans.value()) .map_or(0, Vec::len) } } -impl>> Counted for ExpandedTransaction { +impl Counted for ExpandedTransaction { fn quantities(&self) -> Quantities { let Self { headers: _, - transaction, + event: transaction, flags, attachments, profile, extracted_spans, + category, } = self; + debug_assert!(!flags.metrics_extracted); + let mut quantities = smallvec![ + (DataCategory::Transaction, 1), + (DataCategory::TransactionIndexed, 1), + ]; + + // For now, span extraction happens after metrics extraction: + debug_assert!(!flags.spans_extracted); + debug_assert!(extracted_spans.0.is_empty()); + + let span_count = self.count_embedded_spans_and_self(); + quantities.extend([ + (DataCategory::Span, span_count), + (DataCategory::SpanIndexed, span_count), + ]); + + quantities.extend(attachments.quantities()); + quantities.extend(profile.quantities()); + + quantities + } +} - let mut quantities = transaction.quantities(); +impl Counted for ExpandedTransaction { + fn quantities(&self) -> Quantities { + let Self { + headers: _, + event: transaction, + flags, + 
attachments, + profile, + extracted_spans, + category, + } = self; + debug_assert!(flags.metrics_extracted); + let mut quantities = smallvec![(DataCategory::TransactionIndexed, 1),]; if !flags.spans_extracted { // TODO: encode this flag into the type and remove `extracted_spans` from the "BeforeSpanExtraction" type. // TODO: write span_count header in fast path. - debug_assert!(extracted_spans.0.is_empty()); + debug_assert!(!extracted_spans.0.is_empty()); let span_count = self.count_embedded_spans_and_self(); quantities.push((DataCategory::SpanIndexed, span_count)); - // TODO: instead of looking at the flag, depend on `T` - if !flags.metrics_extracted { - quantities.push((DataCategory::Span, span_count)); - } } quantities.extend(attachments.quantities()); @@ -388,14 +424,17 @@ impl>> Counted for ExpandedTransaction { if !extracted_spans.0.is_empty() { // For now, span extraction always happens at the very end: debug_assert!(flags.metrics_extracted); + quantities.extend(extracted_spans.quantities()); } - quantities.extend(extracted_spans.quantities()); quantities } } -impl>> RateLimited for Managed> { +impl RateLimited for Managed> +where + ExpandedTransaction: Counted, +{ type Error = Error; async fn enforce( @@ -410,11 +449,12 @@ impl>> RateLimited for Managed); - -impl AsRef> for Transaction { - fn as_ref(&self) -> &Annotated { - &self.0 - } -} - -impl AsMut> for Transaction { - fn as_mut(&mut self) -> &mut Annotated { - &mut self.0 - } -} - -impl From for Annotated { - fn from(val: Transaction) -> Self { - val.0 - } -} - -impl Counted for Transaction { - fn quantities(&self) -> Quantities { - debug_assert!( - self.0 - .value() - .is_none_or(|event| event.ty.value() == Some(&EventType::Transaction)) - ); - smallvec![ - (DataCategory::TransactionIndexed, 1), - (DataCategory::Transaction, 1), - ] - } -} - -/// Same as [`Transaction`], but only reports the `TransactionIndexed` quantity. -/// -/// After dynamic sampling & metrics extraction, the total category is owned by `ExtractedMetrics`. -#[derive(Debug)] -pub struct IndexedTransaction(Annotated); - -impl AsRef> for IndexedTransaction { - fn as_ref(&self) -> &Annotated { - &self.0 - } -} - -impl AsMut> for IndexedTransaction { - fn as_mut(&mut self) -> &mut Annotated { - &mut self.0 - } -} - -impl From for Annotated { - fn from(val: IndexedTransaction) -> Self { - val.0 - } -} - -impl From for IndexedTransaction { - fn from(value: Transaction) -> Self { - Self(value.0) - } -} - -impl Counted for IndexedTransaction { - fn quantities(&self) -> Quantities { - debug_assert!( - self.0 - .value() - .is_none_or(|event| event.ty.value() == Some(&EventType::Transaction)) - ); - smallvec![(DataCategory::TransactionIndexed, 1)] - } -} - /// Output of the transaction processor. 
#[derive(Debug)] pub enum TransactionOutput { - Full(Managed>), - Indexed(Managed>), + TotalAndIndexed(Managed>), + Indexed(Managed>), OnlyProfile(Managed), } @@ -599,8 +562,8 @@ impl TransactionOutput { #[cfg(test)] pub fn event(self) -> Option> { match self { - Self::Full(managed) => Some(managed.accept(|x| x).transaction.0), - Self::Indexed(managed) => Some(managed.accept(|x| x).transaction.0), + Self::TotalAndIndexed(managed) => Some(managed.accept(|x| x).event), + Self::Indexed(managed) => Some(managed.accept(|x| x).event), Self::OnlyProfile(managed) => None, } } @@ -612,7 +575,7 @@ impl Forward for TransactionOutput { ctx: ForwardContext<'_>, ) -> Result>, Rejected<()>> { match self { - TransactionOutput::Full(managed) => managed.try_map(|output, _| { + TransactionOutput::TotalAndIndexed(managed) => managed.try_map(|output, _| { output .serialize_envelope() .map_err(drop) diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 1367448f439..97340e1fe35 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -15,11 +15,12 @@ use smallvec::smallvec; use crate::envelope::Item; use crate::managed::{Counted, Managed, ManagedResult, Quantities, RecordKeeper, Rejected}; use crate::metrics_extraction::transactions::ExtractedMetrics; +use crate::processing::spans::{Indexed, TotalAndIndexed}; use crate::processing::transactions::extraction::{self, ExtractMetricsContext}; use crate::processing::transactions::profile::{Profile, ProfileWithHeaders}; use crate::processing::transactions::{ - Error, ExpandedTransaction, ExtractedSpans, Flags, IndexedTransaction, SerializedTransaction, - Transaction, TransactionOutput, profile, spans, + Error, ExpandedTransaction, ExtractedSpans, Flags, SerializedTransaction, TransactionOutput, + profile, spans, }; use crate::processing::utils::event::{ EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, @@ -33,18 +34,18 @@ use crate::utils::SamplingResult; /// Parses the event payload. pub fn expand( work: Managed, -) -> Result>, Rejected> { +) -> Result>, Rejected> { work.try_map(|work, _| { let SerializedTransaction { headers, - transaction: transaction_item, + event: transaction_item, attachments, profile, } = work; - let mut transaction = metric!(timer(RelayTimers::EventProcessingDeserialize), { + let mut event = metric!(timer(RelayTimers::EventProcessingDeserialize), { Annotated::::from_json_bytes(&transaction_item.payload()) })?; - if let Some(event) = transaction.value_mut() { + if let Some(event) = event.value_mut() { event.ty = EventType::Transaction.into(); } let flags = Flags { @@ -57,11 +58,12 @@ pub fn expand( Ok::<_, Error>(ExpandedTransaction { headers, - transaction: Transaction(transaction), + event, flags, attachments, profile, extracted_spans: ExtractedSpans(vec![]), + category: TotalAndIndexed, }) }) } @@ -78,22 +80,21 @@ fn validate_flags(flags: &Flags) { /// Validates and massages the data. 
pub fn prepare_data( - work: &mut Managed>, + work: &mut Managed>, ctx: &mut Context<'_>, metrics: &mut Metrics, ) -> Result<(), Rejected> { let scoping = work.scoping(); work.try_modify(|work, record_keeper| { let profile_id = profile::filter(work, record_keeper, *ctx, scoping.project_id); - let event = &mut work.transaction.0; - profile::transfer_id(event, profile_id); - profile::remove_context_if_rate_limited(event, scoping, *ctx); + profile::transfer_id(&mut work.event, profile_id); + profile::remove_context_if_rate_limited(&mut work.event, scoping, *ctx); - utils::dsc::validate_and_set_dsc(&mut work.headers, event, ctx); + utils::dsc::validate_and_set_dsc(&mut work.headers, &mut work.event, ctx); utils::event::finalize( &work.headers, - event, + &mut work.event, work.attachments.iter(), metrics, ctx.config, @@ -105,15 +106,15 @@ pub fn prepare_data( /// Normalizes the transaction event. pub fn normalize( - work: Managed>, + work: Managed>, ctx: Context<'_>, geoip_lookup: &GeoIpLookup, -) -> Result>, Rejected> { +) -> Result>, Rejected> { let project_id = work.scoping().project_id; work.try_map(|mut work, _| { work.flags.fully_normalized = utils::event::normalize( &work.headers, - &mut work.transaction.0, + &mut work.event, EventFullyNormalized(work.flags.fully_normalized), project_id, ctx, @@ -126,10 +127,10 @@ pub fn normalize( /// Rejects the entire unit of work if one of the project's filters matches. pub fn run_inbound_filters( - work: &Managed>, + work: &Managed>, ctx: Context<'_>, ) -> Result> { - utils::event::filter(&work.headers, &work.transaction.0, &ctx) + utils::event::filter(&work.headers, &work.event, &ctx) .map_err(ProcessingError::EventFiltered) .map_err(Error::from) .reject(work) @@ -137,7 +138,7 @@ pub fn run_inbound_filters( /// Computes the dynamic sampling decision for the unit of work, but does not perform action on data. pub async fn run_dynamic_sampling( - work: &Managed>, + work: &Managed>, ctx: Context<'_>, filters_status: FiltersStatus, quotas_client: Option<&AsyncRedisClient>, @@ -152,7 +153,7 @@ pub async fn run_dynamic_sampling( } async fn do_run_dynamic_sampling( - work: &Managed>, + work: &Managed>, ctx: Context<'_>, filters_status: FiltersStatus, quotas_client: Option<&AsyncRedisClient>, @@ -173,18 +174,12 @@ async fn do_run_dynamic_sampling( if let Some(quotas_client) = quotas_client { reservoir.set_redis(work.scoping().organization_id, quotas_client); } - utils::dynamic_sampling::run( - work.headers.dsc(), - &work.transaction.0, - &ctx, - Some(&reservoir), - ) - .await + utils::dynamic_sampling::run(work.headers.dsc(), &work.event, &ctx, Some(&reservoir)).await } /// Finishes transaction and profile processing when the dynamic sampling decision was "drop". pub fn drop_after_sampling( - mut work: Managed>, + mut work: Managed>, ctx: Context<'_>, outcome: Outcome, ) -> Option> { @@ -203,10 +198,10 @@ pub fn drop_after_sampling( /// Processes the profile attached to the transaction. 
pub fn process_profile( - work: Managed>, + work: Managed>, ctx: Context<'_>, sampling_decision: SamplingDecision, -) -> Managed> { +) -> Managed> { work.map(|mut work, record_keeper| { let mut profile_id = None; if let Some(profile) = work.profile.as_mut() { @@ -214,7 +209,7 @@ pub fn process_profile( let result = profile::process( profile, work.headers.meta().client_addr(), - work.transaction.0.value(), + work.event.value(), &ctx, ); match result { @@ -224,21 +219,21 @@ pub fn process_profile( Ok(id) => profile_id = Some(id), }; } - profile::transfer_id(&mut work.transaction.0, profile_id); - profile::scrub_profiler_id(&mut work.transaction.0); + profile::transfer_id(&mut work.event, profile_id); + profile::scrub_profiler_id(&mut work.event); work }) } type IndexedWithMetrics = ( - Managed>, + Managed>, Managed, ); /// Extracts transaction & span metrics from the payload. pub fn extract_metrics( - work: Managed>, + work: Managed>, ctx: Context<'_>, sampling_decision: SamplingDecision, ) -> Result> { @@ -247,7 +242,7 @@ pub fn extract_metrics( let mut metrics = ProcessingExtractedMetrics::new(); let indexed = work.try_map(|mut work, record_keeper| { work.flags.metrics_extracted = extraction::extract_metrics( - &mut work.transaction.0, + &mut work.event, &mut metrics, ExtractMetricsContext { dsc: work.headers.dsc(), @@ -273,7 +268,7 @@ pub fn extract_metrics( ), ); - Ok::<_, Error>(ExpandedTransaction::::from(work)) + Ok::<_, Error>(ExpandedTransaction::::from(work)) })?; if !indexed.flags.metrics_extracted { @@ -287,14 +282,14 @@ pub fn extract_metrics( /// Converts the spans embedded in the transaction into top-level span items. #[cfg(feature = "processing")] pub fn extract_spans( - work: Managed>, + work: Managed>, ctx: Context<'_>, server_sample_rate: Option, -) -> Managed> { +) -> Managed> { work.map(|mut work, r| { if let Some(results) = spans::extract_from_event( work.headers.dsc(), - &work.transaction.0, + &work.event, ctx.global_config, ctx.config, server_sample_rate, @@ -322,10 +317,10 @@ pub fn scrub( ctx: Context<'_>, ) -> Result>, Rejected> where - T: Counted + AsRef> + AsMut>, + ExpandedTransaction: Counted, { work.try_map(|mut work, _| { - utils::event::scrub(work.transaction.as_mut(), ctx.project_info)?; + utils::event::scrub(&mut work.event, ctx.project_info)?; utils::attachments::scrub(work.attachments.iter_mut(), ctx.project_info); Ok::<_, Error>(work) }) diff --git a/relay-server/src/processing/transactions/profile.rs b/relay-server/src/processing/transactions/profile.rs index 6ac55f652b2..7269b5f1e51 100644 --- a/relay-server/src/processing/transactions/profile.rs +++ b/relay-server/src/processing/transactions/profile.rs @@ -13,7 +13,8 @@ use relay_protocol::{Getter, Remark, RemarkType}; use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType}; use crate::managed::{Counted, Managed, Quantities, RecordKeeper}; -use crate::processing::transactions::{Error, ExpandedTransaction, Transaction}; +use crate::processing::spans::TotalAndIndexed; +use crate::processing::transactions::{Error, ExpandedTransaction}; use crate::processing::{Context, CountRateLimited}; use crate::services::outcome::{DiscardReason, Outcome}; use crate::utils::should_filter; @@ -49,7 +50,7 @@ impl CountRateLimited for Managed { /// /// Returns the profile id of the single remaining profile, if there is one. 
pub fn filter( - work: &mut ExpandedTransaction, + work: &mut ExpandedTransaction, record_keeper: &mut RecordKeeper, ctx: Context, project_id: ProjectId, @@ -63,12 +64,6 @@ pub fn filter( Outcome::Invalid(DiscardReason::FeatureDisabled(feature)), work.profile.take(), ); - } else if work.transaction.0.value().is_none() && profile_item.sampled() { - // A profile with `sampled=true` should never be without a transaction - record_keeper.reject_err( - Outcome::Invalid(DiscardReason::Profiling("missing_transaction")), - work.profile.take(), - ); } else { match relay_profiling::parse_metadata(&profile_item.payload(), project_id) { Ok(id) => { From 40b1326ca1bbd103847ef82e52c60e3b25f4da30 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 11:55:34 +0100 Subject: [PATCH 08/39] fix --- .../src/processing/transactions/extraction.rs | 2 ++ .../src/processing/transactions/mod.rs | 34 +++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index d00fd8b9f98..13076d8f9b5 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -57,6 +57,8 @@ pub fn extract_metrics( metrics_extracted, spans_extracted, } = ctx; + // TODO(follow-up): this function should always extract metrics. Dynamic sampling should validate + // the full metrics extraction config and skip sampling if it is incomplete. if metrics_extracted { debug_assert!(false, "metrics extraction called twice"); diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index bd56118d07c..aab99f22c55 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -222,7 +222,7 @@ impl Processor for TransactionProcessor { if !indexed.flags.fully_normalized { relay_log::error!( tags.project = %project_id, - tags.ty = event_type(&indexed.event.0).map(|e| e.to_string()).unwrap_or("none".to_owned()), + tags.ty = event_type(&indexed.event).map(|e| e.to_string()).unwrap_or("none".to_owned()), "ingested event without normalizing" ); }; @@ -300,7 +300,7 @@ impl From> for ExpandedTransaction attachments, profile, extracted_spans, - category, + category: _, } = value; Self { headers, @@ -328,7 +328,7 @@ impl ExpandedTransaction { attachments, profile, extracted_spans, - category, + category: _, } = self; let mut items = smallvec![]; @@ -372,7 +372,7 @@ impl Counted for ExpandedTransaction { attachments, profile, extracted_spans, - category, + category: _, } = self; debug_assert!(!flags.metrics_extracted); let mut quantities = smallvec![ @@ -406,7 +406,7 @@ impl Counted for ExpandedTransaction { attachments, profile, extracted_spans, - category, + category: _, } = self; debug_assert!(flags.metrics_extracted); let mut quantities = smallvec![(DataCategory::TransactionIndexed, 1),]; @@ -433,6 +433,7 @@ impl Counted for ExpandedTransaction { impl RateLimited for Managed> where + T: TransactionQuantities, ExpandedTransaction: Counted, { type Error = Error; @@ -454,12 +455,12 @@ where attachments, profile, extracted_spans, - category, + category: _, } = self.as_ref(); // If there is a transaction limit, drop everything. // This also affects profiles that lost their transaction due to sampling. 
- for (category, quantity) in transaction.quantities() { + for (category, quantity) in T::transaction_quantities() { let limits = rate_limiter .try_consume(scoping.item(category), quantity) .await; @@ -519,6 +520,25 @@ where } } +trait TransactionQuantities { + fn transaction_quantities() -> Quantities; +} + +impl TransactionQuantities for TotalAndIndexed { + fn transaction_quantities() -> Quantities { + smallvec![ + (DataCategory::Transaction, 1), + (DataCategory::TransactionIndexed, 1) + ] + } +} + +impl TransactionQuantities for Indexed { + fn transaction_quantities() -> Quantities { + smallvec![(DataCategory::TransactionIndexed, 1)] + } +} + /// Wrapper for spans extracted from a transaction. /// /// Needed to not emit the total category for spans. From ce20017c30eda53c6180009210d7f4524572593c Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 14:15:19 +0100 Subject: [PATCH 09/39] split rate limiting code --- .../src/processing/transactions/mod.rs | 103 +++++++++++++----- 1 file changed, 77 insertions(+), 26 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index aab99f22c55..922960ee88a 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -431,11 +431,7 @@ impl Counted for ExpandedTransaction { } } -impl RateLimited for Managed> -where - T: TransactionQuantities, - ExpandedTransaction: Counted, -{ +impl RateLimited for Managed> { type Error = Error; async fn enforce( @@ -448,6 +444,12 @@ where { let scoping = self.scoping(); + // These debug assertions are here because this function does not check indexed or extracted + // span limits. + // TODO: encode flags into types instead. + debug_assert!(!self.flags.metrics_extracted); + debug_assert!(!self.flags.spans_extracted); + let ExpandedTransaction { headers: _, event: transaction, @@ -460,16 +462,80 @@ where // If there is a transaction limit, drop everything. // This also affects profiles that lost their transaction due to sampling. - for (category, quantity) in T::transaction_quantities() { + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::Transaction), 1) + .await; + + // We do not check indexed quota at this point, because metrics have not been extracted + // from the transaction yet. + + let attachment_quantities = attachments.quantities(); + + // Check profile limits: + for (category, quantity) in profile.quantities() { let limits = rate_limiter .try_consume(scoping.item(category), quantity) .await; if !limits.is_empty() { - return Err(self.reject_err(Error::from(limits))); + self.modify(|this, record_keeper| { + record_keeper.reject_err(Error::from(limits), this.profile.take()); + }); } } + // Check attachment limits: + for (category, quantity) in attachment_quantities { + let limits = rate_limiter + .try_consume(scoping.item(category), quantity) + .await; + + if !limits.is_empty() { + self.modify(|this, record_keeper| { + record_keeper + .reject_err(Error::from(limits), std::mem::take(&mut this.attachments)); + }); + } + } + + Ok(()) + } +} + +impl RateLimited for Managed> { + type Error = Error; + + async fn enforce( + &mut self, + mut rate_limiter: R, + ctx: Context<'_>, + ) -> Result<(), Rejected> + where + R: RateLimiter, + { + let scoping = self.scoping(); + + // These debug assertions are here because this function does not check indexed or extracted + // span limits. + // TODO: encode flags into types instead. 
+ debug_assert!(self.flags.metrics_extracted); + + let ExpandedTransaction { + headers: _, + event: transaction, + flags, + attachments, + profile, + extracted_spans, + category: _, + } = self.as_ref(); + + // If there is a transaction limit, drop everything. + // This also affects profiles that lost their transaction due to sampling. + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::TransactionIndexed), 1) + .await; + let attachment_quantities = attachments.quantities(); let span_quantities = extracted_spans.quantities(); @@ -520,25 +586,6 @@ where } } -trait TransactionQuantities { - fn transaction_quantities() -> Quantities; -} - -impl TransactionQuantities for TotalAndIndexed { - fn transaction_quantities() -> Quantities { - smallvec![ - (DataCategory::Transaction, 1), - (DataCategory::TransactionIndexed, 1) - ] - } -} - -impl TransactionQuantities for Indexed { - fn transaction_quantities() -> Quantities { - smallvec![(DataCategory::TransactionIndexed, 1)] - } -} - /// Wrapper for spans extracted from a transaction. /// /// Needed to not emit the total category for spans. @@ -555,6 +602,10 @@ impl Counted for ExtractedSpans { .all(|i| i.ty() == &ItemType::Span && i.metrics_extracted()) ); + if self.0.is_empty() { + return smallvec![]; + } + smallvec![(DataCategory::SpanIndexed, self.0.len())] } } From 4ccc5575aca280440ec139d9a565c22a659fc824 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 14:59:59 +0100 Subject: [PATCH 10/39] rust tests pass --- relay-server/src/managed/managed.rs | 4 +- .../src/processing/transactions/extraction.rs | 2 +- .../src/processing/transactions/mod.rs | 3 +- .../src/processing/transactions/process.rs | 12 ++- relay-server/src/services/processor.rs | 2 +- .../src/services/processor/profile.rs | 79 ++++++------------- 6 files changed, 34 insertions(+), 68 deletions(-) diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs index ab8cd90ea40..1b98dc24042 100644 --- a/relay-server/src/managed/managed.rs +++ b/relay-server/src/managed/managed.rs @@ -761,8 +761,8 @@ impl<'a> RecordKeeper<'a> { let mut sums = debug::Quantities::from(&original).0; for (category, offset) in &self.modifications { - let v = dbg!(sums.entry(dbg!(*category)).or_default()); - match dbg!(v.checked_add_signed(*offset)) { + let v = sums.entry(*category).or_default(); + match v.checked_add_signed(*offset) { Some(result) => *v = result, None => emit!( category, diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index 13076d8f9b5..b354e2efe23 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -106,7 +106,7 @@ pub fn extract_metrics( } None => { relay_log::debug!("Legacy transaction metrics config is missing"); - return Ok(EventMetricsExtracted(dbg!(metrics_extracted))); + return Ok(EventMetricsExtracted(metrics_extracted)); } }; diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 922960ee88a..7f0e2878f45 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -413,7 +413,7 @@ impl Counted for ExpandedTransaction { if !flags.spans_extracted { // TODO: encode this flag into the type and remove `extracted_spans` from the "BeforeSpanExtraction" type. // TODO: write span_count header in fast path. 
- debug_assert!(!extracted_spans.0.is_empty()); + debug_assert!(extracted_spans.0.is_empty()); let span_count = self.count_embedded_spans_and_self(); quantities.push((DataCategory::SpanIndexed, span_count)); } @@ -422,6 +422,7 @@ impl Counted for ExpandedTransaction { quantities.extend(profile.quantities()); if !extracted_spans.0.is_empty() { + debug_assert!(flags.spans_extracted); // For now, span extraction always happens at the very end: debug_assert!(flags.metrics_extracted); quantities.extend(extracted_spans.quantities()); diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 97340e1fe35..6e4a171d569 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -1,4 +1,3 @@ -use std::isize; use std::sync::Arc; use relay_base_schema::events::EventType; @@ -90,7 +89,7 @@ pub fn prepare_data( profile::transfer_id(&mut work.event, profile_id); profile::remove_context_if_rate_limited(&mut work.event, scoping, *ctx); - utils::dsc::validate_and_set_dsc(&mut work.headers, &mut work.event, ctx); + utils::dsc::validate_and_set_dsc(&mut work.headers, &work.event, ctx); utils::event::finalize( &work.headers, @@ -261,11 +260,10 @@ pub fn extract_metrics( record_keeper.modify_by(DataCategory::Transaction, -1); record_keeper.modify_by( DataCategory::Span, - -dbg!( - work.count_embedded_spans_and_self() - .try_into() - .unwrap_or(isize::MAX) - ), + -work + .count_embedded_spans_and_self() + .try_into() + .unwrap_or(isize::MAX), ); Ok::<_, Error>(ExpandedTransaction::::from(work)) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6feee270d14..f0736c194a7 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -3471,7 +3471,7 @@ mod tests { }); let message = ProcessEnvelopeGrouped { - group: ProcessingGroup::Transaction, + group: ProcessingGroup::Error, envelope: managed_envelope, ctx: processing::Context { config: &Config::from_json_value(config.clone()).unwrap(), diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index e806f4b1175..876f9cc315f 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -71,17 +71,20 @@ mod tests { use crate::services::projects::project::ProjectInfo; use crate::testutils::create_test_processor; use insta::assert_debug_snapshot; - use relay_dynamic_config::Feature; + use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig, TransactionMetricsConfig}; use relay_event_schema::protocol::{EventId, ProfileContext}; use relay_system::Addr; use super::*; - async fn process_event( - envelope: Box, - config: Config, - project_info: &ProjectInfo, - ) -> Option> { + async fn process_event(envelope: Box) -> Option> { + let config = Config::from_json_value(serde_json::json!({ + "processing": { + "enabled": true, + "kafka_config": [] + } + })) + .unwrap(); let processor = create_test_processor(config).await; let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); assert_eq!(envelopes.len(), 1); @@ -89,12 +92,20 @@ mod tests { let envelope = ManagedEnvelope::new(envelope, Addr::dummy()); + let mut project_info = ProjectInfo::default().sanitized(); + project_info.config.transaction_metrics = + Some(ErrorBoundary::Ok(TransactionMetricsConfig::new())); + project_info.config.features.0.insert(Feature::Profiling); + + 
let mut global_config = GlobalConfig::default(); + global_config.normalize(); let message = ProcessEnvelopeGrouped { group, envelope, ctx: processing::Context { config: &processor.inner.config, - project_info, + project_info: &project_info, + global_config: &global_config, ..processing::Context::for_test() }, }; @@ -114,13 +125,7 @@ mod tests { #[tokio::test] async fn test_profile_id_transfered() { relay_log::init_test!(); - let config = Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [] - } - })) - .unwrap(); + let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -198,12 +203,7 @@ mod tests { item }); - let mut project_info = ProjectInfo::default(); - project_info.config.features.0.insert(Feature::Profiling); - - let event = process_event(envelope, config, &project_info) - .await - .unwrap(); + let event = process_event(envelope).await.unwrap(); let context = event.value().unwrap().context::().unwrap(); @@ -220,13 +220,6 @@ mod tests { #[tokio::test] async fn test_invalid_profile_id_not_transfered() { // Setup - let config = Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [] - } - })) - .unwrap(); let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -304,12 +297,7 @@ mod tests { item }); - let mut project_info = ProjectInfo::default(); - project_info.config.features.0.insert(Feature::Profiling); - - let event = process_event(envelope, config, &project_info) - .await - .unwrap(); + let event = process_event(envelope).await.unwrap(); let context = event.value().unwrap().context::().unwrap(); assert_debug_snapshot!(context, @r###" @@ -323,14 +311,6 @@ mod tests { #[tokio::test] async fn filter_standalone_profile() { relay_log::init_test!(); - let config = Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [] - } - })) - .unwrap(); - // Setup let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" @@ -356,23 +336,12 @@ mod tests { item }); - let mut project_info = ProjectInfo::default(); - project_info.config.features.0.insert(Feature::Profiling); - - let event = process_event(envelope, config, &project_info).await; + let event = process_event(envelope).await; assert!(event.is_none()); } #[tokio::test] async fn test_profile_id_removed_profiler_id_kept() { - // Setup - let config = Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [] - } - })) - .unwrap(); let event_id = EventId::new(); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" .parse() @@ -415,9 +384,7 @@ mod tests { let mut project_info = ProjectInfo::default(); project_info.config.features.0.insert(Feature::Profiling); - let event = process_event(envelope, config, &project_info) - .await - .unwrap(); + let event = process_event(envelope).await.unwrap(); let context = event.value().unwrap().context::().unwrap(); assert_debug_snapshot!(context, @r###" From d8a903af34c97650a2fb2c720c7aa743dc6345ba Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 15:42:05 +0100 Subject: [PATCH 11/39] feat(spans): Add span_count item header --- relay-server/src/envelope/item.rs | 28 +++++++++++++++++++ .../src/services/projects/cache/project.rs | 28 +++++++++++-------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/relay-server/src/envelope/item.rs 
b/relay-server/src/envelope/item.rs index 4e2c2c24969..793fa0a2c84 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -39,6 +39,7 @@ impl Item { other: BTreeMap::new(), metrics_extracted: false, spans_extracted: false, + span_count: 0, sampled: true, fully_normalized: false, profile_type: None, @@ -191,6 +192,18 @@ impl Item { } } + /// Returns the number of spans in `event.spans` if the item is a transaction item (0 otherwise). + pub fn span_count(&self) -> u32 { + self.headers.span_count + } + + /// Sets the span count. Should only be used for transactions. + /// + /// Must match the number of entries in `event.spans`. + pub fn set_span_count(&mut self, value: u32) { + self.headers.span_count = value; + } + /// Returns the content type of this item's payload. #[cfg_attr(not(feature = "processing"), allow(dead_code))] pub fn content_type(&self) -> Option<&ContentType> { @@ -851,6 +864,17 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, + /// The number of spans in the `event.spans` array. + /// + /// Should always be zero except for transaction items. + /// + /// When a transaction is dropped before spans were extracted from a transaction, + /// this number is used to emit correct outcomes for the spans category. + /// + /// This number does *not* count the transaction itself. + #[serde(default, skip_serializing_if = "is_zero")] + span_count: u32, + /// Whether the event has been _fully_ normalized. /// /// If the event has been partially normalized, this flag is false. By @@ -921,6 +945,10 @@ fn is_true(value: &bool) -> bool { *value } +fn is_zero(val: &u32) -> bool { + *val == 0 +} + #[cfg(test)] mod tests { use crate::integrations::OtelFormat; diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index 904ed24f294..b9e087fc03c 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -89,7 +89,7 @@ impl<'a> Project<'a> { // If we can extract spans from the event, we want to try and count the number of nested // spans to correctly emit negative outcomes in case the transaction itself is dropped. relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), { - sync_spans_to_enforcement(&envelope, &mut enforcement); + sync_spans_to_enforcement(&mut envelope, &mut enforcement); }); enforcement.apply_with_outcomes(&mut envelope); @@ -135,12 +135,12 @@ pub struct CheckedEnvelope { /// On the fast path of rate limiting, we do not have nested spans of a transaction extracted /// as top-level spans, thus if we limited a transaction, we want to count and emit negative /// outcomes for each of the spans nested inside that transaction. -fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) { +fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut Enforcement) { if !enforcement.is_event_active() { return; } - let spans_count = count_nested_spans(envelope); + let spans_count = count_nested_spans(envelope).unwrap_or(0); if spans_count == 0 { return; } @@ -149,6 +149,7 @@ fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enfor enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count); } + // TODO(follow-up): Do not manually enforce, rely on quantities() instead. 
if enforcement.event_indexed.is_active() { enforcement.spans_indexed = enforcement .event_indexed @@ -157,20 +158,23 @@ fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enfor } /// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope). -fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { +fn count_nested_spans(envelope: &mut ManagedEnvelope) -> Option { #[derive(Debug, serde::Deserialize)] struct PartialEvent { spans: crate::utils::SeqCount, } - envelope - .envelope() - .items() - .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .and_then(|item| serde_json::from_slice::(&item.payload()).ok()) - // We do + 1, since we count the transaction itself because it will be extracted - // as a span and counted during the slow path of rate limiting. - .map_or(0, |event| event.spans.0 + 1) + let item = envelope + .envelope_mut() + .items_mut() + .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())?; + + let event = serde_json::from_slice::(&item.payload()).ok()?; + + let count = event.spans.0; + item.set_span_count(count as u32); + + Some(count) } #[cfg(test)] From 4737ed3a6ad9d1d38891e548bc974423acb4cc17 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 15:44:48 +0100 Subject: [PATCH 12/39] Add TODO --- relay-server/src/services/projects/cache/project.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index b9e087fc03c..a38b0f50045 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -173,6 +173,7 @@ fn count_nested_spans(envelope: &mut ManagedEnvelope) -> Option { let count = event.spans.0; item.set_span_count(count as u32); + // TODO(follow-up): Update span count when serializing the event back into the envelope. Some(count) } From 9c27ac406edcb4aa86bbc5b527e0ce57172cb1cb Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 16:09:19 +0100 Subject: [PATCH 13/39] ref: auto-ensure --- relay-server/src/envelope/item.rs | 42 +++++++++++++------ .../src/services/projects/cache/project.rs | 28 +++---------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 793fa0a2c84..edf1748c55d 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -39,7 +39,7 @@ impl Item { other: BTreeMap::new(), metrics_extracted: false, spans_extracted: false, - span_count: 0, + span_count: None, sampled: true, fully_normalized: false, profile_type: None, @@ -192,16 +192,19 @@ impl Item { } } - /// Returns the number of spans in `event.spans` if the item is a transaction item (0 otherwise). - pub fn span_count(&self) -> u32 { - self.headers.span_count - } - - /// Sets the span count. Should only be used for transactions. + /// Returns the number of spans in `event.spans`. /// - /// Must match the number of entries in `event.spans`. - pub fn set_span_count(&mut self, value: u32) { - self.headers.span_count = value; + /// This function lazily sets & returns the span count if it is `None`. 
+ pub fn span_count(&mut self) -> usize { + match &mut self.headers.span_count { + Some(count) => *count, + None => { + let count = self.parse_span_count(); + self.headers.span_count = Some(count); + count + } + } + // TODO(follow-up): Refresh span count after re-serializing the transaction } /// Returns the content type of this item's payload. @@ -535,6 +538,19 @@ impl Item { pub fn is_container(&self) -> bool { self.content_type().is_some_and(ContentType::is_container) } + + fn parse_span_count(&self) -> usize { + #[derive(Debug, serde::Deserialize)] + struct PartialEvent { + spans: crate::utils::SeqCount, + } + + let Ok(event) = serde_json::from_slice::(&self.payload()) else { + return 0; + }; + + event.spans.0 + } } pub type Items = SmallVec<[Item; 3]>; @@ -866,14 +882,14 @@ pub struct ItemHeaders { /// The number of spans in the `event.spans` array. /// - /// Should always be zero except for transaction items. + /// Should never be set except for transaction items. /// /// When a transaction is dropped before spans were extracted from a transaction, /// this number is used to emit correct outcomes for the spans category. /// /// This number does *not* count the transaction itself. - #[serde(default, skip_serializing_if = "is_zero")] - span_count: u32, + #[serde(default)] + span_count: Option, /// Whether the event has been _fully_ normalized. /// diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index a38b0f50045..f90b73e10fe 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -140,7 +140,12 @@ fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut E return; } - let spans_count = count_nested_spans(envelope).unwrap_or(0); + let spans_count = envelope + .envelope_mut() + .items_mut() + .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) + .map_or(0, |item| item.span_count()); + if spans_count == 0 { return; } @@ -157,27 +162,6 @@ fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut E } } -/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope). -fn count_nested_spans(envelope: &mut ManagedEnvelope) -> Option { - #[derive(Debug, serde::Deserialize)] - struct PartialEvent { - spans: crate::utils::SeqCount, - } - - let item = envelope - .envelope_mut() - .items_mut() - .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())?; - - let event = serde_json::from_slice::(&item.payload()).ok()?; - - let count = event.spans.0; - item.set_span_count(count as u32); - // TODO(follow-up): Update span count when serializing the event back into the envelope. - - Some(count) -} - #[cfg(test)] mod tests { use crate::envelope::{ContentType, Envelope, Item}; From a1b42a293dd85731f7ed3055b6bc7bedd2c0bc7e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 16:25:56 +0100 Subject: [PATCH 14/39] simplify --- relay-server/src/envelope/item.rs | 25 ++++++++----------- .../src/services/projects/cache/project.rs | 4 ++- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index edf1748c55d..404ef7b54fc 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -193,18 +193,17 @@ impl Item { } /// Returns the number of spans in `event.spans`. 
+ pub fn span_count(&self) -> Option { + self.headers.span_count + } + + /// Sets the `span_count` item header by shallow parsing the event. /// - /// This function lazily sets & returns the span count if it is `None`. - pub fn span_count(&mut self) -> usize { - match &mut self.headers.span_count { - Some(count) => *count, - None => { - let count = self.parse_span_count(); - self.headers.span_count = Some(count); - count - } - } - // TODO(follow-up): Refresh span count after re-serializing the transaction + /// Returns the recomputed count. + pub fn refresh_span_count(&mut self) -> usize { + let count = self.parse_span_count(); + self.headers.span_count = Some(count); + count } /// Returns the content type of this item's payload. @@ -961,10 +960,6 @@ fn is_true(value: &bool) -> bool { *value } -fn is_zero(val: &u32) -> bool { - *val == 0 -} - #[cfg(test)] mod tests { use crate::integrations::OtelFormat; diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index f90b73e10fe..dae1b9d0b43 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -144,7 +144,9 @@ fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut E .envelope_mut() .items_mut() .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .map_or(0, |item| item.span_count()); + .map_or(0, |item| item.refresh_span_count()); + + // TODO(follow-up): Refresh span count after re-serializing the transaction if spans_count == 0 { return; From 6f242b860962b224a10105a3c60f9967c37ed22e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 16:38:20 +0100 Subject: [PATCH 15/39] ensure --- relay-server/src/envelope/item.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 404ef7b54fc..6daaec27d19 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -193,8 +193,8 @@ impl Item { } /// Returns the number of spans in `event.spans`. - pub fn span_count(&self) -> Option { - self.headers.span_count + pub fn span_count(&self) -> usize { + self.headers.span_count.unwrap_or(0) } /// Sets the `span_count` item header by shallow parsing the event. @@ -206,6 +206,16 @@ impl Item { count } + /// Returns the span_count header, and computes it if it is not yet set. + /// + /// Returns the recomputed count. + pub fn ensure_span_count(&mut self) -> usize { + match self.headers.span_count { + Some(count) => count, + None => self.refresh_span_count(), + } + } + /// Returns the content type of this item's payload. #[cfg_attr(not(feature = "processing"), allow(dead_code))] pub fn content_type(&self) -> Option<&ContentType> { @@ -887,7 +897,7 @@ pub struct ItemHeaders { /// this number is used to emit correct outcomes for the spans category. /// /// This number does *not* count the transaction itself. - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] span_count: Option, /// Whether the event has been _fully_ normalized. 
From fb2c13251761c1ef6d9272dfeb685b83f08c3ae2 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 16:47:36 +0100 Subject: [PATCH 16/39] ref --- relay-server/src/envelope/item.rs | 16 +++++++++------- relay-server/src/services/processor/event.rs | 1 + .../src/services/projects/cache/project.rs | 4 +--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 6daaec27d19..d5cc12be603 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -202,8 +202,8 @@ impl Item { /// Returns the recomputed count. pub fn refresh_span_count(&mut self) -> usize { let count = self.parse_span_count(); - self.headers.span_count = Some(count); - count + self.headers.span_count = count; + count.unwrap_or(0) } /// Returns the span_count header, and computes it if it is not yet set. @@ -548,17 +548,19 @@ impl Item { self.content_type().is_some_and(ContentType::is_container) } - fn parse_span_count(&self) -> usize { + fn parse_span_count(&self) -> Option { #[derive(Debug, serde::Deserialize)] struct PartialEvent { spans: crate::utils::SeqCount, } - let Ok(event) = serde_json::from_slice::(&self.payload()) else { - return 0; - }; + if self.headers.ty != ItemType::Transaction || self.headers.spans_extracted { + return None; + } + + let event = serde_json::from_slice::(&self.payload()).ok()?; - event.spans.0 + Some(event.spans.0) } } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index f3b0b5a7e2b..bb6f0534b47 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -162,6 +162,7 @@ pub fn serialize( event_item.set_metrics_extracted(event_metrics_extracted.0); event_item.set_spans_extracted(spans_extracted.0); event_item.set_fully_normalized(event_fully_normalized.0); + event_item.refresh_span_count(); managed_envelope.envelope_mut().add_item(event_item); diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index dae1b9d0b43..faf20a7e20e 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -144,9 +144,7 @@ fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut E .envelope_mut() .items_mut() .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .map_or(0, |item| item.refresh_span_count()); - - // TODO(follow-up): Refresh span count after re-serializing the transaction + .map_or(0, |item| item.ensure_span_count()); if spans_count == 0 { return; From 799103e13d1874dd68320a03279c4797bd3276dd Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 18 Nov 2025 17:01:51 +0100 Subject: [PATCH 17/39] test --- tests/integration/test_spans.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 21b9fbcf8b9..088f950a063 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1308,6 +1308,7 @@ def summarize_outcomes(): @pytest.mark.parametrize("category", ["transaction", "transaction_indexed"]) +@pytest.mark.parametrize("span_count_header", [None, 666]) def test_rate_limit_is_consistent_between_transaction_and_spans( mini_sentry, relay_with_processing, @@ -1316,6 +1317,7 @@ def test_rate_limit_is_consistent_between_transaction_and_spans( metrics_consumer, outcomes_consumer, 
category, + span_count_header, ): """ Rate limits are consistent between transactions and nested spans. @@ -1407,19 +1409,23 @@ def summarize_outcomes(): with maybe_raises: relay.send_envelope(project_id, envelope) + # The fast path now trusts the span_count item header + expected_span_count = 2 if span_count_header is None else 667 + if category == "transaction": assert summarize_outcomes() == { (2, 2): 1, # Transaction, Rate Limited (9, 2): 1, # TransactionIndexed, Rate Limited - (12, 2): 2, # Span, Rate Limited - (16, 2): 2, # SpanIndexed, Rate Limited + (12, 2): expected_span_count, # Span, Rate Limited + (16, 2): expected_span_count, # SpanIndexed, Rate Limited } assert usage_metrics() == (0, 0) elif category == "transaction_indexed": assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (16, 2): 2, # SpanIndexed, Rate Limited + (16, 2): expected_span_count, # SpanIndexed, Rate Limited } + # Metrics are always correct: assert usage_metrics() == (1, 2) From 0ce93c7ae9b0b9ceb75e1c8fe48cbe60b0f76ab3 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 09:51:55 +0100 Subject: [PATCH 18/39] rust test --- relay-server/src/envelope/item.rs | 5 ++ .../src/services/projects/cache/project.rs | 76 ++++++++++++++++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index d5cc12be603..b1cdc438260 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -197,6 +197,11 @@ impl Item { self.headers.span_count.unwrap_or(0) } + #[cfg(test)] + pub fn set_span_count(&mut self, value: usize) { + self.headers.span_count = Some(value); + } + /// Sets the `span_count` item header by shallow parsing the event. /// /// Returns the recomputed count. 
diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index faf20a7e20e..a04f93d5143 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -144,7 +144,7 @@ fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut E .envelope_mut() .items_mut() .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .map_or(0, |item| item.ensure_span_count()); + .map_or(0, |item| 1 + item.ensure_span_count()); if spans_count == 0 { return; @@ -202,6 +202,15 @@ mod tests { RequestMeta::new(dsn) } + fn get_span_count(managed_envelope: &ManagedEnvelope) -> usize { + managed_envelope + .envelope() + .items() + .next() + .unwrap() + .span_count() + } + #[tokio::test] async fn test_track_nested_spans_outcomes() { let config = Default::default(); @@ -258,7 +267,9 @@ mod tests { let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone()); + assert_eq!(get_span_count(&managed_envelope), 0); // not written yet project.check_envelope(managed_envelope).await.unwrap(); + drop(outcome_aggregator); let expected = [ @@ -274,4 +285,67 @@ mod tests { assert_eq!(outcome.quantity, expected_quantity); } } + + #[tokio::test] + async fn test_track_nested_spans_outcomes_predefined() { + let config = Default::default(); + let project = create_project( + &config, + Some(json!({ + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + })), + ); + + let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); + + let mut transaction = Item::new(ItemType::Transaction); + transaction.set_span_count(666); + transaction.set_payload( + ContentType::Json, + r#"{ + "event_id": "52df9022835246eeb317dbd739ccd059", + "type": "transaction", + "transaction": "I have a stale timestamp, but I'm recent!", + "start_timestamp": 1, + "timestamp": 2, + "contexts": { + "trace": { + "trace_id": "ff62a8b040f340bda5d830223def1d81", + "span_id": "bd429c44b67a3eb4" + } + }, + "spans": [] +}"#, + ); + + envelope.add_item(transaction); + + let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom(); + + let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone()); + + assert_eq!(get_span_count(&managed_envelope), 666); + project.check_envelope(managed_envelope).await.unwrap(); + + drop(outcome_aggregator); + + let expected = [ + (DataCategory::Transaction, 1), + (DataCategory::TransactionIndexed, 1), + (DataCategory::Span, 667), + (DataCategory::SpanIndexed, 667), + ]; + + for (expected_category, expected_quantity) in expected { + let outcome = outcome_aggregator_rx.recv().await.unwrap(); + assert_eq!(outcome.category, expected_category); + assert_eq!(outcome.quantity, expected_quantity); + } + } } From 1ebc03f7a3eeb963aae01e61009de8b999d3392c Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 10:50:09 +0100 Subject: [PATCH 19/39] fix integration test --- tests/integration/test_spans.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 088f950a063..5c473b2cabf 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1368,6 +1368,8 @@ def summarize_outcomes(): end = start + timedelta(seconds=1) envelope = envelope_with_transaction_and_spans(start, end) + if span_count_header is not None: + 
envelope.items[0].headers["span_count"] = span_count_header # First batch passes relay.send_envelope(project_id, envelope) @@ -1423,7 +1425,7 @@ def summarize_outcomes(): elif category == "transaction_indexed": assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (16, 2): expected_span_count, # SpanIndexed, Rate Limited + (16, 2): 2, # SpanIndexed, Rate Limited } # Metrics are always correct: assert usage_metrics() == (1, 2) From 0120631262c02c6c9e7cb99d0b4ffb9ceecacf96 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 12:51:07 +0100 Subject: [PATCH 20/39] update test --- tests/integration/test_proxy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/test_proxy.py b/tests/integration/test_proxy.py index b3aedf024ab..c52953c8f35 100644 --- a/tests/integration/test_proxy.py +++ b/tests/integration/test_proxy.py @@ -148,6 +148,8 @@ class RateLimitBehavior: "transaction", PayloadType.JSON, [ + {"category": "span", "quantity": 1, "reason": "generic"}, + {"category": "span_indexed", "quantity": 1, "reason": "generic"}, {"category": "transaction", "quantity": 1, "reason": "generic"}, {"category": "transaction_indexed", "quantity": 1, "reason": "generic"}, ], From a2600b28ffb35d0e91e8752d2c7b45f1ba1f60ae Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 12:56:26 +0100 Subject: [PATCH 21/39] doc: changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index be3e3bcc0c7..02daec9ea97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - Add `response_timeout` config setting for Redis. ([#5329](https://github.com/getsentry/relay/pull/5329)) - Remove `projects:discard-transaction` feature flag. ([#5307](https://github.com/getsentry/relay/pull/5307)) - Add SEER_USER data category. ([#5383](https://github.com/getsentry/relay/pull/5383)) +- Add `span_count` item header to the envelope protocol. ([#5392](https://github.com/getsentry/relay/pull/5392)) **Bug Fixes**: From a9beb7c3a6fea1c21074f1957876e1d6d2ba2a40 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 13:33:53 +0100 Subject: [PATCH 22/39] review comments --- CHANGELOG.md | 2 +- relay-server/src/envelope/item.rs | 28 +++++++++++++------ relay-server/src/services/processor/event.rs | 2 +- .../src/services/projects/cache/project.rs | 7 ++--- tests/integration/test_spans.py | 2 +- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02daec9ea97..f1927d48867 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ **Features**: - Support uploading attachments directly to objectstore. ([#5367](https://github.com/getsentry/relay/pull/5367)) +- Add `span_count` item header to the envelope protocol. ([#5392](https://github.com/getsentry/relay/pull/5392)) ## 25.11.0 @@ -26,7 +27,6 @@ - Add `response_timeout` config setting for Redis. ([#5329](https://github.com/getsentry/relay/pull/5329)) - Remove `projects:discard-transaction` feature flag. ([#5307](https://github.com/getsentry/relay/pull/5307)) - Add SEER_USER data category. ([#5383](https://github.com/getsentry/relay/pull/5383)) -- Add `span_count` item header to the envelope protocol. 
([#5392](https://github.com/getsentry/relay/pull/5392)) **Bug Fixes**: diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index b1cdc438260..fc8f0ab43d8 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -14,6 +14,7 @@ use smallvec::{SmallVec, smallvec}; use crate::envelope::{AttachmentType, ContentType, EnvelopeError}; use crate::integrations::{Integration, LogsIntegration, SpansIntegration}; +use crate::statsd::RelayTimers; #[derive(Clone, Debug)] pub struct Item { @@ -192,32 +193,41 @@ impl Item { } } - /// Returns the number of spans in `event.spans`. + /// Returns the number of spans in the `event.spans` array. + /// + /// Should always be 0 except for transaction items. + /// + /// When a transaction is dropped before spans were extracted from a transaction, + /// this number is used to emit correct outcomes for the spans category. + /// + /// This number does *not* count the transaction itself. pub fn span_count(&self) -> usize { self.headers.span_count.unwrap_or(0) } - #[cfg(test)] - pub fn set_span_count(&mut self, value: usize) { - self.headers.span_count = Some(value); + /// Sets the number of spans in the transaction payload. + pub fn set_span_count(&mut self, value: Option) { + self.headers.span_count = value; } /// Sets the `span_count` item header by shallow parsing the event. /// /// Returns the recomputed count. - pub fn refresh_span_count(&mut self) -> usize { + fn refresh_span_count(&mut self) -> usize { let count = self.parse_span_count(); self.headers.span_count = count; count.unwrap_or(0) } - /// Returns the span_count header, and computes it if it is not yet set. - /// - /// Returns the recomputed count. + /// Returns the `span_count`` header, and computes it if it has not yet been set. pub fn ensure_span_count(&mut self) -> usize { match self.headers.span_count { Some(count) => count, - None => self.refresh_span_count(), + None => { + relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), { + self.refresh_span_count() + }) + } } } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index bb6f0534b47..a0229ed84f9 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -162,7 +162,7 @@ pub fn serialize( event_item.set_metrics_extracted(event_metrics_extracted.0); event_item.set_spans_extracted(spans_extracted.0); event_item.set_fully_normalized(event_fully_normalized.0); - event_item.refresh_span_count(); + event_item.set_span_count(event.value().and_then(|e| e.spans.value()).map(Vec::len)); managed_envelope.envelope_mut().add_item(event_item); diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index a04f93d5143..3e40786c087 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -9,7 +9,6 @@ use crate::managed::ManagedEnvelope; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::projects::cache::state::SharedProject; use crate::services::projects::project::ProjectState; -use crate::statsd::RelayTimers; use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter}; /// A loaded project. @@ -88,9 +87,7 @@ impl<'a> Project<'a> { // If we can extract spans from the event, we want to try and count the number of nested // spans to correctly emit negative outcomes in case the transaction itself is dropped. 
- relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), { - sync_spans_to_enforcement(&mut envelope, &mut enforcement); - }); + sync_spans_to_enforcement(&mut envelope, &mut enforcement); enforcement.apply_with_outcomes(&mut envelope); @@ -305,7 +302,7 @@ mod tests { let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); let mut transaction = Item::new(ItemType::Transaction); - transaction.set_span_count(666); + transaction.set_span_count(Some(666)); transaction.set_payload( ContentType::Json, r#"{ diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 5c473b2cabf..15f7be042a6 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1425,7 +1425,7 @@ def summarize_outcomes(): elif category == "transaction_indexed": assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (16, 2): 2, # SpanIndexed, Rate Limited + (16, 2): expected_span_count, # SpanIndexed, Rate Limited } # Metrics are always correct: assert usage_metrics() == (1, 2) From 5acad09a926ccb76d4b53d1c4fc0162a442faebc Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 14:02:29 +0100 Subject: [PATCH 23/39] test --- tests/integration/test_spans.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 15f7be042a6..84353bb4cc1 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1423,9 +1423,11 @@ def summarize_outcomes(): } assert usage_metrics() == (0, 0) elif category == "transaction_indexed": + # We do not check indexed limits on the fast path, + # so we count the correct number of spans (ignoring the span_count header): assert summarize_outcomes() == { (9, 2): 1, # TransactionIndexed, Rate Limited - (16, 2): expected_span_count, # SpanIndexed, Rate Limited + (16, 2): 2, # SpanIndexed, Rate Limited } # Metrics are always correct: assert usage_metrics() == (1, 2) From 976c7cda01ad9006757c0f91ed0f70361b1bf270 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 15:17:31 +0100 Subject: [PATCH 24/39] Count spans in item quantities --- relay-server/src/envelope/item.rs | 17 +++++-- .../src/services/projects/cache/project.rs | 48 +++++++------------ 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index fc8f0ab43d8..be38416da0c 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -110,10 +110,19 @@ impl Item { match self.ty() { ItemType::Event => smallvec![(DataCategory::Error, item_count)], - ItemType::Transaction => smallvec![ - (DataCategory::Transaction, item_count), - (DataCategory::TransactionIndexed, item_count), - ], + ItemType::Transaction => { + let mut quantities = smallvec![ + (DataCategory::Transaction, item_count), + (DataCategory::TransactionIndexed, item_count), + ]; + if !self.spans_extracted() { + quantities.extend([ + (DataCategory::Span, item_count + self.span_count()), + (DataCategory::SpanIndexed, item_count + self.span_count()), + ]); + } + quantities + } ItemType::Security | ItemType::RawSecurity => { smallvec![(DataCategory::Security, item_count)] } diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index 3e40786c087..f836ae9423c 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -9,7 +9,7 
@@ use crate::managed::ManagedEnvelope; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::projects::cache::state::SharedProject; use crate::services::projects::project::ProjectState; -use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter}; +use crate::utils::{CheckLimits, EnvelopeLimiter}; /// A loaded project. pub struct Project<'a> { @@ -76,19 +76,25 @@ impl<'a> Project<'a> { let current_limits = self.rate_limits().current_limits(); let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]); + + // To get the correct span outcomes, we have to partially parse the event payload + // and count the spans contained in the transaction events. + // For performance reasons, we only do this if there is an active limit on `Transaction`. + if current_limits + .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)]) + { + ensure_span_count(&mut envelope); + } + let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| { let current_limits = Arc::clone(¤t_limits); async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) } }); - let (mut enforcement, mut rate_limits) = envelope_limiter + let (enforcement, mut rate_limits) = envelope_limiter .compute(envelope.envelope_mut(), &scoping) .await?; - // If we can extract spans from the event, we want to try and count the number of nested - // spans to correctly emit negative outcomes in case the transaction itself is dropped. - sync_spans_to_enforcement(&mut envelope, &mut enforcement); - enforcement.apply_with_outcomes(&mut envelope); envelope.update(); @@ -127,35 +133,13 @@ pub struct CheckedEnvelope { pub rate_limits: RateLimits, } -/// Adds category limits for the nested spans inside a transaction. -/// -/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted -/// as top-level spans, thus if we limited a transaction, we want to count and emit negative -/// outcomes for each of the spans nested inside that transaction. -fn sync_spans_to_enforcement(envelope: &mut ManagedEnvelope, enforcement: &mut Enforcement) { - if !enforcement.is_event_active() { - return; - } - - let spans_count = envelope +fn ensure_span_count(envelope: &mut ManagedEnvelope) { + if let Some(transaction_item) = envelope .envelope_mut() .items_mut() .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .map_or(0, |item| 1 + item.ensure_span_count()); - - if spans_count == 0 { - return; - } - - if enforcement.event.is_active() { - enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count); - } - - // TODO(follow-up): Do not manually enforce, rely on quantities() instead. 
- if enforcement.event_indexed.is_active() { - enforcement.spans_indexed = enforcement - .event_indexed - .clone_for(DataCategory::SpanIndexed, spans_count); + { + transaction_item.ensure_span_count(); } } From 886dbe7393d92fa62f10fe4d9fb2fcdc49ed352c Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 19 Nov 2025 15:21:22 +0100 Subject: [PATCH 25/39] instr: only measure actual parsing --- relay-server/src/envelope/item.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index be38416da0c..4e318fe8433 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -232,11 +232,7 @@ impl Item { pub fn ensure_span_count(&mut self) -> usize { match self.headers.span_count { Some(count) => count, - None => { - relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), { - self.refresh_span_count() - }) - } + None => self.refresh_span_count(), } } @@ -582,7 +578,9 @@ impl Item { return None; } - let event = serde_json::from_slice::(&self.payload()).ok()?; + let event = relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), { + serde_json::from_slice::(&self.payload()).ok()? + }); Some(event.spans.0) } From c0950908fa18632b5565ef471fa9c5cd69531cef Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 07:42:11 +0100 Subject: [PATCH 26/39] fix tests --- relay-server/src/utils/rate_limits.rs | 10 +++++++++- tests/integration/test_outcome.py | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 3fbb1feb9ca..e9df38093cb 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -1635,6 +1635,8 @@ mod tests { vec![ (DataCategory::Transaction, 1), (DataCategory::TransactionIndexed, 1), + (DataCategory::Span, 1), + (DataCategory::SpanIndexed, 1), ] ); } @@ -1665,8 +1667,11 @@ mod tests { assert!(!enforcement.event.is_active()); assert!(!enforcement.profiles_indexed.is_active()); assert!(!enforcement.profiles.is_active()); + assert!(!enforcement.spans.is_active()); + assert!(!enforcement.spans_indexed.is_active()); mock.lock().await.assert_call(DataCategory::Transaction, 1); mock.lock().await.assert_call(DataCategory::Profile, 1); + mock.lock().await.assert_call(DataCategory::Span, 1); } #[tokio::test] @@ -1722,6 +1727,8 @@ mod tests { (DataCategory::TransactionIndexed, 1), (DataCategory::Profile, 1), (DataCategory::ProfileIndexed, 1), + (DataCategory::Span, 1), + (DataCategory::SpanIndexed, 1), ] ); } @@ -1769,7 +1776,8 @@ mod tests { vec![ (DataCategory::TransactionIndexed, 1), (DataCategory::Attachment, 10), - (DataCategory::AttachmentItem, 1) + (DataCategory::AttachmentItem, 1), + (DataCategory::SpanIndexed, 1), ] ); } diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 737ac8f911e..6cf3f2faa6c 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -2029,7 +2029,7 @@ def make_envelope(): envelope = make_envelope() upstream.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=4) + outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=5) outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ From b1a4136e9cb285cfc654653101090e8f47b2144b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 08:26:26 +0100 Subject: [PATCH 27/39] fix integration test --- 
tests/integration/test_outcome.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 6cf3f2faa6c..132ba48e0cb 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -2029,7 +2029,7 @@ def make_envelope(): envelope = make_envelope() upstream.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=5) + outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=6) outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ @@ -2048,7 +2048,9 @@ def make_envelope(): (DataCategory.TRANSACTION, "invalid_transaction"), (DataCategory.TRANSACTION_INDEXED, "invalid_transaction"), (DataCategory.SPAN, "invalid_span"), + (DataCategory.SPAN, "invalid_transaction"), (DataCategory.SPAN_INDEXED, "invalid_span"), + (DataCategory.SPAN_INDEXED, "invalid_transaction"), ] ] From a336c2cf5dd6d83a3ebd623a1d4d149eb83e1b62 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 09:27:40 +0100 Subject: [PATCH 28/39] Fix all the tests --- tests/integration/test_metrics.py | 5 ++++- tests/integration/test_outcome.py | 7 +++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index a49b2ce2878..1003ae4e163 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1157,13 +1157,16 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): relay.send_transaction(project_id, tx) # The only envelopes received should be outcomes for Transaction{,Indexed}: - reports = [mini_sentry.get_client_report() for _ in range(2)] + reports = [mini_sentry.get_client_report() for _ in range(4)] filtered_events = [ outcome for report in reports for outcome in report["filtered_events"] ] filtered_events.sort(key=lambda x: x["category"]) + # NOTE: span categories should be 2. assert filtered_events == [ + {"reason": "release-version", "category": "span", "quantity": 1}, + {"reason": "release-version", "category": "span_indexed", "quantity": 1}, {"reason": "release-version", "category": "transaction", "quantity": 1}, {"reason": "release-version", "category": "transaction_indexed", "quantity": 1}, ] diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 132ba48e0cb..19771fa297c 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -243,7 +243,7 @@ def _send_event(relay, project_id=42, event_type="error", event_id=None, trace_i "type": "trace", } }, - "spans": [], + "spans": [{}], "extra": {"id": event_id}, "environment": "production", "release": "foo@1.2.3", @@ -481,7 +481,10 @@ def test_outcome_forwarding( _send_event(downstream_relay, event_type=event_type) - expected_categories = [1] if event_type == "error" else [2, 9] + # NOTE: This should emit a span count of 2, will be fixed in + # https://github.com/getsentry/relay/pull/5379. 
+ expected_categories = [1] if event_type == "error" else [2, 9, 12, 16] + outcomes = outcomes_consumer.get_outcomes(n=len(expected_categories)) outcomes.sort(key=lambda x: x["category"]) From dd6e9139a2c85765d81773e9e0f7d6ed0d672ff0 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 09:54:30 +0100 Subject: [PATCH 29/39] revert test fixture change --- tests/integration/test_metrics.py | 1 + tests/integration/test_outcome.py | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 1003ae4e163..80759dfd7d7 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1164,6 +1164,7 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): filtered_events.sort(key=lambda x: x["category"]) # NOTE: span categories should be 2. + # Will be fixed in https://github.com/getsentry/relay/pull/5379. assert filtered_events == [ {"reason": "release-version", "category": "span", "quantity": 1}, {"reason": "release-version", "category": "span_indexed", "quantity": 1}, diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 19771fa297c..82ca5a1859b 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -243,7 +243,7 @@ def _send_event(relay, project_id=42, event_type="error", event_id=None, trace_i "type": "trace", } }, - "spans": [{}], + "spans": [], "extra": {"id": event_id}, "environment": "production", "release": "foo@1.2.3", @@ -481,8 +481,6 @@ def test_outcome_forwarding( _send_event(downstream_relay, event_type=event_type) - # NOTE: This should emit a span count of 2, will be fixed in - # https://github.com/getsentry/relay/pull/5379. expected_categories = [1] if event_type == "error" else [2, 9, 12, 16] outcomes = outcomes_consumer.get_outcomes(n=len(expected_categories)) From 11861ed43c6d546e00ac167941d60bb4039b7737 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 09:58:46 +0100 Subject: [PATCH 30/39] test: update --- tests/integration/test_metrics.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 80759dfd7d7..651694b7b5d 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1164,10 +1164,9 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): filtered_events.sort(key=lambda x: x["category"]) # NOTE: span categories should be 2. - # Will be fixed in https://github.com/getsentry/relay/pull/5379. 
assert filtered_events == [ - {"reason": "release-version", "category": "span", "quantity": 1}, - {"reason": "release-version", "category": "span_indexed", "quantity": 1}, + {"reason": "release-version", "category": "span", "quantity": 2}, + {"reason": "release-version", "category": "span_indexed", "quantity": 2}, {"reason": "release-version", "category": "transaction", "quantity": 1}, {"reason": "release-version", "category": "transaction_indexed", "quantity": 1}, ] From 06410b21617bb5a66469806e1ecf9f8117c7570b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 12:22:16 +0100 Subject: [PATCH 31/39] lint --- relay-server/src/processing/transactions/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 7f0e2878f45..85fce3efd40 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -157,7 +157,7 @@ impl Processor for TransactionProcessor { let project_id = work.scoping().project_id; let mut metrics = Metrics::default(); - relay_log::trace!("Epand transaction"); + relay_log::trace!("Expand transaction"); let mut work = process::expand(work)?; relay_log::trace!("Prepare transaction data"); @@ -264,7 +264,7 @@ impl Counted for SerializedTransaction { quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); - let span_count = (event.span_count() + 1) as usize; + let span_count = event.span_count() + 1; quantities.extend([ (DataCategory::Span, span_count), (DataCategory::SpanIndexed, span_count), From 6991a3a5bac93f0cae2a4a22908731019ac58e23 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 12:44:37 +0100 Subject: [PATCH 32/39] test: more span outcomes --- tests/integration/test_outcome.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 82ca5a1859b..90edf94960f 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -274,6 +274,8 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type): [ DataCategory.TRANSACTION, DataCategory.TRANSACTION_INDEXED, + DataCategory.SPAN, + DataCategory.SPAN_INDEXED, ] if event_type == "transaction" else [DataCategory.ERROR] From 753c2e98d30dd99052531ded55624780385c9dc7 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 13:25:03 +0100 Subject: [PATCH 33/39] fix: duplicate span counting --- relay-server/src/processing/transactions/mod.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 85fce3efd40..3471c3adaf2 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -259,17 +259,11 @@ impl Counted for SerializedTransaction { attachments, profile, } = self; - let mut quantities = event.quantities(); debug_assert!(!event.spans_extracted()); + let mut quantities = event.quantities(); // counts spans based on `span_count` header. 
quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); - let span_count = event.span_count() + 1; - quantities.extend([ - (DataCategory::Span, span_count), - (DataCategory::SpanIndexed, span_count), - ]); - quantities } } From 0263642ddd200a322cfc949a2c7e94c7e81964ff Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 13:34:11 +0100 Subject: [PATCH 34/39] fix test assumptions --- relay-server/src/processing/transactions/mod.rs | 10 ++++++++-- tests/integration/test_outcome.py | 16 ++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 3471c3adaf2..89c0137e520 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -126,10 +126,16 @@ impl Processor for TransactionProcessor { ) -> Option> { let headers = envelope.envelope().headers().clone(); - let transaction = envelope + #[allow(unused_mut)] + let mut event = envelope .envelope_mut() .take_item_by(|item| matches!(*item.ty(), ItemType::Transaction))?; + // Count number of spans by shallow-parsing the event. + // Needed for accounting but not in prod, because the event is immediately parsed afterwards. + #[cfg(debug_assertions)] + event.ensure_span_count(); + let attachments = envelope .envelope_mut() .take_items_by(|item| matches!(*item.ty(), ItemType::Attachment)); @@ -140,7 +146,7 @@ impl Processor for TransactionProcessor { let work = SerializedTransaction { headers, - event: transaction, + event, attachments, profile, }; diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 90edf94960f..7c867f26174 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -2042,18 +2042,18 @@ def make_envelope(): "org_id": 1, "outcome": 3, # Invalid "project_id": 42, - "quantity": 1, + "quantity": quantity, "reason": reason, "source": "pop-relay", "timestamp": time_within_delta(), } - for (category, reason) in [ - (DataCategory.TRANSACTION, "invalid_transaction"), - (DataCategory.TRANSACTION_INDEXED, "invalid_transaction"), - (DataCategory.SPAN, "invalid_span"), - (DataCategory.SPAN, "invalid_transaction"), - (DataCategory.SPAN_INDEXED, "invalid_span"), - (DataCategory.SPAN_INDEXED, "invalid_transaction"), + for (category, quantity, reason) in [ + (DataCategory.TRANSACTION, 1, "invalid_transaction"), + (DataCategory.TRANSACTION_INDEXED, 1, "invalid_transaction"), + (DataCategory.SPAN, 1, "invalid_span"), + (DataCategory.SPAN, 2, "invalid_transaction"), + (DataCategory.SPAN_INDEXED, 1, "invalid_span"), + (DataCategory.SPAN_INDEXED, 2, "invalid_transaction"), ] ] From 747a9a2b6b77309c9aeea801691f79ecead994d2 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 14:11:57 +0100 Subject: [PATCH 35/39] minor stuff --- CHANGELOG.md | 4 +--- relay-server/src/processing/transactions/mod.rs | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10806d5480e..73138728545 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,13 +7,11 @@ - Support comparing release versions without build code. ([#5376](https://github.com/getsentry/relay/pull/5376)) - Support uploading attachments directly to objectstore. ([#5367](https://github.com/getsentry/relay/pull/5367)) - Add `span_count` item header to the envelope protocol. 
([#5392](https://github.com/getsentry/relay/pull/5392)) -<<<<<<< HEAD -======= **Internal**: - Derive the rate limiting decision in Relay from consumed quota. ([#5390](https://github.com/getsentry/relay/pull/5390)) ->>>>>>> origin/master +- Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379)) ## 25.11.0 diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 89c0137e520..6279467dbae 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -367,7 +367,7 @@ impl Counted for ExpandedTransaction { fn quantities(&self) -> Quantities { let Self { headers: _, - event: transaction, + event, flags, attachments, profile, @@ -401,7 +401,7 @@ impl Counted for ExpandedTransaction { fn quantities(&self) -> Quantities { let Self { headers: _, - event: transaction, + event, flags, attachments, profile, @@ -453,7 +453,7 @@ impl RateLimited for Managed> { let ExpandedTransaction { headers: _, - event: transaction, + event, flags, attachments, profile, @@ -523,7 +523,7 @@ impl RateLimited for Managed> { let ExpandedTransaction { headers: _, - event: transaction, + event, flags, attachments, profile, From 6f1c486f3993bd3d1e99ff433e2a0ceb34408f59 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 15:17:03 +0100 Subject: [PATCH 36/39] fix bookkeeping --- .../src/processing/transactions/extraction.rs | 12 ++++++------ relay-server/src/processing/transactions/mod.rs | 12 ++++++++++-- tests/integration/fixtures/mini_sentry.py | 1 + 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index b354e2efe23..33a90e4b736 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -62,7 +62,7 @@ pub fn extract_metrics( if metrics_extracted { debug_assert!(false, "metrics extraction called twice"); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(true)); } let Some(event) = event.value_mut() else { // Nothing to extract, but metrics extraction was called. @@ -75,7 +75,7 @@ pub fn extract_metrics( let combined_config = { let config = match &ctx.project_info.config.metric_extraction { ErrorBoundary::Ok(config) if config.is_supported() => config, - _ => return Ok(EventMetricsExtracted(metrics_extracted)), + _ => return Ok(EventMetricsExtracted(false)), }; let global_config = match &ctx.global_config.metric_extraction { ErrorBoundary::Ok(global_config) => global_config, @@ -90,7 +90,7 @@ pub fn extract_metrics( // If there's an error with global metrics extraction, it is safe to assume that this // Relay instance is not up-to-date, and we should skip extraction. 
relay_log::debug!("Failed to parse global extraction config: {e}"); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(false)); } } }; @@ -102,11 +102,11 @@ pub fn extract_metrics( Some(ErrorBoundary::Ok(tx_config)) => tx_config, Some(ErrorBoundary::Err(e)) => { relay_log::debug!("Failed to parse legacy transaction metrics config: {e}"); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(false)); } None => { relay_log::debug!("Legacy transaction metrics config is missing"); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(false)); } }; @@ -121,7 +121,7 @@ pub fn extract_metrics( } }); - return Ok(EventMetricsExtracted(metrics_extracted)); + return Ok(EventMetricsExtracted(false)); } // If spans were already extracted for an event, we rely on span processing to extract metrics. diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 6279467dbae..5bf5da74468 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -222,7 +222,7 @@ impl Processor for TransactionProcessor { relay_log::trace!("Extract spans"); let mut indexed = process::extract_spans(indexed, ctx, server_sample_rate); - relay_log::trace!("Enforce quotas"); + relay_log::trace!("Enforce quotas (processing)"); self.limiter.enforce_quotas(&mut indexed, ctx).await?; if !indexed.flags.fully_normalized { @@ -233,14 +233,17 @@ impl Processor for TransactionProcessor { ); }; + relay_log::trace!("Done"); return Ok(Output { main: Some(TransactionOutput::Indexed(indexed)), metrics: Some(extracted_metrics), }); } + relay_log::trace!("Enforce quotas"); self.limiter.enforce_quotas(&mut work, ctx).await?; + relay_log::trace!("Done"); Ok(Output { main: Some(TransactionOutput::TotalAndIndexed(work)), metrics: None, @@ -653,7 +656,12 @@ impl Forward for TransactionOutput { .map_err(drop) .with_outcome(Outcome::Invalid(DiscardReason::Internal)) }), - TransactionOutput::Indexed(managed) => managed.try_map(|output, _| { + TransactionOutput::Indexed(managed) => managed.try_map(|output, record_keeper| { + // TODO(follow-up): `Counted` impl of `Box` is wrong. + // But we will send structured data to the store soon instead of an envelope, + // then this problem is circumvented. 
+ record_keeper.lenient(DataCategory::Transaction); + record_keeper.lenient(DataCategory::Span); output .serialize_envelope() .map_err(drop) diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index 1971dd8f7ec..c2270ec72b0 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -179,6 +179,7 @@ def full_project_config(self, project_id, dsn_public_key=None, extra=None): }, "blacklistedIps": ["127.43.33.22"], "trustedRelays": [], + "transactionMetrics": {"version": 3}, }, } From e785f23d537e23c0fdcac0f8d644cc527954c92c Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 16:10:36 +0100 Subject: [PATCH 37/39] ref: transaction at the end --- relay-server/src/processing/transactions/mod.rs | 12 +++++++----- relay-server/src/processing/transactions/spans.rs | 3 ++- relay-server/src/services/store.rs | 1 + 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 5bf5da74468..fa5c4330e3a 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -7,7 +7,7 @@ use relay_base_schema::events::EventType; use relay_dynamic_config::{ErrorBoundary, Feature}; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::{Event, Metrics, SpanV2}; -use relay_protocol::{Annotated, Empty}; +use relay_protocol::{Annotated, Empty, get_value}; use relay_quotas::{DataCategory, RateLimits}; #[cfg(feature = "processing")] use relay_redis::AsyncRedisClient; @@ -335,6 +335,12 @@ impl ExpandedTransaction { } = self; let mut items = smallvec![]; + + items.extend(attachments); + items.extend(profile); + items.extend(extracted_spans.0); + + // To be compatible with previous code, add the transaction at the end: if !event.is_empty() { let data = metric!(timer(RelayTimers::EventProcessingSerialization), { event.to_json()? @@ -348,9 +354,6 @@ impl ExpandedTransaction { items.push(item); } - items.extend(attachments); - items.extend(profile); - items.extend(extracted_spans.0); Ok(Envelope::from_parts(headers, items)) } @@ -681,7 +684,6 @@ impl Forward for TransactionOutput { s: &relay_system::Addr, ctx: ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - // TODO: split out spans into a separate message. 
let envelope: ManagedEnvelope = self.serialize_envelope(ctx)?.into(); s.send(StoreEnvelope { diff --git a/relay-server/src/processing/transactions/spans.rs b/relay-server/src/processing/transactions/spans.rs index 712a7f1e97f..b10eef4220e 100644 --- a/relay-server/src/processing/transactions/spans.rs +++ b/relay-server/src/processing/transactions/spans.rs @@ -9,7 +9,7 @@ use relay_config::Config; use relay_dynamic_config::GlobalConfig; use relay_event_schema::protocol::{Event, Measurement, Measurements, Span}; use relay_metrics::{FractionUnit, MetricNamespace, MetricUnit}; -use relay_protocol::{Annotated, Empty}; +use relay_protocol::{Annotated, Empty, get_value}; use relay_sampling::DynamicSamplingContext; #[cfg(feature = "processing")] @@ -124,6 +124,7 @@ fn make_span_item( .map_err(|_| ())?; let mut item = create_span_item(span, config)?; + // If metrics extraction happened for the event, it also happened for its spans: item.set_metrics_extracted(metrics_extracted); diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index a47d2389e0f..a6e902ae010 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -271,6 +271,7 @@ impl StoreService { fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> { let mut envelope = managed_envelope.take_envelope(); + let received_at = managed_envelope.received_at(); let scoping = managed_envelope.scoping(); From cf08b4f5600067c572cdfef8a333c884e389f101 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 16:15:00 +0100 Subject: [PATCH 38/39] Update relay-server/src/processing/transactions/process.rs --- relay-server/src/processing/transactions/process.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 6e4a171d569..26fff8e0c35 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -254,8 +254,6 @@ pub fn extract_metrics( )? .0; - // TODO: remove `(SpanIndexed, 0)` from bookkeeping. - // The extracted metrics now take over the "total" data categories. record_keeper.modify_by(DataCategory::Transaction, -1); record_keeper.modify_by( From 5256ccf98ebb583c82335b19ae8b58a7c891c345 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 20 Nov 2025 16:15:05 +0100 Subject: [PATCH 39/39] Update relay-server/src/processing/transactions/mod.rs --- relay-server/src/processing/transactions/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index fa5c4330e3a..a71791c9bb1 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -418,7 +418,6 @@ impl Counted for ExpandedTransaction { let mut quantities = smallvec![(DataCategory::TransactionIndexed, 1),]; if !flags.spans_extracted { // TODO: encode this flag into the type and remove `extracted_spans` from the "BeforeSpanExtraction" type. - // TODO: write span_count header in fast path. debug_assert!(extracted_spans.0.is_empty()); let span_count = self.count_embedded_spans_and_self(); quantities.push((DataCategory::SpanIndexed, span_count));
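
Note on the resulting accounting rule: taken together, the `span_count` header (patch 22), the extended item quantities (patch 24), and the `Counted` cleanup (patch 33) converge on one rule. A transaction item always counts once in the transaction categories; as long as its embedded spans have not been extracted into standalone span items, it additionally counts `span_count + 1` in the span categories (the span the transaction itself converts into, plus the spans still nested in `event.spans`). The sketch below is a simplified illustration of that arithmetic only — `TransactionItem` and the reduced `DataCategory` enum are stand-ins, not Relay's actual types or implementation:

    // Simplified stand-ins for illustration; the real types live in
    // relay-server (Item) and relay-quotas (DataCategory).
    #[derive(Debug, PartialEq)]
    enum DataCategory {
        Transaction,
        TransactionIndexed,
        Span,
        SpanIndexed,
    }

    struct TransactionItem {
        // Value of the `span_count` item header, if it was set
        // (computed by shallow-parsing the payload when absent).
        span_count: Option<usize>,
        // Whether spans were already extracted into standalone span items.
        spans_extracted: bool,
    }

    impl TransactionItem {
        fn quantities(&self) -> Vec<(DataCategory, usize)> {
            let mut quantities = vec![
                (DataCategory::Transaction, 1),
                (DataCategory::TransactionIndexed, 1),
            ];
            if !self.spans_extracted {
                // One span for the transaction itself, plus the spans
                // still embedded in its payload.
                let spans = 1 + self.span_count.unwrap_or(0);
                quantities.push((DataCategory::Span, spans));
                quantities.push((DataCategory::SpanIndexed, spans));
            }
            quantities
        }
    }

    fn main() {
        let item = TransactionItem {
            span_count: Some(2),
            spans_extracted: false,
        };
        // A transaction dropped before span extraction, carrying two
        // embedded spans, is charged as 1 transaction and 3 spans.
        assert_eq!(
            item.quantities(),
            vec![
                (DataCategory::Transaction, 1),
                (DataCategory::TransactionIndexed, 1),
                (DataCategory::Span, 3),
                (DataCategory::SpanIndexed, 3),
            ]
        );
    }

This is the same arithmetic the updated integration tests assert: for example, a filtered transaction carrying a single embedded span now yields quantity 2 in the span categories (the transaction plus one nested span), and once spans have been extracted the span categories are no longer charged through the transaction item.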