Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 213 additions & 7 deletions src/liberdus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
atomic::{AtomicBool, AtomicUsize},
Arc,
},
time::Duration,
time::{Duration, Instant},
u128,
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
Expand All @@ -26,6 +26,29 @@ pub struct Consensor {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub rng_bias: Option<f64>,
#[serde(skip)]
pub rotation_info: Option<NodeRotationInfo>,
}

/// JSON body expected from a node's `/node/rotation` endpoint.
///
/// The field names are camelCase because, with no serde rename attribute present,
/// they are used verbatim as the JSON keys; hence the `non_snake_case` allowance.
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
#[allow(non_snake_case)]
pub struct NodeRotationResponse {
/// Copied into `NodeRotationInfo::is_rotation_bound`; while the cached data is
/// fresh, a node reporting `true` here is excluded from selection.
pub isRotationBound: bool,
/// Rotation position reported by the node; its `idx` and `total` are copied into
/// `NodeRotationInfo::rotation_idx` and `NodeRotationInfo::rotation_total`.
pub rotationIndex: RotationIndex,
}

/// The `rotationIndex` object nested inside a `NodeRotationResponse`.
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
pub struct RotationIndex {
/// Index value reported by the node; node selection requires it to be strictly
/// greater than 5. NOTE(review): presumably the node's position in the rotation
/// order - confirm against the node API.
pub idx: u32,
/// NOTE(review): presumably the number of nodes taking part in rotation - confirm
/// against the node API. It is stored but not consulted by the filtering logic.
pub total: u32,
}

/// Rotation status cached on a `Consensor` after a successful `/node/rotation` query.
///
/// Not serializable: the owning `Consensor::rotation_info` field is `#[serde(skip)]`.
#[derive(Clone, Debug)]
pub struct NodeRotationInfo {
/// Value of `isRotationBound` from the node's response.
pub is_rotation_bound: bool,
/// Value of `rotationIndex.idx` from the node's response.
pub rotation_idx: u32,
/// Value of `rotationIndex.total` from the node's response.
pub rotation_total: u32,
/// Moment this entry was built from the response; used to treat entries older
/// than five minutes as stale.
pub last_updated: std::time::Instant,
}

#[allow(non_snake_case)]
Expand All @@ -50,6 +73,7 @@ pub struct Liberdus {
crypto: Arc<crypto::ShardusCrypto>,
load_distribution_commulative_bias: Arc<RwLock<Vec<f64>>>,
config: Arc<config::Config>,
rotation_cache_updated: Arc<AtomicBool>,
}

impl Liberdus {
Expand All @@ -67,6 +91,7 @@ impl Liberdus {
archivers,
crypto: sc,
load_distribution_commulative_bias: Arc::new(RwLock::new(Vec::new())),
rotation_cache_updated: Arc::new(AtomicBool::new(false)),
}
}

