diff --git a/changelog.d/datadog-agent-timeout.enhancement.md b/changelog.d/datadog-agent-timeout.enhancement.md new file mode 100644 index 0000000000000..a93475c1840f9 --- /dev/null +++ b/changelog.d/datadog-agent-timeout.enhancement.md @@ -0,0 +1,7 @@ +Added support for configurable request timeouts to the `datadog_agent` source. + + This change also introduces two new internal metrics: + - `component_timed_out_events_total` - Counter tracking the number of events that timed out + - `component_timed_out_requests_total` - Counter tracking the number of requests that timed out + +authors: bruceg diff --git a/lib/vector-common/src/internal_event/component_events_timed_out.rs b/lib/vector-common/src/internal_event/component_events_timed_out.rs new file mode 100644 index 0000000000000..bf138dd1481c7 --- /dev/null +++ b/lib/vector-common/src/internal_event/component_events_timed_out.rs @@ -0,0 +1,23 @@ +use metrics::{Counter, counter}; + +use super::Count; + +crate::registered_event! { + ComponentEventsTimedOut { + reason: &'static str, + } => { + timed_out_events: Counter = counter!("component_timed_out_events_total"), + timed_out_requests: Counter = counter!("component_timed_out_requests_total"), + reason: &'static str = self.reason, + } + + fn emit(&self, data: Count) { + warn!( + message = "Events timed out", + events = data.0, + reason = self.reason, + ); + self.timed_out_events.increment(data.0 as u64); + self.timed_out_requests.increment(1); + } +} diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index 83c06260c0d5e..d1c7e47913caf 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -2,6 +2,7 @@ mod bytes_received; mod bytes_sent; pub mod cached_event; pub mod component_events_dropped; +pub mod component_events_timed_out; mod events_received; mod events_sent; mod optional_tag; @@ -15,6 +16,7 @@ pub use bytes_sent::BytesSent; 
#[allow(clippy::module_name_repetitions)] pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache}; pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; +pub use component_events_timed_out::ComponentEventsTimedOut; pub use events_received::{EventsReceived, EventsReceivedHandle}; pub use events_sent::{DEFAULT_OUTPUT, EventsSent, TaggedEventsSent}; pub use metrics::SharedString; diff --git a/lib/vector-core/src/source_sender/builder.rs b/lib/vector-core/src/source_sender/builder.rs index e06d1fa87012b..452d890f0d863 100644 --- a/lib/vector-core/src/source_sender/builder.rs +++ b/lib/vector-core/src/source_sender/builder.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use metrics::{Histogram, histogram}; use vector_buffers::topology::channel::LimitedReceiver; @@ -12,6 +12,7 @@ pub struct Builder { default_output: Option, named_outputs: HashMap, lag_time: Option, + timeout: Option, } impl Default for Builder { @@ -21,6 +22,7 @@ impl Default for Builder { default_output: None, named_outputs: Default::default(), lag_time: Some(histogram!(LAG_TIME_NAME)), + timeout: None, } } } @@ -32,6 +34,12 @@ impl Builder { self } + #[must_use] + pub fn with_timeout(mut self, timeout: Option) -> Self { + self.timeout = timeout; + self + } + pub fn add_source_output( &mut self, output: SourceOutput, @@ -51,6 +59,7 @@ impl Builder { lag_time, log_definition, output_id, + self.timeout, ); self.default_output = Some(output); rx @@ -62,6 +71,7 @@ impl Builder { lag_time, log_definition, output_id, + self.timeout, ); self.named_outputs.insert(name, output); rx diff --git a/lib/vector-core/src/source_sender/errors.rs b/lib/vector-core/src/source_sender/errors.rs index b0d9052c8747a..57f7be8752d64 100644 --- a/lib/vector-core/src/source_sender/errors.rs +++ b/lib/vector-core/src/source_sender/errors.rs @@ -1,61 +1,26 @@ use std::fmt; -use tokio::sync::mpsc; -use 
vector_buffers::topology::channel::SendError; +use vector_buffers::topology::channel; -use crate::event::{Event, EventArray}; - -#[derive(Clone, Debug)] -pub struct ClosedError; - -impl fmt::Display for ClosedError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Sender is closed.") - } -} - -impl std::error::Error for ClosedError {} - -impl From> for ClosedError { - fn from(_: mpsc::error::SendError) -> Self { - Self - } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SendError { + Timeout, + Closed, } -impl From> for ClosedError { - fn from(_: mpsc::error::SendError) -> Self { - Self +impl From> for SendError { + fn from(_: channel::SendError) -> Self { + Self::Closed } } -impl From> for ClosedError { - fn from(_: SendError) -> Self { - Self - } -} - -#[derive(Debug)] -pub enum StreamSendError { - Closed(ClosedError), - Stream(E), -} - -impl fmt::Display for StreamSendError -where - E: fmt::Display, -{ +impl fmt::Display for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - StreamSendError::Closed(e) => e.fmt(f), - StreamSendError::Stream(e) => e.fmt(f), + Self::Timeout => f.write_str("Send timed out."), + Self::Closed => f.write_str("Sender is closed."), } } } -impl std::error::Error for StreamSendError where E: std::error::Error {} - -impl From for StreamSendError { - fn from(e: ClosedError) -> Self { - StreamSendError::Closed(e) - } -} +impl std::error::Error for SendError {} diff --git a/lib/vector-core/src/source_sender/mod.rs b/lib/vector-core/src/source_sender/mod.rs index b6615c106b608..c8af2db8bbf87 100644 --- a/lib/vector-core/src/source_sender/mod.rs +++ b/lib/vector-core/src/source_sender/mod.rs @@ -13,7 +13,7 @@ mod sender; mod tests; pub use builder::Builder; -pub use errors::{ClosedError, StreamSendError}; +pub use errors::SendError; use output::Output; pub use sender::{SourceSender, SourceSenderItem}; diff --git a/lib/vector-core/src/source_sender/output.rs 
b/lib/vector-core/src/source_sender/output.rs index 20a122d7f485f..88556eae08527 100644 --- a/lib/vector-core/src/source_sender/output.rs +++ b/lib/vector-core/src/source_sender/output.rs @@ -1,4 +1,9 @@ -use std::{fmt, num::NonZeroUsize, sync::Arc, time::Instant}; +use std::{ + fmt, + num::NonZeroUsize, + sync::Arc, + time::{Duration, Instant}, +}; use chrono::Utc; use futures::{Stream, StreamExt as _}; @@ -11,13 +16,13 @@ use vector_buffers::{ use vector_common::{ byte_size_of::ByteSizeOf, internal_event::{ - self, ComponentEventsDropped, CountByteSize, EventsSent, InternalEventHandle as _, - Registered, UNINTENTIONAL, + self, ComponentEventsDropped, ComponentEventsTimedOut, Count, CountByteSize, EventsSent, + InternalEventHandle as _, RegisterInternalEvent as _, Registered, UNINTENTIONAL, }, }; use vrl::value::Value; -use super::{CHUNK_SIZE, ClosedError, SourceSenderItem}; +use super::{CHUNK_SIZE, SendError, SourceSenderItem}; use crate::{ EstimatedJsonEncodedSizeOf, config::{OutputId, log_schema}, @@ -52,6 +57,15 @@ impl UnsentEventCount { const fn discard(&mut self) { self.count = 0; } + + fn timed_out(&mut self) { + ComponentEventsTimedOut { + reason: "Source send timed out.", + } + .register() + .emit(Count(self.count)); + self.count = 0; + } } impl Drop for UnsentEventCount { @@ -76,6 +90,7 @@ pub(super) struct Output { /// The OutputId related to this source sender. This is set as the `upstream_id` in /// `EventMetadata` for all event sent through here. 
id: Arc, + timeout: Option, } #[expect(clippy::missing_fields_in_debug)] @@ -84,6 +99,7 @@ impl fmt::Debug for Output { fmt.debug_struct("Output") .field("sender", &self.sender) .field("output_id", &self.id) + .field("timeout", &self.timeout) // `metrics::Histogram` is missing `impl Debug` .finish() } @@ -96,6 +112,7 @@ impl Output { lag_time: Option, log_definition: Option>, output_id: OutputId, + timeout: Option, ) -> (Self, LimitedReceiver) { let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap())); ( @@ -107,6 +124,7 @@ impl Output { ))), log_definition, id: Arc::new(output_id), + timeout, }, rx, ) @@ -116,7 +134,7 @@ impl Output { &mut self, mut events: EventArray, unsent_event_count: &mut UnsentEventCount, - ) -> Result<(), ClosedError> { + ) -> Result<(), SendError> { let send_reference = Instant::now(); let reference = Utc::now().timestamp_millis(); events @@ -133,31 +151,51 @@ impl Output { let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); - self.sender - .send(SourceSenderItem { - events, - send_reference, - }) - .await - .map_err(|_| ClosedError)?; + self.send_with_timeout(events, send_reference).await?; self.events_sent.emit(CountByteSize(count, byte_size)); unsent_event_count.decr(count); Ok(()) } + async fn send_with_timeout( + &mut self, + events: EventArray, + send_reference: Instant, + ) -> Result<(), SendError> { + let item = SourceSenderItem { + events, + send_reference, + }; + if let Some(timeout) = self.timeout { + match tokio::time::timeout(timeout, self.sender.send(item)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(error)) => Err(error.into()), + Err(_elapsed) => Err(SendError::Timeout), + } + } else { + self.sender.send(item).await.map_err(Into::into) + } + } + pub(super) async fn send_event( &mut self, event: impl Into, - ) -> Result<(), ClosedError> { + ) -> Result<(), SendError> { let event: EventArray = event.into(); // It's possible that the caller stops polling this 
future while it is blocked waiting // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit // `ComponentEventsDropped` events. let mut unsent_event_count = UnsentEventCount::new(event.len()); - self.send(event, &mut unsent_event_count).await + self.send(event, &mut unsent_event_count) + .await + .inspect_err(|error| { + if let SendError::Timeout = error { + unsent_event_count.timed_out(); + } + }) } - pub(super) async fn send_event_stream(&mut self, events: S) -> Result<(), ClosedError> + pub(super) async fn send_event_stream(&mut self, events: S) -> Result<(), SendError> where S: Stream + Unpin, E: Into + ByteSizeOf, @@ -169,7 +207,7 @@ impl Output { Ok(()) } - pub(super) async fn send_batch(&mut self, events: I) -> Result<(), ClosedError> + pub(super) async fn send_batch(&mut self, events: I) -> Result<(), SendError> where E: Into + ByteSizeOf, I: IntoIterator, @@ -183,10 +221,15 @@ impl Output { for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { self.send(events, &mut unsent_event_count) .await - .inspect_err(|_| { - // The unsent event count is discarded here because the callee emits the - // `StreamClosedError`. - unsent_event_count.discard(); + .inspect_err(|error| match error { + SendError::Timeout => { + unsent_event_count.timed_out(); + } + SendError::Closed => { + // The unsent event count is discarded here because the callee emits the + // `StreamClosedError`. 
+ unsent_event_count.discard(); + } })?; } Ok(()) diff --git a/lib/vector-core/src/source_sender/sender.rs b/lib/vector-core/src/source_sender/sender.rs index 88d6f13f97b45..8bbf09404e755 100644 --- a/lib/vector-core/src/source_sender/sender.rs +++ b/lib/vector-core/src/source_sender/sender.rs @@ -1,3 +1,5 @@ +#[cfg(any(test, feature = "test"))] +use std::time::Duration; use std::{collections::HashMap, time::Instant}; use futures::Stream; @@ -18,7 +20,7 @@ use vector_common::{ json_size::JsonSize, }; -use super::{Builder, ClosedError, Output}; +use super::{Builder, Output, SendError}; #[cfg(any(test, feature = "test"))] use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE}; use crate::{ @@ -101,14 +103,23 @@ impl SourceSender { } #[cfg(any(test, feature = "test"))] - pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver) { + pub fn new_test_sender_with_options( + n: usize, + timeout: Option, + ) -> (Self, LimitedReceiver) { let lag_time = Some(histogram!(LAG_TIME_NAME)); let output_id = OutputId { component: "test".to_string().into(), port: None, }; - let (default_output, rx) = - Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id); + let (default_output, rx) = Output::new_with_buffer( + n, + DEFAULT_OUTPUT.to_owned(), + lag_time, + None, + output_id, + timeout, + ); ( Self { default_output: Some(default_output), @@ -120,14 +131,14 @@ impl SourceSender { #[cfg(any(test, feature = "test"))] pub fn new_test() -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(any(test, feature = "test"))] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None); 
// In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -146,7 +157,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -180,7 +191,8 @@ impl SourceSender { component: "test".to_string().into(), port: Some(name.clone()), }; - let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id); + let (output, recv) = + Output::new_with_buffer(100, name.clone(), None, None, output_id, None); let recv = recv.into_stream().map(move |mut item| { item.events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -201,14 +213,14 @@ impl SourceSender { /// Send an event to the default output. /// /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. - pub async fn send_event(&mut self, event: impl Into) -> Result<(), ClosedError> { + pub async fn send_event(&mut self, event: impl Into) -> Result<(), SendError> { self.default_output_mut().send_event(event).await } /// Send a stream of events to the default output. /// /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. - pub async fn send_event_stream(&mut self, events: S) -> Result<(), ClosedError> + pub async fn send_event_stream(&mut self, events: S) -> Result<(), SendError> where S: Stream + Unpin, E: Into + ByteSizeOf, @@ -219,7 +231,7 @@ impl SourceSender { /// Send a batch of events to the default output. /// /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. 
- pub async fn send_batch(&mut self, events: I) -> Result<(), ClosedError> + pub async fn send_batch(&mut self, events: I) -> Result<(), SendError> where E: Into + ByteSizeOf, I: IntoIterator, @@ -231,7 +243,7 @@ impl SourceSender { /// Send a batch of events event to a named output. /// /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. - pub async fn send_batch_named(&mut self, name: &str, events: I) -> Result<(), ClosedError> + pub async fn send_batch_named(&mut self, name: &str, events: I) -> Result<(), SendError> where E: Into + ByteSizeOf, I: IntoIterator, diff --git a/lib/vector-core/src/source_sender/tests.rs b/lib/vector-core/src/source_sender/tests.rs index a659c196a3a3e..113fe4f7e7f70 100644 --- a/lib/vector-core/src/source_sender/tests.rs +++ b/lib/vector-core/src/source_sender/tests.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Duration, Utc}; use rand::{Rng, rng}; +use std::time::{Duration as StdDuration, Instant}; use tokio::time::timeout; use vrl::event_path; @@ -97,7 +98,7 @@ async fn emit_and_test(make_event: impl FnOnce(DateTime) -> Event) { #[tokio::test] async fn emits_component_discarded_events_total_for_send_event() { metrics::init_test(); - let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1); + let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, None); let event = Event::Metric(Metric::new( "name", @@ -138,7 +139,7 @@ async fn emits_component_discarded_events_total_for_send_event() { #[expect(clippy::cast_precision_loss)] async fn emits_component_discarded_events_total_for_send_batch() { metrics::init_test(); - let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1); + let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, None); let expected_drop = 100; let events: Vec = (0..(CHUNK_SIZE + expected_drop)) @@ -159,18 +160,81 @@ async fn emits_component_discarded_events_total_for_send_batch() { .await; assert!(res.is_err(), "Send should have 
timed out."); - let component_discarded_events_total = Controller::get() + let metrics = get_component_metrics(); + assert_no_metric(&metrics, "component_timed_out_events_total"); + assert_no_metric(&metrics, "component_timed_out_requests_total"); + assert_counter_metric( + &metrics, + "component_discarded_events_total", + expected_drop as f64, + ); +} + +#[tokio::test] +async fn times_out_send_event_with_timeout() { + metrics::init_test(); + + let timeout_duration = StdDuration::from_millis(10); + let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, Some(timeout_duration)); + + let event = Event::Metric(Metric::new( + "name", + MetricKind::Absolute, + MetricValue::Gauge { value: 123.4 }, + )); + + sender + .send_event(event.clone()) + .await + .expect("First send should succeed"); + + let start = Instant::now(); + let result = sender.send_event(event).await; + let elapsed = start.elapsed(); + + assert!( + matches!(result, Err(SendError::Timeout)), + "Send should return a timeout error." 
+ ); + assert!( + elapsed >= timeout_duration, + "Send did not wait for the configured timeout" + ); + assert!(elapsed <= timeout_duration * 2, "Send waited too long"); + + let metrics = get_component_metrics(); + assert_no_metric(&metrics, "component_discarded_events_total"); + assert_counter_metric(&metrics, "component_timed_out_events_total", 1.0); + assert_counter_metric(&metrics, "component_timed_out_requests_total", 1.0); +} + +fn get_component_metrics() -> Vec { + Controller::get() .expect("There must be a controller") .capture_metrics() .into_iter() - .filter(|metric| metric.name() == "component_discarded_events_total") - .collect::>(); - assert_eq!(component_discarded_events_total.len(), 1); + .filter(|metric| metric.name().starts_with("component_")) + .collect() +} - let component_discarded_events_total = &component_discarded_events_total[0]; - let MetricValue::Counter { value } = component_discarded_events_total.value() else { - panic!("component_discarded_events_total has invalid type") - }; +fn assert_no_metric(metrics: &[Metric], name: &str) { + assert!( + !metrics.iter().any(|metric| metric.name() == name), + "Metric {name} should not be present" + ); +} - assert_eq!(*value, expected_drop as f64,); +fn assert_counter_metric(metrics: &[Metric], name: &str, expected: f64) { + let mut filter = metrics.iter().filter(|metric| metric.name() == name); + let Some(metric) = filter.next() else { + panic!("Metric {name} should be present"); + }; + let MetricValue::Counter { value } = metric.value() else { + panic!("Metric {name} should be a counter"); + }; + assert_eq!(*value, expected); + assert!( + filter.next().is_none(), + "Only one {name} metric should be present" + ); } diff --git a/src/config/source.rs b/src/config/source.rs index 25fd6ab72915b..54e3c409f1e1f 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, collections::HashMap}; +use std::{cell::RefCell, collections::HashMap, time::Duration}; use 
async_trait::async_trait; use dyn_clone::DynClone; @@ -120,6 +120,12 @@ pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sy /// well as emit contextual warnings when end-to-end acknowledgements are enabled, but the /// topology as configured does not actually support the use of end-to-end acknowledgements. fn can_acknowledge(&self) -> bool; + + /// If this source supports timeout returns from the `SourceSender` and the configuration + /// provides a timeout value, return it here and the `out` channel will be configured with it. + fn send_timeout(&self) -> Option { + None + } } dyn_clone::clone_trait_object!(SourceConfig); diff --git a/src/sources/aws_kinesis_firehose/errors.rs b/src/sources/aws_kinesis_firehose/errors.rs index baf8463e5528b..24c56d5a797de 100644 --- a/src/sources/aws_kinesis_firehose/errors.rs +++ b/src/sources/aws_kinesis_firehose/errors.rs @@ -35,15 +35,8 @@ pub enum RequestError { source: std::io::Error, request_id: String, }, - #[snafu(display( - "Could not forward events for request {}, downstream is closed: {}", - request_id, - source - ))] - ShuttingDown { - source: vector_lib::source_sender::ClosedError, - request_id: String, - }, + #[snafu(display("Could not forward events for request {request_id}, downstream is closed"))] + ShuttingDown { request_id: String }, #[snafu(display("Unsupported encoding: {}", encoding))] UnsupportedEncoding { encoding: String, diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index 551f96f3cd4c7..b9eefc857f3c3 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -18,6 +18,7 @@ use vector_lib::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, }, lookup::{PathPrefix, metadata_path, path}, + source_sender::SendError, }; use vrl::compiler::SecretTarget; use warp::reject; @@ -143,13 +144,16 @@ pub(super) async fn firehose( } let count = 
events.len(); - if let Err(error) = context.out.send_batch(events).await { - emit!(StreamClosedError { count }); - let error = RequestError::ShuttingDown { - request_id: request_id.clone(), - source: error, - }; - warp::reject::custom(error); + match context.out.send_batch(events).await { + Ok(()) => (), + Err(SendError::Closed) => { + emit!(StreamClosedError { count }); + let error = RequestError::ShuttingDown { + request_id: request_id.clone(), + }; + warp::reject::custom(error); + } + Err(SendError::Timeout) => unreachable!("No timeout is configured here"), } drop(batch); diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index a7d972bef167a..36e09a61b0fc5 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -38,6 +38,7 @@ use vector_lib::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered, }, lookup::{PathPrefix, metadata_path, path}, + source_sender::SendError, }; use crate::{ @@ -232,7 +233,7 @@ pub enum ProcessingError { }, #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))] PipelineSend { - source: vector_lib::source_sender::ClosedError, + source: vector_lib::source_sender::SendError, bucket: String, key: String, }, @@ -783,11 +784,12 @@ impl IngestorProcess { let send_error = match self.out.send_event_stream(&mut stream).await { Ok(_) => None, - Err(_) => { + Err(SendError::Closed) => { let (count, _) = stream.size_hint(); emit!(StreamClosedError { count }); - Some(vector_lib::source_sender::ClosedError) + Some(SendError::Closed) } + Err(SendError::Timeout) => unreachable!("No timeout is configured here"), }; // Up above, `lines` captures `read_error`, and eventually is captured by `stream`, diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 0ee1ee8bd5ac5..ac84188df7c78 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -27,6 +27,7 @@ use http::StatusCode; use hyper::{Server, 
service::make_service_fn}; use regex::Regex; use serde::{Deserialize, Serialize}; +use serde_with::serde_as; use snafu::Snafu; use tokio::net::TcpStream; use tower::ServiceBuilder; @@ -39,6 +40,7 @@ use vector_lib::{ internal_event::{EventsReceived, Registered}, lookup::owned_value_path, schema::meaning, + source_sender::SendError, tls::MaybeTlsIncomingStream, }; use vrl::{ @@ -73,6 +75,7 @@ pub const TRACES: &str = "traces"; "datadog_agent", "Receive logs, metrics, and traces collected by a Datadog Agent." ))] +#[serde_as] #[derive(Clone, Debug)] pub struct DatadogAgentConfig { /// The socket address to accept connections on. @@ -150,6 +153,18 @@ pub struct DatadogAgentConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + /// The timeout before responding to requests with an HTTP 503 Service Unavailable error. + /// + /// If not set, responses to completed requests will block indefinitely until connected + /// transforms or sinks are ready to receive the events. When this happens, the sending Datadog + /// Agent will eventually time out the request and drop the connection, resulting in Vector + /// generating an "Events dropped." error and incrementing the `component_discarded_events_total` + /// internal metric. By setting this option to a value less than the Agent's timeout, Vector + /// will instead respond to the Agent with an HTTP 503 Service Unavailable error, emit a warning, + /// and increment the `component_timed_out_events_total` internal metric instead. 
+ #[serde_as(as = "Option>")] + send_timeout_secs: Option, } impl GenerateConfig for DatadogAgentConfig { @@ -169,6 +184,7 @@ impl GenerateConfig for DatadogAgentConfig { split_metric_namespace: true, log_namespace: Some(false), keepalive: KeepaliveConfig::default(), + send_timeout_secs: None, }) .unwrap() } @@ -201,8 +217,12 @@ impl SourceConfig for DatadogAgentConfig { self.split_metric_namespace, ); let listener = tls.bind(&self.address).await?; - let acknowledgements = cx.do_acknowledgements(self.acknowledgements); - let filters = source.build_warp_filters(cx.out, acknowledgements, self)?; + let handler = RequestHandler { + acknowledgements: cx.do_acknowledgements(self.acknowledgements), + multiple_outputs: self.multiple_outputs, + out: cx.out, + }; + let filters = source.build_warp_filters(handler, self)?; let shutdown = cx.shutdown; let keepalive_settings = self.keepalive.clone(); @@ -329,6 +349,10 @@ impl SourceConfig for DatadogAgentConfig { fn can_acknowledge(&self) -> bool { true } + + fn send_timeout(&self) -> Option { + self.send_timeout_secs.map(Duration::from_secs_f64) + } } #[derive(Clone, Copy, Debug, Snafu)] @@ -421,15 +445,9 @@ impl DatadogAgentSource { fn build_warp_filters( &self, - out: SourceSender, - acknowledgements: bool, + handler: RequestHandler, config: &DatadogAgentConfig, ) -> crate::Result> { - let handler = RequestHandler { - acknowledgements, - multiple_outputs: config.multiple_outputs, - out, - }; let mut filters = (!config.disable_logs).then(|| logs::build_warp_filter(handler.clone(), self.clone())); @@ -526,15 +544,25 @@ impl RequestHandler { let count = events.len(); let output = self.multiple_outputs.then_some(output); - if let Some(name) = output { + let result = if let Some(name) = output { self.out.send_batch_named(name, events).await } else { self.out.send_batch(events).await + }; + match result { + Ok(()) => {} + Err(SendError::Closed) => { + emit!(StreamClosedError { count }); + return 
Err(warp::reject::custom(ApiError::ServerShutdown)); + } + Err(SendError::Timeout) => { + return Ok(warp::reply::with_status( + "Service unavailable", + StatusCode::SERVICE_UNAVAILABLE, + ) + .into_response()); + } } - .map_err(|_| { - emit!(StreamClosedError { count }); - warp::reject::custom(ApiError::ServerShutdown) - })?; match receiver { None => Ok(warp::reply().into_response()), Some(receiver) => match receiver.await { diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 168eb727c1671..955f5c636d8c4 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -3,6 +3,7 @@ use std::{ iter::FromIterator, net::SocketAddr, str, + time::Duration, }; use bytes::Bytes; @@ -14,6 +15,7 @@ use ordered_float::NotNan; use prost::Message; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; use similar_asserts::assert_eq; +use tokio::time::timeout; use vector_lib::{ codecs::{ BytesDecoder, BytesDeserializer, CharacterDelimitedDecoderConfig, @@ -63,6 +65,7 @@ const DD_API_SERIES_V1_PATH: &str = "/api/v1/series"; const DD_API_SERIES_V2_PATH: &str = "/api/v2/series"; const DD_API_SKETCHES_PATH: &str = "/api/beta/sketches"; const DD_API_TRACES_PATH: &str = "/api/v0.2/traces"; +const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); fn test_logs_schema_definition() -> schema::Definition { schema::Definition::empty_legacy_namespace().with_event_field( @@ -228,7 +231,60 @@ async fn source( SocketAddr, PortGuard, ) { - let (mut sender, recv) = SourceSender::new_test_finalize(status); + let (sender, recv) = SourceSender::new_test_finalize(status); + let (logs_output, metrics_output, address, guard) = source_with_sender( + sender, + status, + acknowledgements, + store_api_key, + multiple_outputs, + split_metric_namespace, + ) + .await; + (recv, logs_output, metrics_output, address, guard) +} + +async fn source_with_timeout( + status: EventStatus, + acknowledgements: bool, + store_api_key: bool, + 
multiple_outputs: bool, + split_metric_namespace: bool, + send_timeout: Duration, +) -> ( + impl Stream + Unpin, + Option>, + Option>, + SocketAddr, + PortGuard, +) { + let (sender, recv) = SourceSender::new_test_sender_with_options(1, Some(send_timeout)); + let (logs_output, metrics_output, address, guard) = source_with_sender( + sender, + status, + acknowledgements, + store_api_key, + multiple_outputs, + split_metric_namespace, + ) + .await; + let recv = recv.into_stream().flat_map(into_event_stream); + (recv, logs_output, metrics_output, address, guard) +} + +async fn source_with_sender( + mut sender: SourceSender, + status: EventStatus, + acknowledgements: bool, + store_api_key: bool, + multiple_outputs: bool, + split_metric_namespace: bool, +) -> ( + Option>, + Option>, + SocketAddr, + PortGuard, +) { let mut logs_output = None; let mut metrics_output = None; if multiple_outputs { @@ -243,7 +299,7 @@ async fn source( .flat_map(into_event_stream), ); } - let (_guard, address) = next_addr(); + let (guard, address) = next_addr(); let config = toml::from_str::(&format!( indoc! 
{ r#" address = "{}" @@ -264,19 +320,23 @@ async fn source( config.build(context).await.unwrap().await.unwrap(); }); wait_for_tcp(address).await; - (recv, logs_output, metrics_output, address, _guard) + (logs_output, metrics_output, address, guard) } async fn send_with_path(address: SocketAddr, body: &str, headers: HeaderMap, path: &str) -> u16 { - reqwest::Client::new() - .post(format!("http://{address}{path}")) - .headers(headers) - .body(body.to_owned()) - .send() - .await - .unwrap() - .status() - .as_u16() + timeout( + HTTP_REQUEST_TIMEOUT, + reqwest::Client::new() + .post(format!("http://{address}{path}")) + .headers(headers) + .body(body.to_owned()) + .send(), + ) + .await + .expect("send_with_path request timed out") + .unwrap() + .status() + .as_u16() } async fn send_and_collect( @@ -678,6 +738,68 @@ async fn delivery_failure() { .await; } +#[tokio::test] +async fn send_timeout_returns_service_unavailable() { + trace_init(); + let (rx, _, _, addr, _guard) = source_with_timeout( + EventStatus::Delivered, + false, + true, + false, + true, + Duration::from_millis(50), + ) + .await; + + let body = serde_json::to_string(&[LogMsg { + message: Bytes::from("foo"), + timestamp: Utc + .timestamp_opt(123, 0) + .single() + .expect("invalid timestamp"), + hostname: Bytes::from("festeburg"), + status: Bytes::from("notice"), + service: Bytes::from("vector"), + ddsource: Bytes::from("curl"), + ddtags: Bytes::from("one,two,three"), + }]) + .unwrap(); + + assert_eq!( + 200, + send_with_path(addr, &body, HeaderMap::new(), DD_API_LOGS_V1_PATH).await + ); + + assert_eq!( + 503, + send_with_path(addr, &body, HeaderMap::new(), DD_API_LOGS_V1_PATH).await + ); + drop(rx); +} + +#[test] +fn parse_config_with_send_timeout_secs() { + let config = toml::from_str::(indoc! 
{ r#" + address = "0.0.0.0:8012" + send_timeout_secs = 1.5 + "#}) + .unwrap(); + + assert_eq!(config.send_timeout_secs, Some(1.5)); + assert_eq!(config.send_timeout(), Some(Duration::from_secs_f64(1.5))); +} + +#[test] +fn parse_config_without_send_timeout_secs() { + let config = toml::from_str::(indoc! { r#" + address = "0.0.0.0:8012" + "#}) + .unwrap(); + + assert_eq!(config.send_timeout_secs, None); + assert_eq!(config.send_timeout(), None); +} + #[tokio::test] async fn ignores_disabled_acknowledgements() { assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { @@ -1498,6 +1620,7 @@ fn test_config_outputs_with_disabled_data_types() { split_metric_namespace: true, log_namespace: Some(false), keepalive: Default::default(), + send_timeout_secs: None, }; let outputs: Vec = config @@ -1941,6 +2064,7 @@ fn test_config_outputs() { split_metric_namespace: true, log_namespace: Some(false), keepalive: Default::default(), + send_timeout_secs: None, }; let mut outputs = config @@ -2613,6 +2737,7 @@ impl ValidatableComponent for DatadogAgentConfig { split_metric_namespace: true, log_namespace: Some(false), keepalive: Default::default(), + send_timeout_secs: None, }; let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into(); diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index f92e85eb2e424..7f4895c8ed698 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1778,7 +1778,7 @@ mod integration_test { delay: Duration, status: EventStatus, ) -> (SourceSender, impl Stream + Unpin) { - let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100); + let (pipe, recv) = SourceSender::new_test_sender_with_options(100, None); let recv = recv.into_stream(); let recv = recv.then(move |item| async move { let mut events = item.events; diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index b821cf4e0f257..166232a89a8ea 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -811,7 +811,7 @@ mod test { // 
shutdown. let (guard, addr) = next_addr(); - let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000); + let (source_tx, source_rx) = SourceSender::new_test_sender_with_options(10_000, None); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index f3158a46cf6dc..b020383b9fbda 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -31,7 +31,7 @@ use vector_lib::{ lookup::{self, event_path, lookup_v2::OptionalValuePath, owned_value_path}, schema::meaning, sensitive_string::SensitiveString, - source_sender::ClosedError, + source_sender::SendError, tls::MaybeTlsIncomingStream, }; use vrl::{ @@ -433,10 +433,16 @@ impl SplunkSource { } } - if !events.is_empty() - && let Err(ClosedError) = out.send_batch(events).await - { - return Err(Rejection::from(ApiError::ServerShutdown)); + if !events.is_empty() { + match out.send_batch(events).await { + Ok(()) => (), + Err(SendError::Closed) => { + return Err(Rejection::from(ApiError::ServerShutdown)); + } + Err(SendError::Timeout) => { + unreachable!("No timeout is configured for this source.") + } + } } if let Some(error) = error { diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 7f7cf027d397d..2b55b861012a0 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -543,7 +543,7 @@ mod test { }); let component_key = ComponentKey::from("statsd_conversion_disabled"); - let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096); + let (tx, rx) = SourceSender::new_test_sender_with_options(4096, None); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -580,7 +580,7 @@ mod test { // packet we send has a lot of metrics per packet. 
We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096); + let (tx, rx) = SourceSender::new_test_sender_with_options(4096, None); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -674,7 +674,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096); + let (tx, _rx) = SourceSender::new_test_sender_with_options(4096, None); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 77a3c861c547e..daf0fee5927da 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -31,12 +31,12 @@ pub fn backpressure_source(counter: &Arc) -> BackpressureSourceConf } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_options(1, None); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_options(1, None); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -44,7 +44,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); + let (tx, rx) = 
SourceSender::new_test_sender_with_options(1, None); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -76,7 +76,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig } pub fn basic_sink(channel_size: usize) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_options(channel_size, None); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -88,7 +88,7 @@ pub fn basic_sink_with_data( impl Stream + use<>, BasicSinkConfig, ) { - let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_options(channel_size, None); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -96,7 +96,7 @@ pub fn basic_sink_with_data( pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_options(channel_size, None); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 797b0bf3554f7..ffc8041a70724 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -260,7 +260,9 @@ impl<'a> Builder<'a> { key.id() ); - let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); + let mut builder = SourceSender::builder() + .with_buffer(*SOURCE_SENDER_BUFFER_SIZE) + .with_timeout(source.inner.send_timeout()); let mut pumps = Vec::new(); let mut controls = HashMap::new(); let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); @@ -343,8 +345,6 @@ impl<'a> Builder<'a> { }; let pump = Task::new(key.clone(), typetag, pump); - let pipeline = builder.build(); - 
let (shutdown_signal, force_shutdown_tripwire) = self .shutdown_coordinator .register_source(key, INTERNAL_SOURCES.contains(&typetag)); @@ -354,15 +354,14 @@ impl<'a> Builder<'a> { globals: self.config.global.clone(), enrichment_tables: enrichment_tables.clone(), shutdown: shutdown_signal, - out: pipeline, + out: builder.build(), proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy), acknowledgements: source.sink_acknowledgements, schema_definitions, schema: self.config.schema, extra_context: self.extra_context.clone(), }; - let source = source.inner.build(context).await; - let server = match source { + let server = match source.inner.build(context).await { Err(error) => { self.errors.push(format!("Source \"{key}\": {error}")); continue; diff --git a/website/cue/reference/components/sources/datadog_agent.cue b/website/cue/reference/components/sources/datadog_agent.cue index 96e841889ab2a..52ac91a375bce 100644 --- a/website/cue/reference/components/sources/datadog_agent.cue +++ b/website/cue/reference/components/sources/datadog_agent.cue @@ -220,9 +220,33 @@ components: sources: datadog_agent: { duration distribution). """ } + request_timeouts: { + title: "Request timeout handling" + body: """ + When the Datadog Agent sends a request to this Vector source, and the source + blocks on sending the events in that request to the connected transforms or sinks, + the Agent will eventually time out the request and drop the connection. When that + happens, by default, Vector will emit an "Events dropped." error and increment + the `component_discarded_events_total` internal metric. + + However, while it is technically true that Vector has dropped the events, the + Agent will retry resending that request indefinitely, which means the events will + eventually be received unless the blockage above is permanent or the Agent is + killed before the request is accepted. 
+ + To prevent this potentially misleading telemetry, you can configure + the `send_timeout_secs` option to a + value _less than_ the Agent's timeout, which defaults to 10 seconds. + This will cause Vector to respond to the Agent when such blockages occur with a HTTP 503 + Service Unavailable response, emit a warning instead of an error, + and increment the `component_timed_out_requests_total` internal metric. + """ + } } telemetry: metrics: { + component_timed_out_events_total: components.sources.internal_metrics.output.metrics.component_timed_out_events_total + component_timed_out_requests_total: components.sources.internal_metrics.output.metrics.component_timed_out_requests_total http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total diff --git a/website/cue/reference/components/sources/generated/datadog_agent.cue b/website/cue/reference/components/sources/generated/datadog_agent.cue index 32393130abd24..d18e4410ad609 100644 --- a/website/cue/reference/components/sources/generated/datadog_agent.cue +++ b/website/cue/reference/components/sources/generated/datadog_agent.cue @@ -584,6 +584,21 @@ generated: components: sources: datadog_agent: configuration: { required: false type: bool: default: false } + send_timeout_secs: { + description: """ + The timeout before responding to requests with a HTTP 503 Service Unavailable error. + + If not set, responses to completed requests will block indefinitely until connected + transforms or sinks are ready to receive the events. When this happens, the sending Datadog + Agent will eventually time out the request and drop the connection, resulting in Vector + generating an "Events dropped."
error and incrementing the `component_discarded_events_total` + internal metric. By setting this option to a value less than the Agent's timeout, Vector + will instead respond to the Agent with a HTTP 503 Service Unavailable error, emit a warning, + and increment the `component_timed_out_events_total` internal metric instead. + """ + required: false + type: float: {} + } split_metric_namespace: { description: """ If this is set to `true`, metric names are split at the first '.' into a namespace and name. diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 7498601b6aa9c..2acd6c2bfbc28 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -84,6 +84,18 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _internal_metrics_tags } + component_timed_out_events_total: { + description: "The total number of events for which this source responded with a timeout error." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } + component_timed_out_requests_total: { + description: "The total number of requests for which this source responded with a timeout error." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } connection_established_total: { description: "The total number of times a connection has been established." type: "counter"