Skip to content
Merged
7 changes: 7 additions & 0 deletions changelog.d/datadog-agent-timeout.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Added support for configurable request timeouts to the `datadog_agent` source.

This change also introduces two new internal metrics:
- `component_timed_out_events_total` - Counter tracking the number of events that timed out
- `component_timed_out_requests_total` - Counter tracking the number of requests that timed out

authors: bruceg
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use metrics::{Counter, counter};

use super::Count;

crate::registered_event! {
ComponentEventsTimedOut {
reason: &'static str,
} => {
timed_out_events: Counter = counter!("component_timed_out_events_total"),
timed_out_requests: Counter = counter!("component_timed_out_requests_total"),
reason: &'static str = self.reason,
}

fn emit(&self, data: Count) {
warn!(
message = "Events timed out",
events = data.0,
reason = self.reason,
);
self.timed_out_events.increment(data.0 as u64);
self.timed_out_requests.increment(1);
}
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod bytes_received;
mod bytes_sent;
pub mod cached_event;
pub mod component_events_dropped;
pub mod component_events_timed_out;
mod events_received;
mod events_sent;
mod optional_tag;
Expand All @@ -15,6 +16,7 @@ pub use bytes_sent::BytesSent;
#[allow(clippy::module_name_repetitions)]
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
pub use component_events_timed_out::ComponentEventsTimedOut;
pub use events_received::{EventsReceived, EventsReceivedHandle};
pub use events_sent::{DEFAULT_OUTPUT, EventsSent, TaggedEventsSent};
pub use metrics::SharedString;
Expand Down
12 changes: 11 additions & 1 deletion lib/vector-core/src/source_sender/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use metrics::{Histogram, histogram};
use vector_buffers::topology::channel::LimitedReceiver;
Expand All @@ -12,6 +12,7 @@ pub struct Builder {
default_output: Option<Output>,
named_outputs: HashMap<String, Output>,
lag_time: Option<Histogram>,
timeout: Option<Duration>,
}

impl Default for Builder {
Expand All @@ -21,6 +22,7 @@ impl Default for Builder {
default_output: None,
named_outputs: Default::default(),
lag_time: Some(histogram!(LAG_TIME_NAME)),
timeout: None,
}
}
}
Expand All @@ -32,6 +34,12 @@ impl Builder {
self
}

#[must_use]
pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}

pub fn add_source_output(
&mut self,
output: SourceOutput,
Expand All @@ -51,6 +59,7 @@ impl Builder {
lag_time,
log_definition,
output_id,
self.timeout,
);
self.default_output = Some(output);
rx
Expand All @@ -62,6 +71,7 @@ impl Builder {
lag_time,
log_definition,
output_id,
self.timeout,
);
self.named_outputs.insert(name, output);
rx
Expand Down
59 changes: 12 additions & 47 deletions lib/vector-core/src/source_sender/errors.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,26 @@
use std::fmt;

use tokio::sync::mpsc;
use vector_buffers::topology::channel::SendError;
use vector_buffers::topology::channel;

use crate::event::{Event, EventArray};

#[derive(Clone, Debug)]
pub struct ClosedError;

impl fmt::Display for ClosedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Sender is closed.")
}
}

impl std::error::Error for ClosedError {}