Expand Down Expand Up @@ -128,6 +153,9 @@ impl Liberdus {
// Initially the node list does not contain load data.
self.list_prepared
.store(false, std::sync::atomic::Ordering::Relaxed);
// Reset rotation cache status
self.rotation_cache_updated
.store(false, std::sync::atomic::Ordering::Relaxed);
{
let mut guard = self.load_distribution_commulative_bias.write().await;
*guard = Vec::new();
Expand All @@ -145,6 +173,157 @@ impl Liberdus {
}
}

/// Update rotation information for all active nodes concurrently
/// This is called periodically and after nodelist updates
pub async fn update_rotation_info(&self) {
let nodes = {
let guard = self.active_nodelist.read().await;
guard.clone()
};

if nodes.is_empty() {
return;
}

println!("Updating rotation info for {} nodes", nodes.len());

// Create concurrent tasks for all nodes
let mut tasks = Vec::new();
for node in nodes.iter() {
let node_clone = node.clone();

let task = tokio::spawn(async move {
let url = format!("http://{}:{}/node/rotation", node_clone.ip, node_clone.port);

let rotation_info = match tokio::time::timeout(
Duration::from_millis(3000), // 3 second timeout for rotation check
reqwest::get(&url)
).await {
Ok(Ok(response)) => {
match response.json::<NodeRotationResponse>().await {
Ok(rotation_resp) => {
Some(NodeRotationInfo {
is_rotation_bound: rotation_resp.isRotationBound,
rotation_idx: rotation_resp.rotationIndex.idx,
rotation_total: rotation_resp.rotationIndex.total,
last_updated: std::time::Instant::now(),
})
}
Err(e) => {
eprintln!("Failed to parse rotation response from {}: {}", node_clone.ip, e);
None
}
}
}
Ok(Err(e)) => {
eprintln!("Failed to fetch rotation info from {}: {}", node_clone.ip, e);
None
}
Err(_) => {
eprintln!("Timeout fetching rotation info from {}", node_clone.ip);
None
}
};

(node_clone.id.clone(), rotation_info)
});

tasks.push(task);
}

// Collect all results
let mut rotation_results = HashMap::new();
for task in tasks {
if let Ok((node_id, rotation_info)) = task.await {
rotation_results.insert(node_id, rotation_info);
}
}

// Update the nodelist with rotation information
{
let mut guard = self.active_nodelist.write().await;
for node in guard.iter_mut() {
if let Some(rotation_info) = rotation_results.remove(&node.id) {
node.rotation_info = rotation_info;
}
}
}

// Mark rotation cache as updated
self.rotation_cache_updated
.store(true, std::sync::atomic::Ordering::Relaxed);

// Reset list prepared flag to force recalculation of bias with rotation filtering
self.list_prepared
.store(false, std::sync::atomic::Ordering::Relaxed);

println!("Rotation info update completed");
}

/// Decides whether `node` may be selected, based on its cached rotation data.
///
/// The node is accepted when any one of these holds:
/// - it carries no rotation info at all (fallback),
/// - its rotation info is five minutes old or older (stale data is ignored),
/// - it is not rotation bound and its `rotation_idx` is strictly greater than 5.
fn is_node_suitable(&self, node: &Consensor) -> bool {
    node.rotation_info.as_ref().map_or(true, |info| {
        // Data that has aged past five minutes no longer disqualifies a node.
        let stale = info.last_updated.elapsed() >= Duration::from_secs(300);

        stale || (info.rotation_idx > 5 && !info.is_rotation_bound)
    })
}

/// Spawns a detached background task that refreshes rotation info on a 60 second
/// interval.
///
/// The task takes ownership of `liberdus_arc` and loops forever; `self` itself is
/// not used. On a tick where the active nodelist is empty, the refresh is skipped.
pub fn start_rotation_monitor(&self, liberdus_arc: Arc<Self>) {
    tokio::spawn(async move {
        // Update every minute
        let mut ticker = tokio::time::interval(Duration::from_secs(60));

        loop {
            ticker.tick().await;

            // The read guard is a temporary of this statement, so it is released
            // before the refresh below runs.
            let nodelist_is_empty = liberdus_arc.active_nodelist.read().await.is_empty();
            if nodelist_is_empty {
                continue;
            }

            liberdus_arc.update_rotation_info().await;
        }
    });
}

/// Forces an immediate rotation info refresh (useful for testing or a manual refresh).
///
/// Clears `rotation_cache_updated` first, then runs `update_rotation_info`.
pub async fn refresh_rotation_info(&self) {
    use std::sync::atomic::Ordering;

    self.rotation_cache_updated.store(false, Ordering::Relaxed);
    self.update_rotation_info().await;
}

/// Summarises the rotation status of the active nodelist.
///
/// Returns `(total_nodes, nodes_with_rotation_info, suitable_nodes)`. Only nodes
/// that carry rotation info are tested for suitability, so `suitable_nodes` never
/// exceeds `nodes_with_rotation_info`, even though `is_node_suitable` accepts nodes
/// without info.
pub async fn get_rotation_stats(&self) -> (usize, usize, usize) {
    let guard = self.active_nodelist.read().await;

    let (with_info, suitable) = guard
        .iter()
        .filter(|node| node.rotation_info.is_some())
        .fold((0usize, 0usize), |(with_info, suitable), node| {
            (
                with_info + 1,
                suitable + usize::from(self.is_node_suitable(node)),
            )
        });

    (guard.len(), with_info, suitable)
}

/// Calculates a node's bias for weighted random selection based on its HTTP round-trip time (RTT).
///
/// # Formula
Expand Down Expand Up @@ -255,6 +434,7 @@ impl Liberdus {
/// # Implementation in `prepare_list`
/// - The cumulative bias is stored in `self.load_distribution_commulative_bias`.
/// - It is recalculated whenever the node list is updated or RTT data changes.
/// - Now also filters nodes based on rotation criteria to ensure only synchronized nodes are used.
async fn prepare_list(&self) {
if self
.list_prepared
Expand All @@ -273,18 +453,35 @@ impl Liberdus {
guard.clone()
};

let max_timeout = self.config.max_http_timeout_ms.try_into().unwrap_or(4000); // 3 seconds
let mut sorted_nodes = nodes;
let max_timeout = self.config.max_http_timeout_ms.try_into().unwrap_or(4000); // 4 seconds

// Filter nodes based on rotation criteria first
let mut filtered_nodes: Vec<Consensor> = nodes
.into_iter()
.filter(|node| self.is_node_suitable(node))
.collect();

// If no nodes pass the filter, fall back to all nodes (avoid complete service failure)
if filtered_nodes.is_empty() {
eprintln!("Warning: No nodes meet rotation criteria, falling back to all nodes");
let guard = self.active_nodelist.read().await;
filtered_nodes = guard.clone();
} else {
println!("Using {} filtered nodes out of {} total nodes",
filtered_nodes.len(),
self.active_nodelist.read().await.len());
}

sorted_nodes.sort_by(|a, b| {
// Sort filtered nodes by RTT
filtered_nodes.sort_by(|a, b| {
let a_time = trip_ms.get(&a.id).unwrap_or(&max_timeout);
let b_time = trip_ms.get(&b.id).unwrap_or(&max_timeout);
a_time.cmp(b_time)
});

let mut total_bias = 0.0;
let mut cumulative_bias = Vec::new();
for node in &mut sorted_nodes {
for node in &mut filtered_nodes {
let last_http_round_trip = *trip_ms.get(&node.id).unwrap_or(&max_timeout);
let bias = self.calculate_bias(last_http_round_trip, max_timeout);
node.rng_bias = Some(bias);
Expand All @@ -294,7 +491,7 @@ impl Liberdus {

{
let mut guard = self.active_nodelist.write().await;
*guard = sorted_nodes;
*guard = filtered_nodes;
}

{
Expand Down Expand Up @@ -350,17 +547,26 @@ impl Liberdus {
Some((index, nodes[index].clone()))
}

/// This function is the defecto way to get a consensor.
/// This function is the de facto way to get a consensor.
/// When the node list is first refreshed, the round-trip HTTP request times for the nodes are
/// unknown, so the function round-robins through the list to return a consensor.
/// During the interaction with each consensor in each RPC call, it collects the round-trip time for
/// each node. These values are then used to calculate a weighted bias for
/// node selection. Subsequent calls are directed towards nodes based on that bias, and round robin
/// is no longer used.
///
/// Now also includes rotation filtering to ensure only synchronized nodes are selected.
pub async fn get_next_appropriate_consensor(&self) -> Option<(usize, Consensor)> {
if self.active_nodelist.read().await.is_empty() {
return None;
}

// Check if we need to update rotation info
if !self.rotation_cache_updated.load(std::sync::atomic::Ordering::Relaxed) {
// Update rotation info synchronously for the first time, then cache it
self.update_rotation_info().await;
}

match self
.list_prepared
.load(std::sync::atomic::Ordering::Relaxed)
Expand Down
8 changes: 8 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
_configs.clone(),
));

// Start the rotation monitor for filtering synchronized nodes
lbd.start_rotation_monitor(Arc::clone(&lbd));

let _archivers = Arc::clone(&arch_utils);
let _liberdus = Arc::clone(&lbd);

Expand All @@ -124,6 +127,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
ticker.tick().await;
Arc::clone(&_archivers).discover().await;
_liberdus.update_active_nodelist().await;

// Print rotation stats periodically
let (total, with_info, suitable) = _liberdus.get_rotation_stats().await;
println!("\nRotation Stats - Total: {}, With Info: {}, Suitable: {}",
total, with_info, suitable);
}
});

Expand Down
Loading