1 change: 1 addition & 0 deletions relay-server/src/endpoints/batch_metrics.rs
@@ -14,6 +14,7 @@ pub async fn handle(
ReceivedAt(received_at): ReceivedAt,
body: SignedBytes,
) -> impl IntoResponse {
// TODO: Internal and proxy will be mutually exclusive (maybe)
if !body.relay.internal {
return StatusCode::FORBIDDEN.into_response();
}
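For context, this guard is the entire gate for the endpoint: anything not signed by an internal Relay is rejected before the payload is inspected. Below is a minimal sketch of the same pattern; `RelayInfo` is a hypothetical stand-in, not Relay's actual `SignedBytes` extractor.

```rust
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

/// Hypothetical stand-in for the state carried by Relay's `SignedBytes`
/// extractor: whether the request signature matched a known internal Relay.
struct RelayInfo {
    internal: bool,
}

/// Sketch of the guard: non-internal senders get a 403 before the
/// batched-metrics body is even parsed.
async fn handle(relay: RelayInfo, _body: Vec<u8>) -> Response {
    if !relay.internal {
        return StatusCode::FORBIDDEN.into_response();
    }
    StatusCode::ACCEPTED.into_response()
}
```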
2 changes: 2 additions & 0 deletions relay-server/src/endpoints/common.rs
@@ -270,6 +270,7 @@ fn queue_envelope(
) -> Result<(), BadStoreRequest> {
let envelope = managed_envelope.envelope_mut();

// TODO: See if we can get rid of this (either in this PR or the next)
if state.config().relay_mode() != RelayMode::Proxy {
// Remove metrics from the envelope and queue them directly on the project's `Aggregator`.
// In proxy mode, we cannot aggregate metrics because we may not have a project ID.
@@ -359,6 +360,7 @@ pub async fn handle_envelope(

queue_envelope(state, managed_envelope)?;

// TODO: Check this again to understand the flow
if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
// other items. Communicate these rate limits to the downstream (Relay or SDK).
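The comment in this hunk states the constraint that drives the rest of the PR: outside of proxy mode, metric items are pulled out of the envelope and aggregated locally, while in proxy mode the project ID may be unknown, so metrics have to stay in the envelope and be forwarded as-is. A self-contained sketch of that split, with toy `Item` and `RelayMode` types in place of Relay's real ones:

```rust
#[derive(PartialEq)]
enum RelayMode {
    Managed,
    Proxy,
}

enum Item {
    Metric(String),
    Event(String),
}

/// Sketch: outside of proxy mode, metric items are drained out of the
/// envelope and returned for local aggregation; in proxy mode everything
/// stays in the envelope and is forwarded untouched.
fn split_metrics(mode: RelayMode, envelope: &mut Vec<Item>) -> Vec<Item> {
    if mode == RelayMode::Proxy {
        return Vec::new();
    }
    let mut metrics = Vec::new();
    let mut rest = Vec::new();
    for item in envelope.drain(..) {
        match item {
            Item::Metric(_) => metrics.push(item),
            other => rest.push(other),
        }
    }
    *envelope = rest;
    metrics
}
```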
63 changes: 42 additions & 21 deletions relay-server/src/service.rs
@@ -17,6 +17,7 @@ use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
ProxyProcessorService,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
@@ -181,7 +182,11 @@ impl ServiceState {

// Create an address for the `EnvelopeProcessor`, which can be injected into the
// other services.
-        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
+        let (processor, processor_rx) = match config.relay_mode() {
+            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
+            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
+        };

let outcome_producer = services.start(OutcomeProducerService::create(
config.clone(),
upstream_relay.clone(),
@@ -247,28 +252,44 @@
.map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

let processor_pool = create_processor_pool(&config)?;
-        services.start_with(
-            EnvelopeProcessorService::new(
-                processor_pool.clone(),
-                config.clone(),
-                global_config_handle,
-                project_cache_handle.clone(),
-                cogs,
-                #[cfg(feature = "processing")]
-                redis_clients.clone(),
-                processor::Addrs {
-                    outcome_aggregator: outcome_aggregator.clone(),
-                    upstream_relay: upstream_relay.clone(),
-                    #[cfg(feature = "processing")]
-                    store_forwarder: store.clone(),
-                    aggregator: aggregator.clone(),
-                    #[cfg(feature = "processing")]
-                    global_rate_limits,
-                },
-                metric_outcomes.clone(),
-            ),
-            processor_rx,
-        );
+        // TODO: This can probably be done more elegantly
+        match config.relay_mode() {
+            relay_config::RelayMode::Proxy => services.start_with(
+                ProxyProcessorService::new(
+                    processor_pool.clone(),
+                    config.clone(),
+                    project_cache_handle.clone(),
+                    processor::ProxyAddrs {
+                        outcome_aggregator: outcome_aggregator.clone(),
+                        upstream_relay: upstream_relay.clone(),
+                    },
+                ),
+                processor_rx,
+            ),
+            relay_config::RelayMode::Managed => services.start_with(
+                EnvelopeProcessorService::new(
+                    processor_pool.clone(),
+                    config.clone(),
+                    global_config_handle,
+                    project_cache_handle.clone(),
+                    cogs,
+                    #[cfg(feature = "processing")]
+                    redis_clients.clone(),
+                    processor::Addrs {
+                        outcome_aggregator: outcome_aggregator.clone(),
+                        upstream_relay: upstream_relay.clone(),
+                        #[cfg(feature = "processing")]
+                        store_forwarder: store.clone(),
+                        aggregator: aggregator.clone(),
+                        #[cfg(feature = "processing")]
+                        global_rate_limits,
+                    },
+                    metric_outcomes.clone(),
+                ),
+                processor_rx,
+            ),
+        }

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
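Stripped of Relay's service framework, the wiring above reduces to: pick a processor flavor by mode, but expose the same channel handle either way, so downstream services never need to know which flavor is running. A toy sketch of that shape, using `tokio::sync::mpsc` in place of `relay_system` (all names here are illustrative):

```rust
use tokio::sync::mpsc;

enum RelayMode {
    Managed,
    Proxy,
}

/// Stand-in for the `EnvelopeProcessor` message interface that both
/// processor flavors accept.
enum ProcessorMessage {
    ProcessEnvelope(Vec<u8>),
}

/// Start one of two processor flavors; callers only ever see the sender.
fn start_processor(mode: RelayMode) -> mpsc::UnboundedSender<ProcessorMessage> {
    let (tx, mut rx) = mpsc::unbounded_channel();
    match mode {
        RelayMode::Proxy => {
            // Proxy flavor: forward without processing.
            tokio::spawn(async move {
                while let Some(ProcessorMessage::ProcessEnvelope(bytes)) = rx.recv().await {
                    println!("proxy: forwarding {} bytes upstream", bytes.len());
                }
            });
        }
        RelayMode::Managed => {
            // Managed flavor: fully process each envelope.
            tokio::spawn(async move {
                while let Some(ProcessorMessage::ProcessEnvelope(bytes)) = rx.recv().await {
                    println!("managed: processing {} bytes", bytes.len());
                }
            });
        }
    }
    tx
}
```

Seen this way, the TODO about doing this "more elegantly" is presumably about deduplicating the two `start_with` arms rather than about the channel itself.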
2 changes: 2 additions & 0 deletions relay-server/src/services/metrics/aggregator.rs
@@ -58,6 +58,8 @@ impl FromMessage<MergeBuckets> for Aggregator {
/// - If flushing fails and should be retried at a later time, respond with `Err` containing the
/// failed buckets. They will be merged back into the aggregator and flushed at a later time.
#[derive(Clone, Debug)]
// TODO: Check when we write into the aggregator and whether that can happen in proxy mode -> double-check that no one writes into this. (Think about a ProxyAggregator or just forwarding.)
// Until then we are just being very defensive in the setup.
pub struct FlushBuckets {
/// The partition to which the buckets belong.
pub partition_key: u32,
186 changes: 186 additions & 0 deletions relay-server/src/services/processor.rs
@@ -966,6 +966,7 @@ pub struct ProcessMetrics {
pub enum MetricData {
/// Raw data, unparsed envelope items.
Raw(Vec<Item>),
// FIXME: This might need to be serialized.
/// Already parsed buckets but unprocessed.
Parsed(Vec<Bucket>),
}
@@ -1146,6 +1147,12 @@ pub struct EnvelopeProcessorService {
inner: Arc<InnerProcessor>,
}

// TODO: Add documentation
#[derive(Clone)]
pub struct ProxyProcessorService {
inner: Arc<InnerProxyProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
pub outcome_aggregator: Addr<TrackOutcome>,
@@ -1171,6 +1178,21 @@ impl Default for Addrs {
}
}

/// Contains the addresses of services that the proxy-processor publishes to.
pub struct ProxyAddrs {
pub outcome_aggregator: Addr<TrackOutcome>,
pub upstream_relay: Addr<UpstreamRelay>,
}

impl Default for ProxyAddrs {
fn default() -> Self {
ProxyAddrs {
outcome_aggregator: Addr::dummy(),
upstream_relay: Addr::dummy(),
}
}
}

struct InnerProcessor {
pool: EnvelopeProcessorServicePool,
config: Arc<Config>,
@@ -1194,6 +1216,13 @@ struct Processing {
spans: SpansProcessor,
}

struct InnerProxyProcessor {
pool: EnvelopeProcessorServicePool,
config: Arc<Config>,
project_cache: ProjectCacheHandle,
addrs: ProxyAddrs,
}

impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
#[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
@@ -2444,6 +2473,8 @@ impl EnvelopeProcessorService {
}
}

// TODO: Better understand what is going on here.

/// Processes the envelope and returns the processed envelope back.
///
/// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
@@ -2461,6 +2492,8 @@
..
} = message;

// TODO: Not entirely sure why the envelope might not have the project_id, but we definitely want to avoid that case.

// Prefer the project's project ID, and fall back to the stated project id from the
// envelope. The project ID is available in all modes, other than in proxy mode, where
// envelopes for unknown projects are forwarded blindly.
@@ -2498,6 +2531,7 @@
}
});

// TODO: We want to skip this for sure
let result = match self.process_envelope(cogs, project_id, message).await {
Ok(ProcessingResult::Envelope {
mut managed_envelope,
@@ -2588,6 +2622,7 @@
reservoir_counters: Arc::clone(&message.reservoir_counters),
};

// TODO: Wonder if we can just skip this in its entirety :thinking:
let result = metric!(
timer(RelayTimers::EnvelopeProcessingTime),
group = group.variant(),
@@ -3383,6 +3418,157 @@ impl Service for EnvelopeProcessorService {
}
}

// TODO: Add documentation
impl ProxyProcessorService {
/// Creates a multi-threaded proxy processor.
pub fn new(
pool: EnvelopeProcessorServicePool,
config: Arc<Config>,
project_cache: ProjectCacheHandle,
addrs: ProxyAddrs,
) -> Self {
Self {
inner: Arc::new(InnerProxyProcessor {
pool,
project_cache,
addrs,
config,
}),
}
}

async fn handle_process_envelope(&self, message: ProcessEnvelope) {
let wait_time = message.envelope.age();
metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

let scoping = message.envelope.scoping();
for (_, envelope) in ProcessingGroup::split_envelope(
*message.envelope.into_envelope(),
&message.project_info,
) {
let mut envelope =
ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
envelope.scope(scoping);

// FIXME: This might be too naive
self.submit_upstream(envelope);
}
}

fn handle_process_metrics(&self, _message: ProcessMetrics) {
// TODO: Add logic to forward the ProcessMetrics
// TODO: Think about if outcomes should be emitted for this
relay_log::error!("internal error: Metrics not supported in Proxy mode");
}

fn handle_process_batched_metrics(&self, _message: ProcessBatchedMetrics) {
// TODO: Add logic to forward the ProcessBatchedMetrics
// TODO: Think about if outcomes should be emitted for this
relay_log::error!("internal error: Metrics not supported in Proxy mode");
}

async fn handle_flush_buckets(&self, mut _message: FlushBuckets) {
// TODO: Add logic to forward the FlushBuckets
// TODO: Think about if outcomes should be emitted for this
relay_log::error!("internal error: Metrics not supported in Proxy mode");
}

fn handle_submit_client_reports(&self, message: SubmitClientReports) {
let SubmitClientReports {
client_reports,
scoping,
} = message;

let upstream = self.inner.config.upstream_descriptor();
let dsn = PartialDsn::outbound(&scoping, upstream);

let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
for client_report in client_reports {
let mut item = Item::new(ItemType::ClientReport);
item.set_payload(ContentType::Json, client_report.serialize().unwrap()); // TODO: unwrap OK?
envelope.add_item(item);
}

let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
self.submit_upstream(envelope);
}

fn submit_upstream(&self, envelope: ManagedEnvelope) {
let mut envelope = envelope;

if envelope.envelope_mut().is_empty() {
envelope.accept();
return;
}

relay_log::trace!("sending envelope to sentry endpoint");
let http_encoding = self.inner.config.http_encoding();
let result = envelope.envelope().to_vec().and_then(|v| {
encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
});

match result {
Ok(body) => {
self.inner
.addrs
.upstream_relay
.send(SendRequest(SendEnvelope {
envelope: envelope.into_processed(),
body,
http_encoding,
project_cache: self.inner.project_cache.clone(),
}));
}
Err(error) => {
// Errors are only logged for what we consider an internal discard reason. These
// indicate errors in the infrastructure or implementation bugs.
relay_log::error!(
error = &error as &dyn Error,
tags.project_key = %envelope.scoping().project_key,
"failed to serialize envelope payload"
);

envelope.reject(Outcome::Invalid(DiscardReason::Internal));
}
}
}

async fn handle_message(&self, message: EnvelopeProcessor) {
let ty = message.variant();

metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
match message {
EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m).await,
EnvelopeProcessor::ProcessProjectMetrics(m) => self.handle_process_metrics(*m),
EnvelopeProcessor::ProcessBatchedMetrics(m) => {
self.handle_process_batched_metrics(*m)
}
EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
}
});
}
}

impl Service for ProxyProcessorService {
type Interface = EnvelopeProcessor;

async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
while let Some(message) = rx.recv().await {
let service = self.clone();
self.inner
.pool
.spawn_async(
async move {
service.handle_message(message).await;
}
.boxed(),
)
.await;
}
}
}

/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
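`submit_upstream` is the heart of the new proxy path: serialize the envelope to bytes, apply the configured HTTP content encoding, and queue the result for the upstream service; if serialization or encoding fails, the envelope is rejected with `DiscardReason::Internal`. A simplified sketch of the encoding step, with gzip via `flate2` standing in for whatever encoding `encode_payload` actually selects:

```rust
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

/// Toy error type standing in for `EnvelopeError::PayloadIoFailed`.
#[derive(Debug)]
enum ForwardError {
    PayloadIo(std::io::Error),
}

/// Sketch of the proxy forwarding pipeline's encoding step: the serialized
/// envelope is compressed before being queued for the upstream request.
fn encode_for_upstream(envelope_bytes: &[u8]) -> Result<Vec<u8>, ForwardError> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder
        .write_all(envelope_bytes)
        .map_err(ForwardError::PayloadIo)?;
    encoder.finish().map_err(ForwardError::PayloadIo)
}
```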
1 change: 1 addition & 0 deletions relay-server/src/services/projects/cache/project.rs
@@ -80,6 +80,7 @@ impl<'a> Project<'a> {
let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
let current_limits = Arc::clone(&current_limits);
// FIXME: Write a test that checks this still works as it should (very important; see if one already exists, just in case).
async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
});

9 changes: 7 additions & 2 deletions tests/integration/test_ephemeral_config.py
@@ -85,6 +85,11 @@ def test_store_via_ephemeral_relay(
project_config["config"]["trustedRelays"] = list(relay.iter_public_keys())
print(project_config["config"]["trustedRelays"])

-    relay.send_event(project_id)
+    raw_payload = {"message": "Hello, World!"}
+    relay.send_event(project_id, payload=raw_payload)
     event = mini_sentry.captured_events.get(timeout=1).get_event()
-    assert event["logentry"]["formatted"] == "Hello, World!"
+
+    if mode == "proxy":
+        assert event == raw_payload
+    else:
+        assert event["logentry"]["formatted"] == "Hello, World!"
3 changes: 3 additions & 0 deletions tests/integration/test_proxy.py
@@ -99,3 +99,6 @@ def test_replay_allowed(mini_sentry, relay):

# Does not raise queue.Empty
envelope = mini_sentry.captured_events.get(timeout=10)


# TODO: Extend the logic here.