diff --git a/src/liberdus.rs b/src/liberdus.rs index a1d8604..5627803 100644 --- a/src/liberdus.rs +++ b/src/liberdus.rs @@ -10,7 +10,7 @@ use std::{ atomic::{AtomicBool, AtomicUsize}, Arc, }, - time::Duration, + time::{Duration, Instant}, u128, }; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; @@ -26,6 +26,29 @@ pub struct Consensor { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub rng_bias: Option, + #[serde(skip)] + pub rotation_info: Option, +} + +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] +#[allow(non_snake_case)] +pub struct NodeRotationResponse { + pub isRotationBound: bool, + pub rotationIndex: RotationIndex, +} + +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] +pub struct RotationIndex { + pub idx: u32, + pub total: u32, +} + +#[derive(Clone, Debug)] +pub struct NodeRotationInfo { + pub is_rotation_bound: bool, + pub rotation_idx: u32, + pub rotation_total: u32, + pub last_updated: std::time::Instant, } #[allow(non_snake_case)] @@ -50,6 +73,7 @@ pub struct Liberdus { crypto: Arc, load_distribution_commulative_bias: Arc>>, config: Arc, + rotation_cache_updated: Arc, } impl Liberdus { @@ -67,6 +91,7 @@ impl Liberdus { archivers, crypto: sc, load_distribution_commulative_bias: Arc::new(RwLock::new(Vec::new())), + rotation_cache_updated: Arc::new(AtomicBool::new(false)), } } @@ -128,6 +153,9 @@ impl Liberdus { // inititally node list does not contain load data. self.list_prepared .store(false, std::sync::atomic::Ordering::Relaxed); + // Reset rotation cache status + self.rotation_cache_updated + .store(false, std::sync::atomic::Ordering::Relaxed); { let mut guard = self.load_distribution_commulative_bias.write().await; *guard = Vec::new(); @@ -145,6 +173,157 @@ impl Liberdus { } } + /// Update rotation information for all active nodes concurrently + /// This is called periodically and after nodelist updates + pub async fn update_rotation_info(&self) { + let nodes = { + let guard = self.active_nodelist.read().await; + guard.clone() + }; + + if nodes.is_empty() { + return; + } + + println!("Updating rotation info for {} nodes", nodes.len()); + + // Create concurrent tasks for all nodes + let mut tasks = Vec::new(); + for node in nodes.iter() { + let node_clone = node.clone(); + + let task = tokio::spawn(async move { + let url = format!("http://{}:{}/node/rotation", node_clone.ip, node_clone.port); + + let rotation_info = match tokio::time::timeout( + Duration::from_millis(3000), // 3 second timeout for rotation check + reqwest::get(&url) + ).await { + Ok(Ok(response)) => { + match response.json::().await { + Ok(rotation_resp) => { + Some(NodeRotationInfo { + is_rotation_bound: rotation_resp.isRotationBound, + rotation_idx: rotation_resp.rotationIndex.idx, + rotation_total: rotation_resp.rotationIndex.total, + last_updated: std::time::Instant::now(), + }) + } + Err(e) => { + eprintln!("Failed to parse rotation response from {}: {}", node_clone.ip, e); + None + } + } + } + Ok(Err(e)) => { + eprintln!("Failed to fetch rotation info from {}: {}", node_clone.ip, e); + None + } + Err(_) => { + eprintln!("Timeout fetching rotation info from {}", node_clone.ip); + None + } + }; + + (node_clone.id.clone(), rotation_info) + }); + + tasks.push(task); + } + + // Collect all results + let mut rotation_results = HashMap::new(); + for task in tasks { + if let Ok((node_id, rotation_info)) = task.await { + rotation_results.insert(node_id, rotation_info); + } + } + + // Update the nodelist with rotation information + { + let mut guard = self.active_nodelist.write().await; + for node in guard.iter_mut() { + if let Some(rotation_info) = rotation_results.remove(&node.id) { + node.rotation_info = rotation_info; + } + } + } + + // Mark rotation cache as updated + self.rotation_cache_updated + .store(true, std::sync::atomic::Ordering::Relaxed); + + // Reset list prepared flag to force recalculation of bias with rotation filtering + self.list_prepared + .store(false, std::sync::atomic::Ordering::Relaxed); + + println!("Rotation info update completed"); + } + + /// Check if a node is suitable for selection based on rotation criteria + /// Nodes are suitable if: + /// - rotation_idx > 5 (strictly greater than 5) + /// - is_rotation_bound == false + fn is_node_suitable(&self, node: &Consensor) -> bool { + match &node.rotation_info { + Some(info) => { + // Check if rotation data is recent (within 5 minutes) + let is_recent = info.last_updated.elapsed() < Duration::from_secs(300); + + if !is_recent { + return true; // If data is stale, allow the node (fallback) + } + + // Apply the filtering criteria: idx > 5 AND rotation_bound == false + !info.is_rotation_bound && info.rotation_idx > 5 + } + None => true, // If no rotation info, allow the node (fallback) + } + } + + /// Start a background task to periodically update rotation information + /// This should be called when the Liberdus instance is created + pub fn start_rotation_monitor(&self, liberdus_arc: Arc) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); // Update every minute + + loop { + interval.tick().await; + + // Only update if we have an active nodelist + if !liberdus_arc.active_nodelist.read().await.is_empty() { + liberdus_arc.update_rotation_info().await; + } + } + }); + } + + /// Manually trigger a rotation info update (useful for testing or forced refresh) + pub async fn refresh_rotation_info(&self) { + self.rotation_cache_updated + .store(false, std::sync::atomic::Ordering::Relaxed); + self.update_rotation_info().await; + } + + /// Get statistics about the current rotation status of nodes + pub async fn get_rotation_stats(&self) -> (usize, usize, usize) { + let nodes = self.active_nodelist.read().await; + let total_nodes = nodes.len(); + let mut suitable_nodes = 0; + let mut nodes_with_rotation_info = 0; + + for node in nodes.iter() { + if node.rotation_info.is_some() { + nodes_with_rotation_info += 1; + if self.is_node_suitable(node) { + suitable_nodes += 1; + } + } + } + + (total_nodes, nodes_with_rotation_info, suitable_nodes) + } + /// Calculates a node's bias for weighted random selection based on its HTTP round-trip time (RTT). /// /// # Formula @@ -255,6 +434,7 @@ impl Liberdus { /// # Implementation in `prepare_list` /// - The cumulative bias is stored in `self.load_distribution_commulative_bias`. /// - It is recalculated whenever the node list is updated or RTT data changes. + /// - Now also filters nodes based on rotation criteria to ensure only synchronized nodes are used. async fn prepare_list(&self) { if self .list_prepared @@ -273,10 +453,27 @@ impl Liberdus { guard.clone() }; - let max_timeout = self.config.max_http_timeout_ms.try_into().unwrap_or(4000); // 3 seconds - let mut sorted_nodes = nodes; + let max_timeout = self.config.max_http_timeout_ms.try_into().unwrap_or(4000); // 4 seconds + + // Filter nodes based on rotation criteria first + let mut filtered_nodes: Vec = nodes + .into_iter() + .filter(|node| self.is_node_suitable(node)) + .collect(); + + // If no nodes pass the filter, fall back to all nodes (avoid complete service failure) + if filtered_nodes.is_empty() { + eprintln!("Warning: No nodes meet rotation criteria, falling back to all nodes"); + let guard = self.active_nodelist.read().await; + filtered_nodes = guard.clone(); + } else { + println!("Using {} filtered nodes out of {} total nodes", + filtered_nodes.len(), + self.active_nodelist.read().await.len()); + } - sorted_nodes.sort_by(|a, b| { + // Sort filtered nodes by RTT + filtered_nodes.sort_by(|a, b| { let a_time = trip_ms.get(&a.id).unwrap_or(&max_timeout); let b_time = trip_ms.get(&b.id).unwrap_or(&max_timeout); a_time.cmp(b_time) @@ -284,7 +481,7 @@ impl Liberdus { let mut total_bias = 0.0; let mut cumulative_bias = Vec::new(); - for node in &mut sorted_nodes { + for node in &mut filtered_nodes { let last_http_round_trip = *trip_ms.get(&node.id).unwrap_or(&max_timeout); let bias = self.calculate_bias(last_http_round_trip, max_timeout); node.rng_bias = Some(bias); @@ -294,7 +491,7 @@ impl Liberdus { { let mut guard = self.active_nodelist.write().await; - *guard = sorted_nodes; + *guard = filtered_nodes; } { @@ -350,17 +547,26 @@ impl Liberdus { Some((index, nodes[index].clone())) } - /// This function is the defecto way to get a consensor. + /// This function is the defacto way to get a consensor. /// When nodeList is first refreshed the round trip http request time for the nodes are /// unknown. The function will round robin from the list to return consensor. /// During the interaction with the each consensors in each rpc call, it will collect the round trip time for /// each node. The values are then used to calculate a weighted bias for /// node selection. Subsequent call will be redirected towards the node based on that bias and round robin /// is dismissed. + /// + /// Now also includes rotation filtering to ensure only synchronized nodes are selected. pub async fn get_next_appropriate_consensor(&self) -> Option<(usize, Consensor)> { if self.active_nodelist.read().await.is_empty() { return None; } + + // Check if we need to update rotation info + if !self.rotation_cache_updated.load(std::sync::atomic::Ordering::Relaxed) { + // Update rotation info synchronously for the first time, then cache it + self.update_rotation_info().await; + } + match self .list_prepared .load(std::sync::atomic::Ordering::Relaxed) diff --git a/src/main.rs b/src/main.rs index ce8abae..ab6d84b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -108,6 +108,9 @@ async fn main() -> Result<(), Box> { _configs.clone(), )); + // Start the rotation monitor for filtering synchronized nodes + lbd.start_rotation_monitor(Arc::clone(&lbd)); + let _archivers = Arc::clone(&arch_utils); let _liberdus = Arc::clone(&lbd); @@ -124,6 +127,11 @@ async fn main() -> Result<(), Box> { ticker.tick().await; Arc::clone(&_archivers).discover().await; _liberdus.update_active_nodelist().await; + + // Print rotation stats periodically + let (total, with_info, suitable) = _liberdus.get_rotation_stats().await; + println!("\nRotation Stats - Total: {}, With Info: {}, Suitable: {}", + total, with_info, suitable); } });