diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index d260400878..43dcfebe80 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -129,6 +129,9 @@ pub enum Feature { #[doc(hidden)] #[serde(rename = "organizations:indexed-spans-extraction")] DeprecatedExtractSpansFromEvent, + /// Enable the experimental Span Attachment subset of the Span V2 processing pipeline in Relay. + #[serde(rename = "projects:span-v2-attachment-processing")] + SpanV2AttachmentProcessing, /// Forward compatibility. #[doc(hidden)] #[serde(other)] diff --git a/relay-event-schema/src/protocol/attachment_v2.rs b/relay-event-schema/src/protocol/attachment_v2.rs new file mode 100644 index 0000000000..04a9448bf2 --- /dev/null +++ b/relay-event-schema/src/protocol/attachment_v2.rs @@ -0,0 +1,34 @@ +use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value}; + +use crate::processor::ProcessValue; +use crate::protocol::{Attributes, Timestamp}; + +use uuid::Uuid; + +/// Metadata for a span attachment. +#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +pub struct AttachmentV2Meta { + /// Unique identifier for this attachment. + #[metastructure(required = true, nonempty = true, trim = false)] + pub attachment_id: Annotated, + + /// Timestamp when the attachment was created. + #[metastructure(required = true, trim = false)] + pub timestamp: Annotated, + + /// Original filename of the attachment. + #[metastructure(pii = "true", max_chars = 256, max_chars_allowance = 40, trim = false)] + pub filename: Annotated, + + /// Content type of the attachment body. + #[metastructure(required = true, max_chars = 128, trim = false)] + pub content_type: Annotated, + + /// Arbitrary attributes on a span attachment. + #[metastructure(pii = "maybe")] + pub attributes: Annotated, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties, pii = "maybe")] + pub other: Object, +} diff --git a/relay-event-schema/src/protocol/mod.rs b/relay-event-schema/src/protocol/mod.rs index 948d8f6ed3..5f2fdff18a 100644 --- a/relay-event-schema/src/protocol/mod.rs +++ b/relay-event-schema/src/protocol/mod.rs @@ -1,5 +1,6 @@ //! Implements the sentry event protocol. +mod attachment_v2; mod attributes; mod base; mod breadcrumb; @@ -41,6 +42,7 @@ mod utils; #[doc(inline)] pub use relay_base_schema::{events::*, spans::*}; +pub use self::attachment_v2::*; pub use self::attributes::*; pub use self::breadcrumb::*; pub use self::breakdowns::*; diff --git a/relay-server/src/envelope/content_type.rs b/relay-server/src/envelope/content_type.rs index bd081fae01..d64f36d8e4 100644 --- a/relay-server/src/envelope/content_type.rs +++ b/relay-server/src/envelope/content_type.rs @@ -35,6 +35,8 @@ pub enum ContentType { SpanV2Container, /// `application/vnd.sentry.items.trace-metric+json` TraceMetricContainer, + /// `application/vnd.sentry.attachment.v2` + AttachmentV2, /// All integration content types. Integration(Integration), /// Any arbitrary content type not listed explicitly. @@ -57,6 +59,7 @@ impl ContentType { Self::LogContainer => "application/vnd.sentry.items.log+json", Self::SpanV2Container => "application/vnd.sentry.items.span.v2+json", Self::TraceMetricContainer => "application/vnd.sentry.items.trace-metric+json", + Self::AttachmentV2 => "application/vnd.sentry.attachment.v2", Self::Integration(integration) => integration.as_content_type(), Self::Other(other) => other, } @@ -99,6 +102,8 @@ impl ContentType { Some(Self::SpanV2Container) } else if ct.eq_ignore_ascii_case(Self::TraceMetricContainer.as_str()) { Some(Self::TraceMetricContainer) + } else if ct.eq_ignore_ascii_case(Self::AttachmentV2.as_str()) { + Some(Self::AttachmentV2) } else { Integration::from_content_type(ct).map(Self::Integration) } diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 4e318fe843..3f9297ad68 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -6,7 +6,7 @@ use std::ops::AddAssign; use uuid::Uuid; use bytes::Bytes; -use relay_event_schema::protocol::EventType; +use relay_event_schema::protocol::{EventType, SpanId}; use relay_protocol::Value; use relay_quotas::DataCategory; use serde::{Deserialize, Serialize}; @@ -45,6 +45,8 @@ impl Item { fully_normalized: false, profile_type: None, platform: None, + parent_id: None, + meta_length: None, }, payload: Bytes::new(), } @@ -434,6 +436,30 @@ impl Item { self.headers.sampled = sampled; } + /// Returns the length of the item. + pub fn meta_length(&self) -> Option { + self.headers.meta_length + } + + /// Sets the length of the optional meta segment. + /// + /// Only applicable if the item is an attachment. + pub fn set_meta_length(&mut self, meta_length: u32) { + self.headers.meta_length = Some(meta_length); + } + + /// Returns the parent entity that this item is associated with, if any. + /// + /// Only applicable if the item is an attachment. + pub fn parent_id(&self) -> Option<&ParentId> { + self.headers.parent_id.as_ref() + } + + /// Sets the parent entity that this item is associated with. + pub fn set_parent_id(&mut self, parent_id: ParentId) { + self.headers.parent_id = Some(parent_id); + } + /// Returns the specified header value, if present. pub fn get_header(&self, name: &K) -> Option<&Value> where @@ -947,6 +973,18 @@ pub struct ItemHeaders { #[serde(default, skip)] profile_type: Option, + /// Content length of an optional meta segment that might be contained in the item. + /// + /// For the time being such an meta segment is only present for span attachments. + #[serde(skip_serializing_if = "Option::is_none")] + meta_length: Option, + + /// Parent entity that this item is associated with, if any. + /// + /// For the time being only applicable if the item is a span-attachment. + #[serde(flatten, skip_serializing_if = "Option::is_none")] + parent_id: Option, + /// Other attributes for forward compatibility. #[serde(flatten)] other: BTreeMap, @@ -994,6 +1032,18 @@ fn is_true(value: &bool) -> bool { *value } +/// Parent identifier for an attachment-v2. +/// +/// Attachments can be associated with different types of parent entities (only spans for now). +/// +/// SpanId(None) indicates that the item is a span-attachment that is associated with no specific +/// span. +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ParentId { + SpanId(Option), +} + #[cfg(test)] mod tests { use crate::integrations::OtelFormat; diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs index 6983353f3b..6f8fdd45fa 100644 --- a/relay-server/src/managed/counted.rs +++ b/relay-server/src/managed/counted.rs @@ -61,6 +61,10 @@ impl Counted for Box { let data = [ (DataCategory::Attachment, summary.attachment_quantity), + ( + DataCategory::AttachmentItem, + summary.attachment_item_quantity, + ), (DataCategory::Profile, summary.profile_quantity), (DataCategory::ProfileIndexed, summary.profile_quantity), (DataCategory::Span, summary.span_quantity), diff --git a/relay-server/src/managed/envelope.rs b/relay-server/src/managed/envelope.rs index 77abc4bde0..6bc0693a59 100644 --- a/relay-server/src/managed/envelope.rs +++ b/relay-server/src/managed/envelope.rs @@ -362,6 +362,14 @@ impl ManagedEnvelope { ); } + if self.context.summary.attachment_item_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::AttachmentItem, + self.context.summary.attachment_item_quantity, + ); + } + if self.context.summary.monitor_quantity > 0 { self.track_outcome( outcome.clone(), diff --git a/relay-server/src/processing/spans/dynamic_sampling.rs b/relay-server/src/processing/spans/dynamic_sampling.rs index 83ed475fff..6899ecc91c 100644 --- a/relay-server/src/processing/spans/dynamic_sampling.rs +++ b/relay-server/src/processing/spans/dynamic_sampling.rs @@ -91,6 +91,7 @@ pub fn validate_dsc(spans: &ExpandedSpans) -> Result<()> { }; for span in &spans.spans { + let span = &span.span; let trace_id = get_value!(span.trace_id); if trace_id != Some(&dsc.trace_id) { @@ -289,17 +290,34 @@ fn create_metrics( /// as the total category is counted from now in in metrics. struct UnsampledSpans { spans: Vec, + attachments: Vec, } impl From for UnsampledSpans { fn from(value: SerializedSpans) -> Self { - Self { spans: value.spans } + Self { + spans: value.spans, + attachments: value.attachments, + } } } impl Counted for UnsampledSpans { fn quantities(&self) -> Quantities { let quantity = outcome_count(&self.spans) as usize; - smallvec::smallvec![(DataCategory::SpanIndexed, quantity),] + let mut quantities = smallvec::smallvec![]; + + if quantity > 0 { + quantities.push((DataCategory::SpanIndexed, quantity)); + } + if !self.attachments.is_empty() { + quantities.push(( + DataCategory::Attachment, + self.attachments.iter().map(Item::len).sum(), + )); + quantities.push((DataCategory::AttachmentItem, self.attachments.len())); + } + + quantities } } diff --git a/relay-server/src/processing/spans/filter.rs b/relay-server/src/processing/spans/filter.rs index 9617759b93..2c4287bc6d 100644 --- a/relay-server/src/processing/spans/filter.rs +++ b/relay-server/src/processing/spans/filter.rs @@ -5,7 +5,7 @@ use relay_protocol::Annotated; use crate::extractors::RequestMeta; use crate::managed::Managed; use crate::processing::Context; -use crate::processing::spans::{Error, ExpandedSpans, Result}; +use crate::processing::spans::{Error, ExpandedSpans, Result, SerializedSpans}; /// Filters standalone spans sent for a project which does not allow standalone span ingestion. pub fn feature_flag(ctx: Context<'_>) -> Result<()> { @@ -15,11 +15,25 @@ pub fn feature_flag(ctx: Context<'_>) -> Result<()> { } } +// Filters span attachments for a project which does not allow for span attachment ingestion. +pub fn feature_flag_attachment( + spans: Managed, + ctx: Context<'_>, +) -> Managed { + spans.map(|mut spans, r| { + if ctx.should_filter(Feature::SpanV2AttachmentProcessing) { + let attachments = std::mem::take(&mut spans.attachments); + r.reject_err(Error::FilterFeatureFlag, attachments); + } + spans + }) +} + /// Applies inbound filters to individual spans. pub fn filter(spans: &mut Managed, ctx: Context<'_>) { spans.retain_with_context( |spans| (&mut spans.spans, spans.headers.meta()), - |span, meta, _| filter_span(span, meta, ctx), + |span, meta, _| filter_span(&span.span, meta, ctx), ); } diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index 48f4d812dc..d1bb1c60c5 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -1,21 +1,24 @@ use std::sync::Arc; +use bytes::Bytes; use either::Either; use relay_event_normalization::GeoIpLookup; use relay_event_schema::processor::ProcessingAction; -use relay_event_schema::protocol::SpanV2; +use relay_event_schema::protocol::{AttachmentV2Meta, SpanId, SpanV2}; use relay_pii::PiiConfigError; +use relay_protocol::Annotated; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; use crate::envelope::{ - ContainerItems, ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items, + ContainerWriteError, ContentType, EnvelopeHeaders, Item, ItemContainer, ItemType, Items, + ParentId, WithHeader, }; use crate::integrations::Integration; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; -use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter}; +use crate::processing::{self, Context, Forward, Output, QuotaRateLimiter, RateLimited}; use crate::services::outcome::{DiscardReason, Outcome}; mod dynamic_sampling; @@ -131,11 +134,17 @@ impl processing::Processor for SpansProcessor { .take_items_by(|item| matches!(item.integration(), Some(Integration::Spans(_)))) .into_vec(); + let attachments = envelope + .envelope_mut() + .take_items_by(|item| matches!(item.content_type(), Some(ContentType::AttachmentV2))) + .to_vec(); + let work = SerializedSpans { headers, spans, legacy, integrations, + attachments, }; Some(Managed::from_envelope(envelope, work)) } @@ -145,6 +154,7 @@ impl processing::Processor for SpansProcessor { spans: Managed, ctx: Context<'_>, ) -> Result, Rejected> { + let spans = filter::feature_flag_attachment(spans, ctx); filter::feature_flag(ctx).reject(&spans)?; validate::container(&spans).reject(&spans)?; @@ -205,7 +215,10 @@ impl Forward for SpanOutput { } }; - spans.try_map(|spans, _| { + spans.try_map(|spans, r| { + // SpanOutput counts only attachment body (excluding meta), while the serialized item + // body includes both, causing an expected discrepancy. + r.lenient(relay_quotas::DataCategory::Attachment); spans .serialize_envelope() .map_err(drop) @@ -232,6 +245,16 @@ impl Forward for SpanOutput { retention: ctx.retention(|r| r.span.as_ref()), }; + // Explicitly drop standalone attachments before splitting + // They are not stored with indexed spans + let spans = spans.map(|mut inner, record_keeper| { + if !inner.stand_alone_attachments.is_empty() { + let standalone = std::mem::take(&mut inner.stand_alone_attachments); + record_keeper.reject_err(Outcome::Invalid(DiscardReason::Internal), standalone); + } + inner + }); + for span in spans.split(|spans| spans.into_indexed_spans()) { if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) { s.send(span) @@ -256,6 +279,9 @@ pub struct SerializedSpans { /// Spans which Relay received from arbitrary integrations. integrations: Vec, + + /// A list of span attachments. + attachments: Vec, } impl SerializedSpans { @@ -269,19 +295,33 @@ impl SerializedSpans { impl Counted for SerializedSpans { fn quantities(&self) -> Quantities { - let quantity = (outcome_count(&self.spans) + let span_quantity = (outcome_count(&self.spans) + outcome_count(&self.legacy) + outcome_count(&self.integrations)) as usize; - smallvec::smallvec![ - (DataCategory::Span, quantity), - (DataCategory::SpanIndexed, quantity), - ] + let attachment_quantity = self.attachments.iter().map(Item::len).sum(); + + let mut quantities = smallvec::smallvec![]; + + if span_quantity > 0 { + quantities.push((DataCategory::Span, span_quantity)); + quantities.push((DataCategory::SpanIndexed, span_quantity)); + } + + if attachment_quantity > 0 { + quantities.push((DataCategory::Attachment, attachment_quantity)); + quantities.push((DataCategory::AttachmentItem, self.attachments.len())); + } + + quantities } } -impl CountRateLimited for Managed { - type Error = Error; +struct ExpandedSpansQuantities { + span: usize, + span_indexed: usize, + attachment: usize, + attachment_item: usize, } /// Spans which have been parsed and expanded from their serialized state. @@ -293,8 +333,11 @@ pub struct ExpandedSpans { /// Server side applied (dynamic) sample rate. server_sample_rate: Option, - /// Expanded and parsed spans. - spans: ContainerItems, + /// Expanded and parsed spans, with optional associated attachments. + spans: Vec, + + /// Span attachments that are not associated with any one specific span. + stand_alone_attachments: Vec, /// Category of the contained spans. /// @@ -305,20 +348,79 @@ pub struct ExpandedSpans { impl ExpandedSpans { fn serialize_envelope(self) -> Result, ContainerWriteError> { - let mut spans = Vec::new(); + let mut items = Vec::new(); if !self.spans.is_empty() { let mut item = Item::new(ItemType::Span); - ItemContainer::from(self.spans) + let mut spans_without_attachments = Vec::new(); + + for span in self.spans { + if let Some(attachment) = span.attachment { + let span_id = span.span.value().and_then(|s| s.span_id.value().copied()); + items.push(attachment_to_item(attachment, span_id)?); + } + + spans_without_attachments.push(span.span); + } + + ItemContainer::from(spans_without_attachments) .write_to(&mut item) .inspect_err(|err| relay_log::error!("failed to serialize spans: {err}"))?; - spans.push(item); + items.push(item); } - Ok(Envelope::from_parts(self.headers, Items::from_vec(spans))) + for attachment in self.stand_alone_attachments { + items.push(attachment_to_item(attachment, None)?); + } + + Ok(Envelope::from_parts(self.headers, Items::from_vec(items))) + } + + fn quantities_helper(&self) -> ExpandedSpansQuantities { + let quantity = self.spans.len(); + let mut attachment_quantity = 0; + let mut attachment_count = 0; + + for span in &self.spans { + if let Some(attachment) = &span.attachment { + attachment_quantity += attachment.body.len(); + attachment_count += 1; + } + } + for attachment in &self.stand_alone_attachments { + attachment_quantity += attachment.body.len(); + attachment_count += 1; + } + + ExpandedSpansQuantities { + span: quantity, + span_indexed: quantity, + attachment: attachment_quantity, + attachment_item: attachment_count, + } } } +fn attachment_to_item( + attachment: ValidatedSpanAttachment, + span_id: Option, +) -> Result { + let meta_json = attachment.meta.to_json()?; + let meta_bytes = meta_json.into_bytes(); + let meta_length = meta_bytes.len(); + + let mut payload = bytes::BytesMut::with_capacity(meta_length + attachment.body.len()); + payload.extend_from_slice(&meta_bytes); + payload.extend_from_slice(&attachment.body); + + let mut item = Item::new(ItemType::Attachment); + item.set_payload(ContentType::AttachmentV2, payload.freeze()); + item.set_meta_length(meta_length as u32); + item.set_parent_id(ParentId::SpanId(span_id)); + + Ok(item) +} + impl ExpandedSpans { /// Logically transforms contained spans into [`Indexed`]. /// @@ -328,6 +430,7 @@ impl ExpandedSpans { headers, server_sample_rate, spans, + stand_alone_attachments, category: _, } = self; @@ -335,6 +438,7 @@ impl ExpandedSpans { headers, server_sample_rate, spans, + stand_alone_attachments, category: Indexed, } } @@ -365,33 +469,195 @@ pub struct Indexed; impl Counted for ExpandedSpans { fn quantities(&self) -> Quantities { - let quantity = self.spans.len(); - smallvec::smallvec![ - (DataCategory::Span, quantity), - (DataCategory::SpanIndexed, quantity), - ] + let ExpandedSpansQuantities { + span, + span_indexed, + attachment, + attachment_item, + } = self.quantities_helper(); + + let mut quantities = smallvec::smallvec![]; + if span > 0 { + quantities.push((DataCategory::Span, span)); + quantities.push((DataCategory::SpanIndexed, span_indexed)); + } + if attachment > 0 { + quantities.push((DataCategory::Attachment, attachment)); + quantities.push((DataCategory::AttachmentItem, attachment_item)); + } + + quantities } } impl Counted for ExpandedSpans { fn quantities(&self) -> Quantities { - smallvec::smallvec![(DataCategory::SpanIndexed, self.spans.len())] + let ExpandedSpansQuantities { + span: _, + span_indexed, + attachment, + attachment_item, + } = self.quantities_helper(); + + let mut quantities = smallvec::smallvec![]; + if span_indexed > 0 { + quantities.push((DataCategory::SpanIndexed, span_indexed)); + } + if attachment > 0 { + quantities.push((DataCategory::Attachment, attachment)); + quantities.push((DataCategory::AttachmentItem, attachment_item)); + } + + quantities } } -impl CountRateLimited for Managed> { +impl RateLimited for Managed> { type Error = Error; + + async fn enforce( + &mut self, + mut rate_limiter: T, + _: Context<'_>, + ) -> std::result::Result<(), Rejected> + where + T: processing::RateLimiter, + { + let scoping = self.scoping(); + + let ExpandedSpansQuantities { + span, + span_indexed, + attachment, + attachment_item, + } = self.quantities_helper(); + + if span > 0 { + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::Span), span) + .await; + if !limits.is_empty() { + // If there is a span quota reject all the spans and the associated attachments. + return Err(self.reject_err(Error::from(limits))); + } + } + + if span_indexed > 0 { + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::SpanIndexed), span_indexed) + .await; + if !limits.is_empty() { + // If there is a span quota reject all the spans and the associated attachments. + return Err(self.reject_err(Error::from(limits))); + } + } + + if attachment > 0 { + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::Attachment), attachment) + .await; + + if !limits.is_empty() { + self.modify(|this, record_keeper| { + // Reject both stand_alone and associated attachments. + let mut all_attachments = std::mem::take(&mut this.stand_alone_attachments); + all_attachments.extend( + this.spans + .iter_mut() + .filter_map(|span| span.attachment.take()), + ); + + record_keeper.reject_err(Error::from(limits), all_attachments); + }); + } + } + + if attachment_item > 0 { + let limits = rate_limiter + .try_consume(scoping.item(DataCategory::AttachmentItem), attachment_item) + .await; + + if !limits.is_empty() { + self.modify(|this, record_keeper| { + // Reject both stand_alone and associated attachments. + let mut all_attachments = std::mem::take(&mut this.stand_alone_attachments); + all_attachments.extend( + this.spans + .iter_mut() + .filter_map(|span| span.attachment.take()), + ); + + record_keeper.reject_err(Error::from(limits), all_attachments); + }); + } + } + + Ok(()) + } } /// A Span which only represents the indexed category. #[cfg(feature = "processing")] #[derive(Debug)] -struct IndexedSpan(crate::envelope::WithHeader); +struct IndexedSpan(SpanWrapper); #[cfg(feature = "processing")] impl Counted for IndexedSpan { fn quantities(&self) -> Quantities { - smallvec::smallvec![(DataCategory::SpanIndexed, 1)] + let mut quantities = smallvec::smallvec![(DataCategory::SpanIndexed, 1)]; + if let Some(attachment) = &self.0.attachment { + quantities.extend(attachment.quantities()); + } + quantities + } +} + +/// A validated and parsed span attachment. +#[derive(Debug)] +pub struct ValidatedSpanAttachment { + /// The parsed metadata from the attachment. + pub meta: Annotated, + + /// The raw attachment body. + pub body: Bytes, +} + +impl Counted for ValidatedSpanAttachment { + fn quantities(&self) -> Quantities { + smallvec::smallvec![ + (DataCategory::Attachment, self.body.len()), + (DataCategory::AttachmentItem, 1) + ] + } +} + +/// Wrapper around a SpanV2 and an optional associated attachment. +/// +/// Allows for dropping the attachment together with the Span. +#[derive(Debug)] +struct SpanWrapper { + span: WithHeader, + attachment: Option, +} + +impl SpanWrapper { + fn new(span: WithHeader) -> Self { + Self { + span, + attachment: None, + } + } +} + +impl Counted for SpanWrapper { + fn quantities(&self) -> Quantities { + let mut quantities = self.span.quantities(); + + if let Some(attachment) = &self.attachment { + quantities.extend(attachment.quantities()); + } + + quantities } } diff --git a/relay-server/src/processing/spans/process.rs b/relay-server/src/processing/spans/process.rs index 237afc1230..fa8f85f6c7 100644 --- a/relay-server/src/processing/spans/process.rs +++ b/relay-server/src/processing/spans/process.rs @@ -1,16 +1,19 @@ +use std::collections::BTreeMap; use std::time::Duration; use relay_event_normalization::{ GeoIpLookup, RequiredMode, SchemaProcessor, TimestampProcessor, TrimmingProcessor, eap, }; use relay_event_schema::processor::{ProcessingState, ValueType, process_value}; -use relay_event_schema::protocol::{Span, SpanV2}; +use relay_event_schema::protocol::{AttachmentV2Meta, Span, SpanId, SpanV2}; use relay_protocol::Annotated; -use crate::envelope::{ContainerItems, EnvelopeHeaders, Item, ItemContainer, WithHeader}; -use crate::managed::Managed; +use crate::envelope::{ContainerItems, EnvelopeHeaders, Item, ItemContainer, ParentId, WithHeader}; +use crate::managed::{Managed, RecordKeeper}; use crate::processing::Context; -use crate::processing::spans::{self, Error, ExpandedSpans, Result, SampledSpans}; +use crate::processing::spans::{ + self, Error, ExpandedSpans, Result, SampledSpans, SpanWrapper, ValidatedSpanAttachment, +}; use crate::services::outcome::DiscardReason; /// Parses all serialized spans. @@ -35,10 +38,44 @@ pub fn expand(spans: Managed) -> Managed { spans::integrations::expand_into(&mut all_spans, records, spans.inner.integrations); + let mut span_id_mapping: BTreeMap<_, _> = all_spans + .into_iter() + .filter_map(|span| { + if let Some(id) = span.value().and_then(|span| span.span_id.value().copied()) { + return Some((id, SpanWrapper::new(span))); + } + None + }) + .collect(); + + let mut stand_alone_attachments: Vec = Vec::new(); + for attachment in spans.inner.attachments { + match parse_and_validate_span_attachment(&attachment, records) { + Ok((None, attachment)) => { + stand_alone_attachments.push(attachment); + } + Ok((Some(span_id), attachment)) => { + if let Some(entry) = span_id_mapping.get_mut(&span_id) { + entry.attachment = Some(attachment); + } else { + relay_log::debug!("span attachment invalid associated span id"); + records.reject_err( + Error::Invalid(DiscardReason::InvalidSpanAttachment), + attachment, + ); + } + } + Err(err) => { + records.reject_err(err, attachment); + } + } + } + ExpandedSpans { headers: spans.inner.headers, server_sample_rate: spans.server_sample_rate, - spans: all_spans, + spans: span_id_mapping.into_values().collect(), + stand_alone_attachments, category: spans::TotalAndIndexed, } }) @@ -68,12 +105,57 @@ fn expand_legacy_span(item: &Item) -> Result> { Ok(WithHeader::new(span)) } +/// Parses and validates a span attachment, converting it into a structured type. +fn parse_and_validate_span_attachment( + item: &Item, + records: &mut RecordKeeper<'_>, +) -> Result<(Option, ValidatedSpanAttachment)> { + let associated_span_id = match item.parent_id() { + Some(ParentId::SpanId(span_id)) => *span_id, + None => { + relay_log::debug!("span attachment missing associated span id"); + return Err(Error::Invalid(DiscardReason::InvalidSpanAttachment)); + } + }; + + let total_length = item.len(); + let meta_length = item.meta_length().ok_or_else(|| { + relay_log::debug!("span attachment missing meta_length"); + Error::Invalid(DiscardReason::InvalidSpanAttachment) + })? as usize; + + if meta_length > total_length { + relay_log::debug!( + "span attachment meta_length ({}) exceeds total length ({})", + meta_length, + total_length + ); + return Err(Error::Invalid(DiscardReason::InvalidSpanAttachment)); + } + + let payload = item.payload(); + let meta = + Annotated::::from_json_bytes(&payload[..meta_length]).map_err(|err| { + relay_log::debug!("failed to parse span attachment: {err}"); + Error::Invalid(DiscardReason::InvalidJson) + })?; + let body = payload.slice(meta_length..); + + // From here on only count the body of the attachment v2 not its meta data. + records.modify_by( + relay_quotas::DataCategory::Attachment, + -(item.meta_length().unwrap_or(0) as isize), + ); + + Ok((associated_span_id, ValidatedSpanAttachment { meta, body })) +} + /// Normalizes individual spans. pub fn normalize(spans: &mut Managed, geo_lookup: &GeoIpLookup, ctx: Context<'_>) { spans.retain_with_context( |spans| (&mut spans.spans, &spans.headers), |span, headers, _| { - normalize_span(span, headers, geo_lookup, ctx).inspect_err(|err| { + normalize_span(&mut span.span, headers, geo_lookup, ctx).inspect_err(|err| { relay_log::debug!("failed to normalize span: {err}"); }) }, @@ -143,10 +225,15 @@ pub fn scrub(spans: &mut Managed, ctx: Context<'_>) { spans.retain( |spans| &mut spans.spans, |span, _| { - scrub_span(span, ctx) - .inspect_err(|err| relay_log::debug!("failed to scrub pii from span: {err}")) + scrub_span(&mut span.span, ctx) + .inspect_err(|err| relay_log::debug!("failed to scrub pii from span: {err}"))?; + + // TODO: Also scrub the attachment + Ok::<(), Error>(()) }, ); + + // TODO: Also scrub the standalone attachments } fn scrub_span(span: &mut Annotated, ctx: Context<'_>) -> Result<()> { diff --git a/relay-server/src/processing/spans/store.rs b/relay-server/src/processing/spans/store.rs index a80fd5684f..1e70b59ef7 100644 --- a/relay-server/src/processing/spans/store.rs +++ b/relay-server/src/processing/spans/store.rs @@ -33,7 +33,8 @@ pub struct Context { /// Converts a processed [`SpanV2`] into a [Kafka](crate::services::store::Store) compatible format. pub fn convert(span: IndexedSpan, ctx: &Context) -> Result> { - let mut span = required!(span.0.value); + // TODO: We are not doing anything with the attachment here. + let mut span = required!(span.0.span.value); let routing_key = span.trace_id.value().map(|v| *v.deref()); diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 9d654b0104..cce1cceb6a 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -488,6 +488,9 @@ pub enum DiscardReason { /// (Relay) A span is not valid after normalization. InvalidSpan, + /// (Relay) A span attachment that has invalid item headers or attachment meta-data. + InvalidSpanAttachment, + /// (Relay) A required feature is not enabled. FeatureDisabled(Feature), @@ -556,6 +559,7 @@ impl DiscardReason { DiscardReason::InvalidLog => "invalid_log", DiscardReason::InvalidTraceMetric => "invalid_trace_metric", DiscardReason::InvalidSpan => "invalid_span", + DiscardReason::InvalidSpanAttachment => "invalid_span_attachment", DiscardReason::FeatureDisabled(_) => "feature_disabled", DiscardReason::TransactionAttachment => "transaction_attachment", DiscardReason::InvalidCheckIn => "invalid_check_in", diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index c89683f1f7..c798a4fdd4 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -289,10 +289,12 @@ impl ProcessingGroup { Some(Integration::Spans(SpansIntegration::OtelV1 { .. })) ); let is_span = matches!(item.ty(), &ItemType::Span); + let is_span_attachment = matches!(item.content_type(), Some(ContentType::AttachmentV2)); ItemContainer::::is_container(item) || (exp_feature && is_span) || (exp_feature && is_supported_integration) + || (exp_feature && is_span_attachment) }); if !span_v2_items.is_empty() { diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index c3b71f7482..852b227c4c 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -8,7 +8,7 @@ use relay_quotas::{ Scoping, }; -use crate::envelope::{Envelope, Item, ItemType}; +use crate::envelope::{ContentType, Envelope, Item, ItemType, ParentId}; use crate::integrations::Integration; use crate::managed::ManagedEnvelope; use crate::services::outcome::Outcome; @@ -519,6 +519,10 @@ impl Enforcement { // to determine whether an item is limited. match item.ty() { ItemType::Attachment => { + // Drop span attachments if they have a span_id item header and span quota is null. + if matches!(item.content_type(), Some(ContentType::AttachmentV2)) && matches!(item.parent_id(), Some(ParentId::SpanId(_))) && (self.spans_indexed.is_active() || self.spans.is_active()) { + return false; + } if !(self.attachments.is_active() || self.attachment_items.is_active()) { return true; } diff --git a/tests/integration/test_attachmentsv2.py b/tests/integration/test_attachmentsv2.py new file mode 100644 index 0000000000..4b6461335a --- /dev/null +++ b/tests/integration/test_attachmentsv2.py @@ -0,0 +1,709 @@ +from datetime import datetime, timezone +from sentry_sdk.envelope import Envelope, Item, PayloadRef +from sentry_relay.consts import DataCategory +from unittest import mock + +from .asserts import time_within_delta, time_within +from .test_spansv2 import envelope_with_spans + +from .test_dynamic_sampling import _add_sampling_config + +import json +import uuid +import pytest + +TEST_CONFIG = { + "outcomes": { + "emit_outcomes": True, + } +} + + +def create_attachment_metadata(): + return { + "attachment_id": str(uuid.uuid4()), + "timestamp": 1760520026.781239, + "filename": "myfile.txt", + "content_type": "text/plain", + "attributes": { + "foo": {"type": "string", "value": "bar"}, + }, + } + + +def create_attachment_envelope(project_config): + return Envelope( + headers={ + "event_id": "515539018c9b4260a6f999572f1661ee", + "trace": { + "trace_id": "5b8efff798038103d269b633813fc60c", + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + } + ) + + +def test_standalone_attachment_forwarding(mini_sentry, relay): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay(mini_sentry, options=TEST_CONFIG) + + attachment_metadata = create_attachment_metadata() + attachment_body = b"This is some mock attachment content" + metadata_bytes = json.dumps(attachment_metadata, separators=(",", ":")).encode( + "utf-8" + ) + combined_payload = metadata_bytes + attachment_body + + envelope = create_attachment_envelope(project_config) + headers = { + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": None, + "length": len(combined_payload), + "type": "attachment", + } + + attachment_item = Item(payload=PayloadRef(bytes=combined_payload), headers=headers) + envelope.add_item(attachment_item) + relay.send_envelope(project_id, envelope) + + forwarded_envelope = mini_sentry.captured_events.get(timeout=1) + attachment_item = forwarded_envelope.items[0] + assert attachment_item.type == "attachment" + + meta_length = attachment_item.headers.get("meta_length") + payload = attachment_item.payload.bytes + + metadata_part = json.loads(payload[:meta_length].decode("utf-8")) + body_part = payload[meta_length:] + + # Things send in should match the things coming out + assert metadata_part == attachment_metadata + assert body_part == attachment_body + assert attachment_item.headers == headers + + +@pytest.mark.parametrize( + "invalid_headers,quantity", + [ + # Invalid since there is no span with that id in the envelope, also the quantity here is + # lower since only the body is already counted at this point and not the meta. + pytest.param({"span_id": "ABCDFDEAD5F74052"}, 36, id="invalid_span_id"), + pytest.param({"meta_length": None}, 227, id="missing_meta_length"), + pytest.param({"meta_length": 999}, 227, id="meta_length_exceeds_payload"), + ], +) +def test_invalid_item_headers(mini_sentry, relay, invalid_headers, quantity): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay(mini_sentry, options=TEST_CONFIG) + + attachment_metadata = create_attachment_metadata() + attachment_body = b"This is some mock attachment content" + metadata_bytes = json.dumps(attachment_metadata, separators=(",", ":")).encode( + "utf-8" + ) + combined_payload = metadata_bytes + attachment_body + + envelope = create_attachment_envelope(project_config) + headers = { + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": None, + "length": len(combined_payload), + "type": "attachment", + } + headers.update(invalid_headers) # Apply invalid values + + envelope.add_item(Item(payload=PayloadRef(bytes=combined_payload), headers=headers)) + relay.send_envelope(project_id, envelope) + + assert mini_sentry.get_outcomes(n=2, timeout=1) == [ + { + "category": DataCategory.ATTACHMENT.value, + "org_id": 1, + "outcome": 3, + "key_id": 123, + "project_id": 42, + "reason": "invalid_span_attachment", + "quantity": quantity, + "timestamp": time_within_delta(), + }, + { + "category": DataCategory.ATTACHMENT_ITEM.value, + "org_id": 1, + "outcome": 3, + "key_id": 123, + "project_id": 42, + "reason": "invalid_span_attachment", + "quantity": 1, + "timestamp": time_within_delta(), + }, + ] + + assert mini_sentry.captured_events.empty() + + +# Tests taken from test_spansv2.py but modified to include span attachments +def test_attachment_with_matching_span(mini_sentry, relay): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay(mini_sentry, options=TEST_CONFIG) + + ts = datetime.now(timezone.utc) + span_id = "eee19b7ec3c1b174" + trace_id = "5b8efff798038103d269b633813fc60c" + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": trace_id, + "span_id": span_id, + "is_segment": True, + "name": "test span", + "status": "ok", + }, + trace_info={ + "trace_id": trace_id, + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + metadata = create_attachment_metadata() + body = b"span attachment content" + metadata_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8") + combined_payload = metadata_bytes + body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=combined_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": span_id, + "length": len(combined_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope) + forwarded = mini_sentry.captured_events.get(timeout=5) + + span_item = next(i for i in forwarded.items if i.type == "span") + spans = json.loads(span_item.payload.bytes.decode())["items"] + assert spans == [ + { + "trace_id": trace_id, + "span_id": span_id, + "name": "test span", + "status": "ok", + "is_segment": True, + "start_timestamp": time_within(ts), + "end_timestamp": time_within(ts.timestamp() + 0.5), + "attributes": mock.ANY, + } + ] + + attachment = next(i for i in forwarded.items if i.type == "attachment") + assert attachment.payload.bytes == combined_payload + assert attachment.headers == { + "type": "attachment", + "length": 214, + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": 191, + "span_id": span_id, + } + + +@pytest.mark.parametrize( + "rule_type", + ["project", "trace"], +) +def test_span_attachment_ds_drop(mini_sentry, relay, rule_type): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + # A transaction rule should never apply. + _add_sampling_config(project_config, sample_rate=1, rule_type="transaction") + # Setup the actual rule we want to test against. + _add_sampling_config(project_config, sample_rate=0, rule_type=rule_type) + + relay = relay(mini_sentry, options=TEST_CONFIG) + + span_id = "eee19b7ec3c1b174" + ts = datetime.now(timezone.utc) + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": span_id, + "is_segment": False, + "name": "some op", + "attributes": {"foo": {"value": "bar", "type": "string"}}, + }, + trace_info={ + "trace_id": "5b8efff798038103d269b633813fc60c", + "public_key": project_config["publicKeys"][0]["publicKey"], + "transaction": "tx_from_root", + }, + ) + + metadata = create_attachment_metadata() + body = b"span attachment content" + metadata_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8") + combined_payload = metadata_bytes + body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=combined_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": span_id, + "length": len(combined_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope) + + assert mini_sentry.get_outcomes(n=3, timeout=3) == [ + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "Sampled:0", + "category": DataCategory.ATTACHMENT.value, + "quantity": len(combined_payload), + }, + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "Sampled:0", + "category": DataCategory.SPAN_INDEXED.value, + "quantity": 1, + }, + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "Sampled:0", + "category": DataCategory.ATTACHMENT_ITEM.value, + "quantity": 1, + }, + ] + + assert mini_sentry.get_metrics() == [ + { + "metadata": mock.ANY, + "name": "c:spans/count_per_root_project@none", + "tags": { + "decision": "drop", + "target_project_id": "42", + "transaction": "tx_from_root", + }, + "timestamp": time_within_delta(), + "type": "c", + "value": 1.0, + "width": 1, + }, + { + "metadata": mock.ANY, + "name": "c:spans/usage@none", + "timestamp": time_within_delta(), + "type": "c", + "value": 1.0, + "width": 1, + }, + ] + + assert mini_sentry.captured_events.empty() + assert mini_sentry.captured_outcomes.empty() + + +def test_attachments_dropped_with_span_inbound_filters(mini_sentry, relay): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + + project_config["config"]["filterSettings"] = { + "releases": {"releases": ["foobar@1.0"]} + } + + relay = relay(mini_sentry, options=TEST_CONFIG) + + ts = datetime.now(timezone.utc) + span_id = "eee19b7ec3c1b175" + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": span_id, + "is_segment": False, + "name": "some op", + "status": "ok", + "attributes": { + "some_integer": {"value": 123, "type": "integer"}, + "sentry.release": {"value": "foobar@1.0", "type": "string"}, + "sentry.segment.name": {"value": "/foo/healthz", "type": "string"}, + }, + }, + trace_info={ + "trace_id": "5b8efff798038103d269b633813fc60c", + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + headers = None + metadata = create_attachment_metadata() + body = b"span attachment content" + metadata_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8") + combined_payload = metadata_bytes + body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=combined_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": span_id, + "length": len(combined_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope, headers=headers) + assert mini_sentry.get_outcomes(n=4, timeout=3) == [ + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "release-version", + "category": DataCategory.ATTACHMENT.value, + "quantity": 23, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "release-version", + "category": DataCategory.SPAN.value, + "quantity": 1, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "release-version", + "category": DataCategory.SPAN_INDEXED.value, + "quantity": 1, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, + "reason": "release-version", + "category": DataCategory.ATTACHMENT_ITEM.value, + "quantity": 1, + }, + ] + + assert mini_sentry.captured_events.empty() + + +def test_attachment_dropped_with_invalid_spans(mini_sentry, relay): + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay(mini_sentry, options=TEST_CONFIG) + + ts = datetime.now(timezone.utc) + span_id = "eee19b7ec3c1b174" + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": span_id, + "is_segment": True, + "name": None, # Should be none-empty hence invalid + "status": "ok", + }, + trace_info={ + "trace_id": "5b8efff798038103d269b633813fc60c", + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + metadata = create_attachment_metadata() + body = b"span attachment content" + metadata_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8") + combined_payload = metadata_bytes + body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=combined_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": span_id, + "length": len(combined_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope) + assert mini_sentry.get_outcomes(n=4, timeout=3) == [ + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "no_data", + "category": DataCategory.ATTACHMENT.value, + "quantity": 23, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "no_data", + "category": DataCategory.SPAN.value, + "quantity": 1, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "no_data", + "category": DataCategory.SPAN_INDEXED.value, + "quantity": 1, + }, + { + "timestamp": time_within_delta(ts), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "no_data", + "category": DataCategory.ATTACHMENT_ITEM.value, + "quantity": 1, + }, + ] + + assert mini_sentry.captured_events.empty() + + +@pytest.mark.parametrize( + "quota_config,expected_outcomes", + [ + pytest.param( + [ + { + "categories": ["span_indexed"], + "limit": 0, + "window": 3600, + "id": "span_limit", + "reasonCode": "span_quota_exceeded", + } + ], + { + # Rate limit spans + (DataCategory.SPAN.value, 2): 1, + (DataCategory.SPAN_INDEXED.value, 2): 1, + # Rate limit associated span attachments + (DataCategory.ATTACHMENT.value, 2): 64, + (DataCategory.ATTACHMENT_ITEM.value, 2): 2, + }, + id="span_quota_exceeded", + ), + pytest.param( + [ + { + "categories": ["attachment"], + "limit": 0, + "window": 3600, + "id": "attachment_limit", + "reasonCode": "attachment_quota_exceeded", + } + ], + { + # Attachments don't make it through + (DataCategory.ATTACHMENT.value, 2): 446, + (DataCategory.ATTACHMENT_ITEM.value, 2): 2, + }, + id="attachment_quota_exceeded", + ), + pytest.param( + [ + { + "categories": ["span_indexed"], + "limit": 0, + "window": 3600, + "id": "span_limit", + "reasonCode": "span_quota_exceeded", + }, + { + "categories": ["attachment"], + "limit": 0, + "window": 3600, + "id": "attachment_limit", + "reasonCode": "attachment_quota_exceeded", + }, + ], + { + # Nothing makes it through + (DataCategory.SPAN.value, 2): 1, + (DataCategory.SPAN_INDEXED.value, 2): 1, + (DataCategory.ATTACHMENT.value, 2): 446, + (DataCategory.ATTACHMENT_ITEM.value, 2): 2, + }, + id="both_quotas_exceeded", + ), + ], +) +def test_span_attachment_independent_rate_limiting( + mini_sentry, + relay, + outcomes_consumer, + quota_config, + expected_outcomes, +): + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + project_config["config"]["quotas"] = quota_config + + relay = relay(mini_sentry, options=TEST_CONFIG) + outcomes_consumer = outcomes_consumer() + + ts = datetime.now(timezone.utc) + span_id = "eee19b7ec3c1b174" + trace_id = "5b8efff798038103d269b633813fc60c" + + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": trace_id, + "span_id": span_id, + "is_segment": True, + "name": "test span", + "status": "ok", + }, + trace_info={ + "trace_id": trace_id, + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + per_span_metadata = create_attachment_metadata() + per_span_body = b"per-span attachment" + per_span_metadata_bytes = json.dumps( + per_span_metadata, separators=(",", ":") + ).encode("utf-8") + per_span_payload = per_span_metadata_bytes + per_span_body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=per_span_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(per_span_metadata_bytes), + "span_id": span_id, + "length": len(per_span_payload), + }, + ) + ) + + standalone_metadata = create_attachment_metadata() + standalone_body = b"standalone attachment - should be independent" + standalone_metadata_bytes = json.dumps( + standalone_metadata, separators=(",", ":") + ).encode("utf-8") + standalone_payload = standalone_metadata_bytes + standalone_body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=standalone_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(standalone_metadata_bytes), + "span_id": None, # Not attached to any span + "length": len(standalone_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope) + + outcomes = mini_sentry.get_outcomes(n=len(expected_outcomes), timeout=3) + outcome_counter = {} + for outcome in outcomes: + key = (outcome["category"], outcome["outcome"]) + outcome_counter[key] = outcome_counter.get(key, 0) + outcome["quantity"] + + assert outcome_counter == expected_outcomes + + outcomes_consumer.assert_empty() diff --git a/tests/integration/test_playstation.py b/tests/integration/test_playstation.py index 175150bc1f..575fe39995 100644 --- a/tests/integration/test_playstation.py +++ b/tests/integration/test_playstation.py @@ -217,9 +217,38 @@ def test_playstation_no_feature_flag( # Get these outcomes since the feature flag is not enabled: outcomes = outcomes_consumer.get_outcomes() - assert len(outcomes) == 2 - assert outcomes[0]["reason"] == "feature_disabled" - assert outcomes[1]["reason"] == "feature_disabled" + assert outcomes == [ + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "feature_disabled", + "category": 1, + "quantity": 1, + }, + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "feature_disabled", + "category": 4, + "quantity": 209385, + }, + { + "timestamp": time_within_delta(), + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 3, + "reason": "feature_disabled", + "category": 22, + "quantity": 1, + }, + ] def test_playstation_wrong_file( diff --git a/tests/integration/test_spansv2.py b/tests/integration/test_spansv2.py index 957e77eff0..e179369233 100644 --- a/tests/integration/test_spansv2.py +++ b/tests/integration/test_spansv2.py @@ -686,7 +686,7 @@ def test_spans_v2_dsc_validations( "start_timestamp": ts.timestamp(), "end_timestamp": ts.timestamp() + 0.5, "trace_id": "33333333333333333333333333333333", - "span_id": "eee19b7ec3c1b175", + "span_id": "eee19b7ec3c1b176", "is_segment": False, "name": "some op", "status": "ok", @@ -968,6 +968,15 @@ def test_spansv2_attribute_normalization( def test_invalid_spans(mini_sentry, relay): + + def span_id(): + counter = 1 + while True: + yield f"{counter:016x}" + counter += 1 + + span_id = span_id() + """ A test asserting proper outcomes are emitted for invalid spans missing required attributes. """ @@ -993,22 +1002,25 @@ def test_invalid_spans(mini_sentry, relay): # Need to exclude the `trace_id`, since this one is fundamentally required # for DSC validations. Envelopes with mismatching DSCs are entirely rejected. - required_keys = valid_span.keys() - {"trace_id"} - nonempty_keys = valid_span.keys() - {"trace_id", "name", "status"} + required_keys = valid_span.keys() - {"trace_id", "span_id"} + nonempty_keys = valid_span.keys() - {"trace_id", "name", "status", "span_id"} invalid_spans = [] for key in required_keys: invalid_span = valid_span.copy() + invalid_span["span_id"] = next(span_id) del invalid_span[key] invalid_spans.append(invalid_span) for key in required_keys: invalid_span = valid_span.copy() + invalid_span["span_id"] = next(span_id) invalid_span[key] = None invalid_spans.append(invalid_span) for key in nonempty_keys: invalid_span = valid_span.copy() + invalid_span["span_id"] = next(span_id) invalid_span[key] = "" invalid_spans.append(invalid_span) @@ -1029,9 +1041,9 @@ def test_invalid_spans(mini_sentry, relay): "org_id": 1, "outcome": 3, "project_id": 42, - "reason": "timestamp", + "reason": "no_data", "timestamp": time_within_delta(), - "quantity": 6, + "quantity": 4, }, { "category": DataCategory.SPAN.value, @@ -1039,9 +1051,9 @@ def test_invalid_spans(mini_sentry, relay): "org_id": 1, "outcome": 3, "project_id": 42, - "reason": "no_data", + "reason": "timestamp", "timestamp": time_within_delta(), - "quantity": 7, + "quantity": 6, }, { "category": DataCategory.SPAN_INDEXED.value, @@ -1049,9 +1061,9 @@ def test_invalid_spans(mini_sentry, relay): "org_id": 1, "outcome": 3, "project_id": 42, - "reason": "timestamp", + "reason": "no_data", "timestamp": time_within_delta(), - "quantity": 6, + "quantity": 4, }, { "category": DataCategory.SPAN_INDEXED.value, @@ -1059,9 +1071,9 @@ def test_invalid_spans(mini_sentry, relay): "org_id": 1, "outcome": 3, "project_id": 42, - "reason": "no_data", + "reason": "timestamp", "timestamp": time_within_delta(), - "quantity": 7, + "quantity": 6, }, ]