impl From<mpsc::error::SendError<Event>> for ClosedError {
fn from(_: mpsc::error::SendError<Event>) -> Self {
Self
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendError {
Timeout,
Closed,
}

impl From<mpsc::error::SendError<EventArray>> for ClosedError {
fn from(_: mpsc::error::SendError<EventArray>) -> Self {
Self
impl<T> From<channel::SendError<T>> for SendError {
fn from(_: channel::SendError<T>) -> Self {
Self::Closed
}
}

impl<T> From<SendError<T>> for ClosedError {
fn from(_: SendError<T>) -> Self {
Self
}
}

#[derive(Debug)]
pub enum StreamSendError<E> {
Closed(ClosedError),
Stream(E),
}

impl<E> fmt::Display for StreamSendError<E>
where
E: fmt::Display,
{
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StreamSendError::Closed(e) => e.fmt(f),
StreamSendError::Stream(e) => e.fmt(f),
Self::Timeout => f.write_str("Send timed out."),
Self::Closed => f.write_str("Sender is closed."),
}
}
}

impl<E> std::error::Error for StreamSendError<E> where E: std::error::Error {}

impl<E> From<ClosedError> for StreamSendError<E> {
fn from(e: ClosedError) -> Self {
StreamSendError::Closed(e)
}
}
impl std::error::Error for SendError {}
2 changes: 1 addition & 1 deletion lib/vector-core/src/source_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod sender;
mod tests;

pub use builder::Builder;
pub use errors::{ClosedError, StreamSendError};
pub use errors::SendError;
use output::Output;
pub use sender::{SourceSender, SourceSenderItem};

Expand Down
83 changes: 63 additions & 20 deletions lib/vector-core/src/source_sender/output.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{fmt, num::NonZeroUsize, sync::Arc, time::Instant};
use std::{
fmt,
num::NonZeroUsize,
sync::Arc,
time::{Duration, Instant},
};

use chrono::Utc;
use futures::{Stream, StreamExt as _};
Expand All @@ -11,13 +16,13 @@ use vector_buffers::{
use vector_common::{
byte_size_of::ByteSizeOf,
internal_event::{
self, ComponentEventsDropped, CountByteSize, EventsSent, InternalEventHandle as _,
Registered, UNINTENTIONAL,
self, ComponentEventsDropped, ComponentEventsTimedOut, Count, CountByteSize, EventsSent,
InternalEventHandle as _, RegisterInternalEvent as _, Registered, UNINTENTIONAL,
},
};
use vrl::value::Value;

use super::{CHUNK_SIZE, ClosedError, SourceSenderItem};
use super::{CHUNK_SIZE, SendError, SourceSenderItem};
use crate::{
EstimatedJsonEncodedSizeOf,
config::{OutputId, log_schema},
Expand Down Expand Up @@ -52,6 +57,15 @@ impl UnsentEventCount {
const fn discard(&mut self) {
self.count = 0;
}

fn timed_out(&mut self) {
ComponentEventsTimedOut {
reason: "Source send timed out.",
}
.register()
.emit(Count(self.count));
self.count = 0;
}
}

impl Drop for UnsentEventCount {
Expand All @@ -76,6 +90,7 @@ pub(super) struct Output {
/// The OutputId related to this source sender. This is set as the `upstream_id` in
/// `EventMetadata` for all event sent through here.
id: Arc<OutputId>,
timeout: Option<Duration>,
}

#[expect(clippy::missing_fields_in_debug)]
Expand All @@ -84,6 +99,7 @@ impl fmt::Debug for Output {
fmt.debug_struct("Output")
.field("sender", &self.sender)
.field("output_id", &self.id)
.field("timeout", &self.timeout)
// `metrics::Histogram` is missing `impl Debug`
.finish()
}
Expand All @@ -96,6 +112,7 @@ impl Output {
lag_time: Option<Histogram>,
log_definition: Option<Arc<Definition>>,
output_id: OutputId,
timeout: Option<Duration>,
) -> (Self, LimitedReceiver<SourceSenderItem>) {
let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()));
(
Expand All @@ -107,6 +124,7 @@ impl Output {
))),
log_definition,
id: Arc::new(output_id),
timeout,
},
rx,
)
Expand All @@ -116,7 +134,7 @@ impl Output {
&mut self,
mut events: EventArray,
unsent_event_count: &mut UnsentEventCount,
) -> Result<(), ClosedError> {
) -> Result<(), SendError> {
let send_reference = Instant::now();
let reference = Utc::now().timestamp_millis();
events
Expand All @@ -133,31 +151,51 @@ impl Output {

let byte_size = events.estimated_json_encoded_size_of();
let count = events.len();
self.sender
.send(SourceSenderItem {
events,
send_reference,
})
.await
.map_err(|_| ClosedError)?;
self.send_with_timeout(events, send_reference).await?;
self.events_sent.emit(CountByteSize(count, byte_size));
unsent_event_count.decr(count);
Ok(())
}

async fn send_with_timeout(
&mut self,
events: EventArray,
send_reference: Instant,
) -> Result<(), SendError> {
let item = SourceSenderItem {
events,
send_reference,
};
if let Some(timeout) = self.timeout {
match tokio::time::timeout(timeout, self.sender.send(item)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(error)) => Err(error.into()),
Err(_elapsed) => Err(SendError::Timeout),
}
} else {
self.sender.send(item).await.map_err(Into::into)
}
}

pub(super) async fn send_event(
&mut self,
event: impl Into<EventArray>,
) -> Result<(), ClosedError> {
) -> Result<(), SendError> {
let event: EventArray = event.into();
// It's possible that the caller stops polling this future while it is blocked waiting
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
// `ComponentEventsDropped` events.
let mut unsent_event_count = UnsentEventCount::new(event.len());
self.send(event, &mut unsent_event_count).await
self.send(event, &mut unsent_event_count)
.await
.inspect_err(|error| {
if let SendError::Timeout = error {
unsent_event_count.timed_out();
}
})
}

pub(super) async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
pub(super) async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
where
S: Stream<Item = E> + Unpin,
E: Into<Event> + ByteSizeOf,
Expand All @@ -169,7 +207,7 @@ impl Output {
Ok(())
}

pub(super) async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
pub(super) async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
Expand All @@ -183,10 +221,15 @@ impl Output {
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
self.send(events, &mut unsent_event_count)
.await
.inspect_err(|_| {
// The unsent event count is discarded here because the callee emits the
// `StreamClosedError`.
unsent_event_count.discard();
.inspect_err(|error| match error {
SendError::Timeout => {
unsent_event_count.timed_out();
}
SendError::Closed => {
// The unsent event count is discarded here because the callee emits the
// `StreamClosedError`.
unsent_event_count.discard();
}
})?;
}
Ok(())
Expand Down
Loading
Loading