diff --git a/CHANGELOG.md b/CHANGELOG.md index c242cdb..f0a57bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- Lower steady-state CPU usage (~35–45% across all supported modes), release binary size (~22%), and process memory footprint (~2–3.5%). +- ROS control-plane callbacks (parameter and service handlers, timers) are now serviced immediately rather than within a ~110 ms polling cycle. Data-plane publish latency is unchanged. + +No public API or topic-content changes. + ## [0.1.0] - 2026-04-19 Initial public release. diff --git a/Cargo.lock b/Cargo.lock index 8b22670..a087723 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,15 +885,6 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" -[[package]] -name = "lock_api" -version = "0.4.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" -dependencies = [ - "scopeguard", -] - [[package]] name = "log" version = "0.4.29" @@ -1097,29 +1088,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "parking_lot" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-link", -] - [[package]] name = "paste" version = "1.0.15" @@ -1287,15 +1255,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "redox_syscall" -version = "0.5.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" -dependencies = [ - "bitflags", -] - [[package]] name = "regex" version = "1.12.3" @@ -1442,12 +1401,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "semver" version = "1.0.28" @@ -1770,7 +1723,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/Cargo.toml b/Cargo.toml index f882725..10b35e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ rtcm-rs = "0.11" serial2-tokio = "0.1" base64 = "0.22" url = "2.5" -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "signal"] } # NTRIP client ntrip-core = "0.2" @@ -78,3 +78,8 @@ ros2 = ["dep:rclrs", "dep:sensor_msgs", "dep:geometry_msgs", "dep:std_msgs", "de [package.metadata.ros] install_to_share = ["launch", "config", "udev"] + +[profile.release] +lto = "fat" +codegen-units = 1 +strip = "debuginfo" diff --git a/src/device/task.rs b/src/device/task.rs index 3f584d3..6e12767 100644 --- a/src/device/task.rs +++ b/src/device/task.rs @@ -7,6 +7,7 @@ //! - Injecting RTCM corrections //! 
- Reporting position data for NTRIP GGA +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -21,8 +22,7 @@ use crate::state::{DeviceState, FixType, GnssIntegrity, IntegrityAggregator, Int use super::config::{ConfigStep, ConfiguratorOptions, DeviceConfigurator}; use super::serial::SerialPortBuilder; use super::ubx::{ - CovData, HpPosData, MonCommsData, MonHwData, MonRfData, PosEcefData, PvtData, RelPosNedData, - RxmCorData, SatInfo, SecSigData, SecSiglogData, SurveyInData, UbxHandler, + HpPosData, PvtData, RelPosNedData, SatInfo, SecSigData, SurveyInData, UbxHandler, }; /// Channels required by the device task. @@ -74,36 +74,26 @@ impl GgaData { } /// Messages sent from the device task. +/// +/// Only messages that are forwarded to the supervisor/ROS task appear here. +/// Internal UBX payloads consumed purely by the integrity aggregator +/// (NAV-COV, NAV-POSECEF, SEC-SIGLOG, RXM-COR, MON-COMMS, MON-HW, MON-RF, +/// fix-type transitions) are handled in-place inside the device task and +/// never touch this channel. 
#[derive(Debug, Clone)] pub enum DeviceMessage { /// Device state changed StateChanged(DeviceState), /// New PVT data available Pvt(PvtData), - /// Fix type changed - FixTypeChanged(FixType), /// High precision position HpPos(HpPosData), /// Satellite status SatInfo(SatInfo), /// Integrity status update Integrity(GnssIntegrity), - /// Position covariance - Covariance(CovData), - /// ECEF position - PosEcef(PosEcefData), /// Security signal status SecSig(SecSigData), - /// Security event log - SecSiglog(SecSiglogData), - /// Correction status - RxmCor(RxmCorData), - /// Communication port status - MonComms(MonCommsData), - /// Hardware status (antenna, jamming indicator) - deprecated - MonHw(MonHwData), - /// RF status (antenna, jamming indicator) - replaces MonHw - MonRf(MonRfData), /// Relative position for moving base/rover RelPosNed(RelPosNedData), /// Survey-in status (NAV-SVIN) — static_base progress for /diagnostics @@ -112,35 +102,22 @@ pub enum DeviceMessage { impl DeviceMessage { /// Convert to a GnssMessage for forwarding to the supervisor/ROS task. - /// - /// Returns `None` for internal messages that are processed by the - /// IntegrityAggregator and don't need to be forwarded (Covariance, - /// PosEcef, SecSiglog, RxmCor, MonComms, MonHw, MonRf, FixTypeChanged). 
- pub fn into_gnss_message(self) -> Option { + pub fn into_gnss_message(self) -> crate::state::GnssMessage { use crate::state::GnssMessage; match self { - DeviceMessage::Pvt(pvt) => Some(GnssMessage::Pvt(pvt)), - DeviceMessage::StateChanged(state) => Some(GnssMessage::DeviceStateChanged(state)), - DeviceMessage::HpPos(hp) => Some(GnssMessage::HpPos(hp)), - DeviceMessage::SatInfo(sat) => Some(GnssMessage::SatInfo(sat)), - DeviceMessage::SecSig(sig) => Some(GnssMessage::SecSig(sig)), - DeviceMessage::Integrity(integrity) => Some(GnssMessage::Integrity(integrity)), - DeviceMessage::RelPosNed(rel_pos) => Some(GnssMessage::RelPosNed(rel_pos)), - DeviceMessage::SurveyIn(svin) => Some(GnssMessage::SurveyIn(svin)), - // Internal messages processed by IntegrityAggregator - not forwarded - DeviceMessage::FixTypeChanged(_) - | DeviceMessage::Covariance(_) - | DeviceMessage::PosEcef(_) - | DeviceMessage::SecSiglog(_) - | DeviceMessage::RxmCor(_) - | DeviceMessage::MonComms(_) - | DeviceMessage::MonHw(_) - | DeviceMessage::MonRf(_) => None, + DeviceMessage::Pvt(pvt) => GnssMessage::Pvt(pvt), + DeviceMessage::StateChanged(state) => GnssMessage::DeviceStateChanged(state), + DeviceMessage::HpPos(hp) => GnssMessage::HpPos(hp), + DeviceMessage::SatInfo(sat) => GnssMessage::SatInfo(sat), + DeviceMessage::SecSig(sig) => GnssMessage::SecSig(sig), + DeviceMessage::Integrity(integrity) => GnssMessage::Integrity(integrity), + DeviceMessage::RelPosNed(rel_pos) => GnssMessage::RelPosNed(rel_pos), + DeviceMessage::SurveyIn(svin) => GnssMessage::SurveyIn(svin), } } } -/// Shared device task state for external monitoring. +/// Snapshot of device task state for external monitoring. #[derive(Debug, Default)] pub struct DeviceTaskState { /// Current device state @@ -153,31 +130,46 @@ pub struct DeviceTaskState { pub rtcm_bytes_injected: u64, } +#[derive(Debug, Default)] +struct DeviceTaskInner { + state: DeviceState, + fix_type: FixType, +} + +/// Shared device task state. 
Counters are atomic and mutated lock-free from +/// the hot path; the mutable device state enum stays under the mutex. +#[derive(Debug, Default)] +pub struct DeviceTaskShared { + inner: Mutex, + messages_received: AtomicU64, + rtcm_bytes_injected: AtomicU64, +} + /// Handle for monitoring the device task. #[derive(Clone)] pub struct DeviceTaskHandle { - state: Arc>, + shared: Arc, } impl DeviceTaskHandle { /// Get current device state. pub async fn state(&self) -> DeviceState { - self.state.lock().await.state.clone() + self.shared.inner.lock().await.state.clone() } /// Get current fix type. pub async fn fix_type(&self) -> FixType { - self.state.lock().await.fix_type + self.shared.inner.lock().await.fix_type } /// Get a snapshot of the task state. pub async fn snapshot(&self) -> DeviceTaskState { - let s = self.state.lock().await; + let inner = self.shared.inner.lock().await; DeviceTaskState { - state: s.state.clone(), - fix_type: s.fix_type, - messages_received: s.messages_received, - rtcm_bytes_injected: s.rtcm_bytes_injected, + state: inner.state.clone(), + fix_type: inner.fix_type, + messages_received: self.shared.messages_received.load(Ordering::Relaxed), + rtcm_bytes_injected: self.shared.rtcm_bytes_injected.load(Ordering::Relaxed), } } } @@ -190,7 +182,7 @@ pub struct DeviceTask { /// Enabled ROS topics (for mode-aware validation) enabled_topics: Vec, channels: DeviceTaskChannels, - state: Arc>, + shared: Arc, configurator_options: ConfiguratorOptions, /// Integrity aggregator for safety monitoring integrity: IntegrityAggregator, @@ -219,7 +211,7 @@ impl DeviceTask { ublox_config, enabled_topics, channels, - state: Arc::new(Mutex::new(DeviceTaskState::default())), + shared: Arc::new(DeviceTaskShared::default()), configurator_options: ConfiguratorOptions::default(), integrity: IntegrityAggregator::with_thresholds(integrity_thresholds), last_rtcm_received: None, @@ -240,7 +232,7 @@ impl DeviceTask { ublox_config, enabled_topics, channels, - state: 
Arc::new(Mutex::new(DeviceTaskState::default())), + shared: Arc::new(DeviceTaskShared::default()), configurator_options, integrity: IntegrityAggregator::with_thresholds(integrity_thresholds), last_rtcm_received: None, @@ -250,7 +242,7 @@ impl DeviceTask { /// Get a handle for monitoring this task. pub fn handle(&self) -> DeviceTaskHandle { DeviceTaskHandle { - state: Arc::clone(&self.state), + shared: Arc::clone(&self.shared), } } @@ -428,7 +420,7 @@ impl DeviceTask { serial: &mut super::SerialPort, ubx: &mut UbxHandler, ) -> Result<(), DeviceError> { - let mut read_buf = [0u8; 1024]; + let mut read_buf = [0u8; 4096]; let watchdog = Duration::from_secs_f64(self.config.watchdog_timeout_secs); let packet_watchdog = Duration::from_secs_f64(self.config.packet_watchdog_secs); let mut last_progress = tokio::time::Instant::now(); @@ -510,8 +502,9 @@ impl DeviceTask { let result = ubx.process(data); if result.messages_processed > 0 { - let mut state = self.state.lock().await; - state.messages_received += result.messages_processed as u64; + self.shared + .messages_received + .fetch_add(result.messages_processed as u64, Ordering::Relaxed); } // Track if we need to recompute integrity @@ -529,120 +522,77 @@ impl DeviceTask { } // Handle Satellite Info and update signal quality for integrity - if let Some(ref sat) = result.sat_info { + if let Some(sat) = result.sat_info { // Compute signal quality metrics from satellite data let signal_quality = sat.compute_signal_quality(); self.integrity.update_signal_quality(&signal_quality); integrity_updated = true; - let _ = self - .channels - .msg_tx - .send(DeviceMessage::SatInfo(sat.clone())) - .await; + let _ = self.channels.msg_tx.send(DeviceMessage::SatInfo(sat)).await; } - // Handle NAV-COV (position/velocity covariance) + // Handle NAV-COV (position/velocity covariance) — integrity only if let Some(ref cov) = result.cov { self.integrity.update_covariance(cov); - let _ = self - .channels - .msg_tx - 
.send(DeviceMessage::Covariance(cov.clone())) - .await; integrity_updated = true; } - // Handle NAV-POSECEF + // Handle NAV-POSECEF — integrity only if let Some(ref pos) = result.pos_ecef { self.integrity.update_pos_ecef(pos); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::PosEcef(pos.clone())) - .await; } // Handle SEC-SIG (jamming/spoofing detection) - if let Some(ref sig) = result.sec_sig { - self.integrity.update_sec_sig(sig); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::SecSig(sig.clone())) - .await; + if let Some(sig) = result.sec_sig { + self.integrity.update_sec_sig(&sig); + let _ = self.channels.msg_tx.send(DeviceMessage::SecSig(sig)).await; integrity_updated = true; } - // Handle SEC-SIGLOG (security event log) + // Handle SEC-SIGLOG (security event log) — integrity only if let Some(ref siglog) = result.sec_siglog { self.integrity.update_sec_siglog(siglog); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::SecSiglog(siglog.clone())) - .await; integrity_updated = true; } - // Handle RXM-COR (correction status) + // Handle RXM-COR (correction status) — integrity only if let Some(ref cor) = result.rxm_cor { self.integrity.update_rxm_cor(cor); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::RxmCor(cor.clone())) - .await; integrity_updated = true; } - // Handle MON-COMMS (communication port status) + // Handle MON-COMMS (communication port status) — integrity only if let Some(ref comms) = result.mon_comms { self.integrity.update_mon_comms(comms); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::MonComms(comms.clone())) - .await; } - // Handle MON-HW (hardware status) - deprecated, prefer MON-RF + // Handle MON-HW (hardware status) — integrity only (deprecated, prefer MON-RF) if let Some(ref hw) = result.mon_hw { self.integrity.update_mon_hw(hw); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::MonHw(hw.clone())) - .await; integrity_updated = true; } - // Handle MON-RF (RF status) - 
replaces MON-HW + // Handle MON-RF (RF status) — integrity only (replaces MON-HW) if let Some(ref rf) = result.mon_rf { self.integrity.update_mon_rf(rf); - let _ = self - .channels - .msg_tx - .send(DeviceMessage::MonRf(rf.clone())) - .await; integrity_updated = true; } // Handle NAV-RELPOSNED (relative position for moving base/rover) - if let Some(ref rel_pos) = result.rel_pos_ned { + if let Some(rel_pos) = result.rel_pos_ned { let _ = self .channels .msg_tx - .send(DeviceMessage::RelPosNed(rel_pos.clone())) + .send(DeviceMessage::RelPosNed(rel_pos)) .await; } // Handle NAV-SVIN (survey-in progress on a static base) - if let Some(ref svin) = result.svin { + if let Some(svin) = result.svin { let _ = self .channels .msg_tx - .send(DeviceMessage::SurveyIn(svin.clone())) + .send(DeviceMessage::SurveyIn(svin)) .await; } @@ -704,17 +654,12 @@ impl DeviceTask { pvt.diff_corr_age_s, // Device-reported correction age ); - // Update fix type + // Update fix type (tracked internally; publishers read it via the + // integrity/PVT stream, so no separate channel message is needed). { - let mut state = self.state.lock().await; - if state.fix_type != pvt.fix_type { - state.fix_type = pvt.fix_type; - // Send fix type change message - let _ = self - .channels - .msg_tx - .send(DeviceMessage::FixTypeChanged(pvt.fix_type)) - .await; + let mut inner = self.shared.inner.lock().await; + if inner.fix_type != pvt.fix_type { + inner.fix_type = pvt.fix_type; } } @@ -752,8 +697,9 @@ impl DeviceTask { debug!(bytes = n, "RTCM data injected"); // Track timestamp for host-side correction age self.last_rtcm_received = Some(Instant::now()); - let mut state = self.state.lock().await; - state.rtcm_bytes_injected += n as u64; + self.shared + .rtcm_bytes_injected + .fetch_add(n as u64, Ordering::Relaxed); } Err(e) => { warn!(error = %e, "Failed to inject RTCM data"); @@ -764,14 +710,14 @@ impl DeviceTask { /// Update the device state and notify. 
async fn set_state(&mut self, new_state: DeviceState) { { - let mut state = self.state.lock().await; - if state.state != new_state { + let mut inner = self.shared.inner.lock().await; + if inner.state != new_state { info!( - from = %state.state, + from = %inner.state, to = %new_state, "Device state transition" ); - state.state = new_state.clone(); + inner.state = new_state.clone(); } } @@ -906,16 +852,13 @@ mod tests { fn test_device_message_variants() { let state_msg = DeviceMessage::StateChanged(DeviceState::Active); assert!(matches!(state_msg, DeviceMessage::StateChanged(_))); - - let fix_msg = DeviceMessage::FixTypeChanged(FixType::RtkFixed); - assert!(matches!(fix_msg, DeviceMessage::FixTypeChanged(_))); } #[tokio::test] async fn test_device_task_handle() { - let state = Arc::new(Mutex::new(DeviceTaskState::default())); + let shared = Arc::new(DeviceTaskShared::default()); let handle = DeviceTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Default state @@ -924,11 +867,11 @@ mod tests { // Update state { - let mut s = state.lock().await; - s.state = DeviceState::Active; - s.fix_type = FixType::RtkFixed; - s.messages_received = 100; + let mut inner = shared.inner.lock().await; + inner.state = DeviceState::Active; + inner.fix_type = FixType::RtkFixed; } + shared.messages_received.store(100, Ordering::Relaxed); assert!(matches!(handle.state().await, DeviceState::Active)); assert_eq!(handle.fix_type().await, FixType::RtkFixed); diff --git a/src/device/ubx.rs b/src/device/ubx.rs index d6e9425..02a3e43 100644 --- a/src/device/ubx.rs +++ b/src/device/ubx.rs @@ -1436,7 +1436,7 @@ impl UbxHandler { } fn parse_nav_sat(msg: &NavSatRef) -> SatInfo { - let mut sats = Vec::new(); + let mut sats = Vec::with_capacity(msg.num_svs() as usize); for sv in msg.svs() { sats.push(SatStatus { gnss_id: sv.gnss_id(), @@ -1594,8 +1594,8 @@ impl UbxHandler { let flags: SecSigFlags = msg.sig_sec_flags(); // Additional observability (not currently exposed in 
ROS messages) let jam_num_cent_freqs = msg.jam_num_cent_freqs(); - let mut cent_freq_khz = Vec::new(); - let mut jammed = Vec::new(); + let mut cent_freq_khz = Vec::with_capacity(jam_num_cent_freqs as usize); + let mut jammed = Vec::with_capacity(jam_num_cent_freqs as usize); for e in msg.jam_state_cent_freqs() { cent_freq_khz.push(e.cent_freq_khz); jammed.push(e.jammed); @@ -1621,7 +1621,7 @@ impl UbxHandler { /// Parse SEC-SIGLOG (security event log) message. fn parse_sec_siglog(msg: &SecSiglogRef) -> SecSiglogData { - let mut events = Vec::new(); + let mut events = Vec::with_capacity(msg.num_events() as usize); for event in msg.events() { events.push(SecSiglogEventData { time_elapsed_s: event.time_elapsed_s, @@ -1724,7 +1724,7 @@ impl UbxHandler { fn parse_mon_comms(msg: &MonCommsRef) -> MonCommsData { use ublox::mon_comms::PortId; - let mut ports = Vec::new(); + let mut ports = Vec::with_capacity(msg.n_ports() as usize); for port in msg.ports() { let port_name = match port.port_id { // The u-blox F9P interface description uses portId values: diff --git a/src/main.rs b/src/main.rs index c6b067b..e03e863 100644 --- a/src/main.rs +++ b/src/main.rs @@ -154,16 +154,16 @@ async fn main() -> Result<(), Box> { let supervisor_msg_tx_device = supervisor_msg_tx.clone(); tokio::spawn(async move { while let Some(msg) = device_msg_rx.recv().await { - // Convert and forward messages that need to reach ROS/supervisor - // Internal messages (Covariance, PosEcef, etc.) 
return None and are skipped - if let Some(gnss_msg) = msg.into_gnss_message() { - if supervisor_msg_tx_device.send(gnss_msg).await.is_err() { - tracing::warn!( - target: oxide_gnss::logging::category::STATE, - "Failed to forward device message to supervisor - shutting down" - ); - break; - } + if supervisor_msg_tx_device + .send(msg.into_gnss_message()) + .await + .is_err() + { + tracing::warn!( + target: oxide_gnss::logging::category::STATE, + "Failed to forward device message to supervisor - shutting down" + ); + break; } } }); @@ -242,31 +242,67 @@ async fn main() -> Result<(), Box> { let _ros_handle = spawn_ros_task(publishers, ros_channels, ros_config); - // Spin the node + // Spin the rclrs executor on its own OS thread. + // + // The old approach polled `executor.spin(spin_once().timeout(100ms))` in a + // tokio loop with a 10ms sleep between iterations — that woke the main + // thread ~9×/sec for nothing and put a ~110ms ceiling on ROS callback + // latency (parameter events, service calls, timers). Running `spin()` with + // `SpinOptions::default()` (no timeout) on a dedicated thread instead lets + // the executor block in `rcl_wait` and wake the instant a callback is + // ready — zero polling, no latency floor. + // + // Shutdown: `SpinOptions::until_promise_resolved` is the mechanism that + // both flips the executor's `halt_spinning` flag *and* triggers its guard + // conditions to wake a blocked wait set. We wire that promise to a tokio + // oneshot; dropping `stop_tx` (on shutdown) resolves it and the spin + // thread returns cleanly. info!("Node is ready, spinning..."); - // Main loop + let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>(); + let stop_promise = executor.commands().run(async move { + // Awaited by the rclrs task executor; completes when `stop_tx` is + // dropped or sends. `tokio::sync::oneshot` doesn't need the tokio + // reactor to be polled, so this is fine off the tokio runtime.
+ let _ = stop_rx.await; + }); + + let spin_thread = std::thread::Builder::new() + .name("rclrs-spin".into()) + .spawn(move || { + let errors = executor.spin(SpinOptions::default().until_promise_resolved(stop_promise)); + for e in &errors { + if !e.to_string().contains("Timeout") { + warn!("Error from ROS2 executor: {}", e); + } + } + }) + .expect("failed to spawn rclrs-spin thread"); + + // Wait for a shutdown request. + let mut shutdown_rx = supervisor.shutdown_rx(); loop { - // Check if shutdown was requested - if *supervisor.shutdown_rx().borrow() { - info!("Shutdown requested, stopping node"); + if *shutdown_rx.borrow_and_update() { break; } - - // Process ROS callbacks (spin once with timeout) - let errors = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(100))); - for e in &errors { - // Ignore timeout errors - they're expected when there's nothing to process - if !e.to_string().contains("Timeout") { - warn!("Error spinning ROS2 executor: {}", e); - } + if shutdown_rx.changed().await.is_err() { + // All shutdown senders dropped — treat as shutdown. + break; } + } + info!("Shutdown requested, stopping ROS executor"); - // Small sleep to avoid busy loop - sleep(Duration::from_millis(10)).await; + // Resolve the spin-stop promise; the executor returns within ~1 wait-set + // cycle and the thread exits. + drop(stop_tx); + sleep(Duration::from_millis(500)).await; + if spin_thread.is_finished() { + let _ = spin_thread.join(); + } else { + warn!("rclrs-spin thread did not stop promptly; exiting anyway"); } - // Wait for tasks to finish + // Wait for the tokio tasks (device / NTRIP / ROS publisher) to wind down. info!("Waiting for tasks to finish..."); sleep(Duration::from_millis(500)).await; diff --git a/src/ntrip/task.rs b/src/ntrip/task.rs index 6033433..022a26a 100644 --- a/src/ntrip/task.rs +++ b/src/ntrip/task.rs @@ -6,6 +6,7 @@ //! - Sending GGA position reports //! 
- Automatic reconnection with backoff +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -51,7 +52,7 @@ pub enum NtripMessage { }, } -/// Shared NTRIP task state for external monitoring. +/// Snapshot of NTRIP task state for external monitoring. #[derive(Debug, Default)] pub struct NtripTaskState { /// Current NTRIP state @@ -66,26 +67,43 @@ pub struct NtripTaskState { pub connection_count: u32, } +#[derive(Debug, Default)] +struct NtripTaskInner { + state: NtripState, + last_data_time: Option, +} + +/// Shared NTRIP task state. Counters are atomic and mutated lock-free from +/// the hot path; the mutable NTRIP state enum stays under the mutex. +#[derive(Debug, Default)] +pub struct NtripTaskShared { + inner: Mutex, + bytes_received: AtomicU64, + messages_forwarded: AtomicU64, + connection_count: AtomicU32, +} + /// Handle for monitoring the NTRIP task. #[derive(Clone)] pub struct NtripTaskHandle { - state: Arc>, + shared: Arc, } impl NtripTaskHandle { /// Get current NTRIP state. pub async fn state(&self) -> NtripState { - self.state.lock().await.state.clone() + self.shared.inner.lock().await.state.clone() } /// Get total bytes received. pub async fn bytes_received(&self) -> u64 { - self.state.lock().await.bytes_received + self.shared.bytes_received.load(Ordering::Relaxed) } /// Get correction age in seconds (time since last data). pub async fn correction_age_secs(&self) -> Option { - self.state + self.shared + .inner .lock() .await .last_data_time @@ -94,13 +112,13 @@ impl NtripTaskHandle { /// Get a snapshot of the task state. 
pub async fn snapshot(&self) -> NtripTaskState { - let s = self.state.lock().await; + let inner = self.shared.inner.lock().await; NtripTaskState { - state: s.state.clone(), - bytes_received: s.bytes_received, - messages_forwarded: s.messages_forwarded, - last_data_time: s.last_data_time, - connection_count: s.connection_count, + state: inner.state.clone(), + bytes_received: self.shared.bytes_received.load(Ordering::Relaxed), + messages_forwarded: self.shared.messages_forwarded.load(Ordering::Relaxed), + last_data_time: inner.last_data_time, + connection_count: self.shared.connection_count.load(Ordering::Relaxed), } } } @@ -109,7 +127,7 @@ impl NtripTaskHandle { pub struct NtripTask { config: NtripConfig, channels: NtripTaskChannels, - state: Arc>, + shared: Arc, } impl NtripTask { @@ -118,14 +136,14 @@ impl NtripTask { Self { config, channels, - state: Arc::new(Mutex::new(NtripTaskState::default())), + shared: Arc::new(NtripTaskShared::default()), } } /// Get a handle for monitoring this task. 
pub fn handle(&self) -> NtripTaskHandle { NtripTaskHandle { - state: Arc::clone(&self.state), + shared: Arc::clone(&self.shared), } } @@ -228,10 +246,7 @@ impl NtripTask { client.connect_with_gga(initial_gga.as_ref()).await?; // Update state and stats - { - let mut state = self.state.lock().await; - state.connection_count += 1; - } + self.shared.connection_count.fetch_add(1, Ordering::Relaxed); self.set_state(NtripState::Streaming).await; *last_streaming_start = Some(Instant::now()); if self @@ -311,18 +326,18 @@ impl NtripTask { debug!(bytes = data.len(), "Received RTCM data"); // Update stats - { - let mut state = self.state.lock().await; - state.bytes_received += data.len() as u64; - state.last_data_time = Some(Instant::now()); - } + self.shared + .bytes_received + .fetch_add(data.len() as u64, Ordering::Relaxed); + self.shared.inner.lock().await.last_data_time = Some(Instant::now()); // Forward to device if let Err(e) = self.channels.rtcm_tx.send(data.to_vec()).await { warn!(error = %e, "Failed to forward RTCM to device"); } else { - let mut state = self.state.lock().await; - state.messages_forwarded += 1; + self.shared + .messages_forwarded + .fetch_add(1, Ordering::Relaxed); } // Notify supervisor @@ -361,14 +376,14 @@ impl NtripTask { /// Update the NTRIP state and notify. 
async fn set_state(&mut self, new_state: NtripState) { { - let mut state = self.state.lock().await; - if state.state != new_state { + let mut inner = self.shared.inner.lock().await; + if inner.state != new_state { info!( - from = %state.state, + from = %inner.state, to = %new_state, "NTRIP state transition" ); - state.state = new_state.clone(); + inner.state = new_state.clone(); } } @@ -451,9 +466,9 @@ mod tests { #[tokio::test] async fn test_ntrip_task_handle() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Default state @@ -462,11 +477,11 @@ mod tests { // Update state { - let mut s = state.lock().await; - s.state = NtripState::Streaming; - s.bytes_received = 5000; - s.last_data_time = Some(Instant::now()); + let mut inner = shared.inner.lock().await; + inner.state = NtripState::Streaming; + inner.last_data_time = Some(Instant::now()); } + shared.bytes_received.store(5000, Ordering::Relaxed); assert!(matches!(handle.state().await, NtripState::Streaming)); assert_eq!(handle.bytes_received().await, 5000); @@ -479,23 +494,17 @@ mod tests { #[tokio::test] async fn test_handle_state_transitions() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Simulate state progression: Disabled -> Connecting -> Streaming - { - let mut s = state.lock().await; - s.state = NtripState::Connecting; - } + shared.inner.lock().await.state = NtripState::Connecting; assert!(matches!(handle.state().await, NtripState::Connecting)); - { - let mut s = state.lock().await; - s.state = NtripState::Streaming; - s.connection_count = 1; - } + shared.inner.lock().await.state = NtripState::Streaming; + shared.connection_count.store(1, Ordering::Relaxed); 
assert!(matches!(handle.state().await, NtripState::Streaming)); // Verify connection count tracked @@ -505,20 +514,17 @@ mod tests { #[tokio::test] async fn test_handle_backoff_state() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Simulate backoff state - { - let mut s = state.lock().await; - s.state = NtripState::Backoff { - attempt: 3, - delay_secs: 8, - reason: "Connection refused".to_string(), - }; - } + shared.inner.lock().await.state = NtripState::Backoff { + attempt: 3, + delay_secs: 8, + reason: "Connection refused".to_string(), + }; let current = handle.state().await; match current { @@ -537,19 +543,16 @@ mod tests { #[tokio::test] async fn test_handle_correction_age_none_when_no_data() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // No data received yet assert!(handle.correction_age_secs().await.is_none()); // After receiving data - { - let mut s = state.lock().await; - s.last_data_time = Some(Instant::now()); - } + shared.inner.lock().await.last_data_time = Some(Instant::now()); // Correction age should be very small (just set) let age = handle.correction_age_secs().await.unwrap(); @@ -558,20 +561,20 @@ mod tests { #[tokio::test] async fn test_handle_snapshot_captures_all_fields() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Set up complex state { - let mut s = state.lock().await; - s.state = NtripState::Streaming; - s.bytes_received = 123456; - s.messages_forwarded = 42; - s.connection_count = 5; - s.last_data_time = Some(Instant::now()); + let mut inner = 
shared.inner.lock().await; + inner.state = NtripState::Streaming; + inner.last_data_time = Some(Instant::now()); } + shared.bytes_received.store(123456, Ordering::Relaxed); + shared.messages_forwarded.store(42, Ordering::Relaxed); + shared.connection_count.store(5, Ordering::Relaxed); let snapshot = handle.snapshot().await; assert!(matches!(snapshot.state, NtripState::Streaming)); @@ -730,17 +733,16 @@ mod tests { #[tokio::test] async fn test_stats_accumulation() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Simulate receiving data in chunks for _ in 0..10 { - let mut s = state.lock().await; - s.bytes_received += 256; - s.messages_forwarded += 1; - s.last_data_time = Some(Instant::now()); + shared.bytes_received.fetch_add(256, Ordering::Relaxed); + shared.messages_forwarded.fetch_add(1, Ordering::Relaxed); + shared.inner.lock().await.last_data_time = Some(Instant::now()); } let snapshot = handle.snapshot().await; @@ -750,16 +752,15 @@ mod tests { #[tokio::test] async fn test_multiple_reconnections_tracked() { - let state = Arc::new(Mutex::new(NtripTaskState::default())); + let shared = Arc::new(NtripTaskShared::default()); let handle = NtripTaskHandle { - state: Arc::clone(&state), + shared: Arc::clone(&shared), }; // Simulate multiple reconnection cycles for i in 1..=5 { - let mut s = state.lock().await; - s.connection_count = i; - s.state = NtripState::Streaming; + shared.connection_count.store(i, Ordering::Relaxed); + shared.inner.lock().await.state = NtripState::Streaming; } let snapshot = handle.snapshot().await; diff --git a/src/state/integrity.rs b/src/state/integrity.rs index 00df65f..1756176 100644 --- a/src/state/integrity.rs +++ b/src/state/integrity.rs @@ -352,23 +352,10 @@ pub struct IntegrityAggregator { pub thresholds: IntegrityThresholds, /// Current integrity state current: 
GnssIntegrity, - /// Last covariance data - last_cov: Option, - /// Last ECEF position - last_pos_ecef: Option, - /// Last security signal status - last_sec_sig: Option, - /// Last security event log - last_sec_siglog: Option, - /// Last correction status - last_rxm_cor: Option, - /// Last communication status - last_mon_comms: Option, - /// Last hardware status - last_mon_hw: Option, /// Timestamp of last PVT update (None = never received) last_pvt_update: Option, - /// Last protection level data (NAV-PL) + /// Last protection level data (NAV-PL) — retained because compute() + /// reads it back to fill the output message. last_nav_pl: Option, } @@ -391,47 +378,39 @@ impl IntegrityAggregator { /// Note: Covariance matrices are available in NavSatFix/TwistWithCovarianceStamped. /// This only tracks validity for integrity monitoring. pub fn update_covariance(&mut self, cov: &CovData) { - self.last_cov = Some(cov.clone()); self.current.covariance_valid = cov.pos_cov_valid && cov.vel_cov_valid; } /// Update with ECEF position data (NAV-POSECEF). - pub fn update_pos_ecef(&mut self, pos: &PosEcefData) { - self.last_pos_ecef = Some(pos.clone()); - // ECEF position is stored for potential coordinate transforms - // but doesn't directly affect integrity level + pub fn update_pos_ecef(&mut self, _pos: &PosEcefData) { + // ECEF position doesn't directly affect integrity level. } /// Update with security signal status (SEC-SIG). pub fn update_sec_sig(&mut self, sig: &SecSigData) { - self.last_sec_sig = Some(sig.clone()); self.current.jamming_state = sig.jamming_state; self.current.spoofing_state = sig.spoofing_state; } /// Update with security event log (SEC-SIGLOG). pub fn update_sec_siglog(&mut self, siglog: &SecSiglogData) { - self.last_sec_siglog = Some(siglog.clone()); self.current.security_events = siglog.num_events as u32; } /// Update with differential correction status (RXM-COR). 
pub fn update_rxm_cor(&mut self, cor: &RxmCorData) { - self.last_rxm_cor = Some(cor.clone()); self.current.correction_received = true; self.current.correction_used = cor.msg_used == CorrectionMsgUsed::Used; } /// Update with communication port status (MON-COMMS). pub fn update_mon_comms(&mut self, comms: &MonCommsData) { - self.last_mon_comms = Some(comms.clone()); self.current.comm_ports = comms.n_ports; self.current.comm_tx_errors = comms.tx_errors; } /// Update with hardware status (MON-HW). pub fn update_mon_hw(&mut self, hw: &MonHwData) { - self.last_mon_hw = Some(hw.clone()); self.current.antenna_status = hw.antenna_status.into(); self.current.jamming_indicator = hw.jam_ind; }