From fe9c6b5c85781e89bff6d79e8c24443f3c84d4b3 Mon Sep 17 00:00:00 2001 From: Nick Carton Date: Thu, 2 Apr 2026 22:17:36 +0200 Subject: [PATCH 1/4] Add remote write support --- Cargo.lock | 421 +++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 + src/config.rs | 30 +++ src/main.rs | 43 ++++- src/remote_write.rs | 457 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 939 insertions(+), 15 deletions(-) create mode 100644 src/remote_write.rs diff --git a/Cargo.lock b/Cargo.lock index 7be63a7..6e5c696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,21 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "approx" version = "0.5.1" @@ -182,6 +197,12 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" @@ -200,7 +221,11 @@ version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ + "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", + "windows-link", ] [[package]] @@ -239,6 +264,16 @@ dependencies = [ "tracing-error", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -314,6 +349,7 @@ dependencies = [ name = "distributed-metrics" version = "1.1.0" dependencies = [ + "chrono", "color-eyre", "eyre", "figment", @@ -326,8 +362,10 @@ dependencies = [ "metrics-util 0.20.1", "poem", "progenitor", + "prometheus-parse", + "prometheus-reqwest-remote-write", "regress", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "serde_regex", @@ -350,6 +388,12 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "endian-type" version = "0.1.2" @@ -774,6 +818,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -935,12 +1003,74 @@ dependencies = [ "iso_country", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys 0.3.1", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41a652e1f9b6e0275df1f15b32661cf0d4b78d4d87ddec5e0c3c20f097433258" +dependencies = [ + "jni-sys 0.4.1", +] + +[[package]] +name = "jni-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6377a88cb3910bee9b0fa88d4f42e1d2da8e79915598f65fb0c7ee14c878af2" +dependencies = [ + "jni-sys-macros", +] + +[[package]] +name = "jni-sys-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -1068,7 +1198,7 @@ dependencies = [ "metrics-util 0.16.3", "parking_lot", "poem", - "prometheus", + "prometheus 0.13.4", "rust-embed", "serde", ] @@ -1104,7 +1234,7 @@ dependencies = [ "metrics 0.22.4", "metrics-util 0.16.3", "once_cell", - "prometheus", + "prometheus 0.13.4", "sealed", "smallvec", "thiserror 1.0.69", @@ -1475,7 +1605,7 @@ dependencies = [ "bytes", "futures-core", "percent-encoding", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "serde_urlencoded", @@ -1532,16 +1662,98 @@ dependencies = [ "lazy_static", "memchr", "parking_lot", - "protobuf", + "protobuf 2.28.0", "thiserror 1.0.69", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf 3.7.2", + "thiserror 2.0.18", +] + +[[package]] +name = "prometheus-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "811031bea65e5a401fb2e1f37d802cca6601e204ac463809a3189352d13b78a5" +dependencies = [ + "chrono", + "itertools 0.12.1", + "once_cell", + "regex", +] + +[[package]] +name = "prometheus-reqwest-remote-write" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "441b28c02014ba0e458838548d5fe770fd0238f024d250f5512cf8540ab48b7a" +dependencies = [ + "prometheus 0.14.0", + "prost", + "reqwest 0.13.2", + "snap", +] + +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "protobuf" version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quanta" version = "0.12.6" @@ -1583,6 +1795,7 @@ version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ + "aws-lc-rs", "bytes", "getrandom 0.3.4", "lru-slab", @@ -1773,6 +1986,43 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "reqwest" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "serde", + "serde_json", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "rfc7239" version = "0.1.3" @@ -1889,6 +2139,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.10" @@ -2189,6 +2466,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.6.3" @@ -2838,6 +3121,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-root-certs" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "1.0.6" @@ -2884,12 +3176,74 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2917,6 +3271,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -2950,6 +3319,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -2962,6 +3337,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -2974,6 +3355,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2998,6 +3385,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -3010,6 +3403,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -3022,6 +3421,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -3034,6 +3439,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 3b75c32..ef0def0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,9 @@ humantime-serde = "1.1.1" serde_regex = "1.1.0" thiserror = "2.0.9" geohash = "0.13.1" +prometheus-reqwest-remote-write = "0.5" +prometheus-parse = "0.2" +chrono = "0.4" # The profile that 'dist' will build with [profile.dist] diff --git a/src/config.rs b/src/config.rs index 69df8b8..4ac7429 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,12 +25,42 @@ pub struct GlobalConfig { #[serde(with = "humantime_serde")] #[serde(default = "default_metric_clear_timeout")] pub metric_clear_timeout: Duration, + + /// Enable the /metrics scrape endpoint (default: true) + #[serde(default = "default_true")] + pub scrape_enabled: bool, + + /// Remote write destinations + #[serde(default)] + pub remote_write: Vec, } fn default_metric_clear_timeout() -> Duration { Duration::from_secs(10) } +fn default_true() -> bool { + true +} + +fn default_remote_write_interval() -> Duration { + Duration::from_secs(15) +} + +#[derive(Deserialize, Clone, Debug)] +pub struct RemoteWriteDestination { + pub name: String, + pub url: String, + pub username: Option, + pub password: Option, + /// Custom HTTP headers (e.g., bearer tokens, API keys). + #[serde(default)] + pub headers: HashMap, + #[serde(with = "humantime_serde")] + #[serde(default = "default_remote_write_interval")] + pub interval: Duration, +} + #[derive(Deserialize, AsRefStr, Clone, Debug)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] diff --git a/src/main.rs b/src/main.rs index 82f6228..c5af8a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,12 +12,12 @@ use poem::EndpointExt; use poem::{get, handler, listener::TcpListener, Route, Server}; use progenitor::generate_api; use std::sync::LazyLock; -use tokio::join; use tokio::task::JoinSet; use tracing::{error, info}; mod collectors; mod config; +mod remote_write; generate_api!(spec = "./api-spec.json", interface = Builder); @@ -81,20 +81,43 @@ async fn main() -> Result<()> { .install_recorder() .expect("failed to install recorder"); - let app = Route::new() - .at("/metrics", get(render_prom)) - .with(AddData::new(handle)); - - let http_server = Server::new(TcpListener::bind("[::]:3000")).run(app); - // Start collection tasks let mut join_set = JoinSet::new(); - spawn_collectors(&CONFIG, &mut join_set).await?; + // Conditionally start the /metrics scrape endpoint + if CONFIG.global_config.scrape_enabled { + let scrape_handle = handle.clone(); + let app = Route::new() + .at("/metrics", get(render_prom)) + .with(AddData::new(scrape_handle)); + join_set.spawn(async move { + if let Err(e) = Server::new(TcpListener::bind("[::]:3000")) + .run(app) + .await + { + error!("HTTP server failed: {}", e); + std::process::exit(1); + } + }); + info!("Scrape endpoint enabled on :3000/metrics"); + } + + // Start remote write senders + if !CONFIG.global_config.remote_write.is_empty() { + let sender = remote_write::RemoteWriteSender::new( + CONFIG.global_config.remote_write.clone(), + handle.clone(), + ); + sender.spawn_all(&mut join_set); + info!( + count = CONFIG.global_config.remote_write.len(), + "Remote write destinations started" + ); + } - let (rs, _) = join!(http_server, join_set.join_all()); + spawn_collectors(&CONFIG, &mut join_set).await?; - rs?; + join_set.join_all().await; Ok(()) } diff --git a/src/remote_write.rs b/src/remote_write.rs new file mode 100644 index 0000000..e9e0d2a --- /dev/null +++ b/src/remote_write.rs @@ -0,0 +1,457 @@ +use std::io::BufRead; + +use crate::config::RemoteWriteDestination; +use eyre::Result; +use metrics_exporter_prometheus::PrometheusHandle; +use prometheus_reqwest_remote_write::{ + Label, Sample as RwSample, TimeSeries, WriteRequest, LABEL_NAME, +}; +use tokio::task::JoinSet; +use std::time::Duration; +use tracing::{debug, info, warn}; + +pub struct RemoteWriteSender { + destinations: Vec, + handle: PrometheusHandle, + client: reqwest::Client, +} + +impl RemoteWriteSender { + pub fn new(destinations: Vec, handle: PrometheusHandle) -> Self { + Self { + destinations, + handle, + client: reqwest::Client::new(), + } + } + + pub fn spawn_all(self, join_set: &mut JoinSet<()>) { + for dest in self.destinations { + let handle = self.handle.clone(); + let client = self.client.clone(); + join_set.spawn(async move { + Self::push_loop(dest, handle, client).await; + }); + } + } + + async fn push_loop( + dest: RemoteWriteDestination, + handle: PrometheusHandle, + client: reqwest::Client, + ) { + let base_interval = dest.interval; + let mut consecutive_failures: u32 = 0; + + loop { + let backoff = if consecutive_failures > 0 { + // Exponential backoff: base * 2^failures, capped at 5 minutes + let multiplier = 2u64.saturating_pow(consecutive_failures.min(8)); + let backoff = base_interval.saturating_mul(multiplier as u32); + backoff.min(Duration::from_secs(300)) + } else { + base_interval + }; + + tokio::time::sleep(backoff).await; + + let text = { + let h = handle.clone(); + tokio::task::spawn_blocking(move || h.render()) + .await + .unwrap() + }; + + if text.is_empty() { + debug!(dest = %dest.name, "no metrics to push"); + continue; + } + + match Self::push_once(&dest, &client, &text).await { + Ok(()) => { + if consecutive_failures > 0 { + info!(dest = %dest.name, "remote_write push recovered after {} failures", consecutive_failures); + } + consecutive_failures = 0; + debug!(dest = %dest.name, "remote_write push succeeded"); + } + Err(e) => { + consecutive_failures = consecutive_failures.saturating_add(1); + let next_multiplier = 2u64.saturating_pow(consecutive_failures.min(8)); + let next_backoff = base_interval + .saturating_mul(next_multiplier as u32) + .min(Duration::from_secs(300)); + warn!( + dest = %dest.name, + error = %e, + consecutive_failures, + next_retry_secs = next_backoff.as_secs(), + "remote_write push failed" + ); + } + } + } + } + + async fn push_once( + dest: &RemoteWriteDestination, + client: &reqwest::Client, + text: &str, + ) -> Result<()> { + let write_request = parse_text_to_write_request(text)?; + let body = write_request + .encode_compressed() + .map_err(|e| eyre::eyre!("snappy compression failed: {}", e))?; + + let mut req = client + .post(&dest.url) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .header("X-Prometheus-Remote-Write-Version", "0.1.0") + .body(body); + + for (key, value) in &dest.headers { + req = req.header(key, value); + } + + if let Some(ref username) = dest.username { + req = req.basic_auth(username, dest.password.as_deref()); + } + + let resp = req.send().await?; + + if !resp.status().is_success() { + let status = resp.status(); + let mut body = resp.text().await.unwrap_or_default(); + body.truncate(1024); + return Err(eyre::eyre!("remote_write returned {}: {}", status, body)); + } + Ok(()) + } +} + +fn parse_text_to_write_request(text: &str) -> Result { + let reader = std::io::BufReader::new(text.as_bytes()); + let scrape = prometheus_parse::Scrape::parse(reader.lines()) + .map_err(|e| eyre::eyre!("failed to parse prometheus text: {}", e))?; + + let now_ms = chrono::Utc::now().timestamp_millis(); + + let mut timeseries: Vec = Vec::new(); + + for sample in &scrape.samples { + let base_labels: Vec