Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,13 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
/// performed when entering the state
async fn on_entering(&mut self, blk: &Block) -> anyhow::Result<()> {
let mut acc = self.acc.write().await;

// NOTE: If we switched to InSync because of a timeout handled in the
// OutOfSync's on_block_event, we need to handle `blk` as usual.
// However, given the current implementation, we only handle the tip+1
// case, at the risk of losing this block. This should be improved in
// the future

let curr_h = acc.get_curr_height().await;

if blk.header().height == curr_h + 1 {
Expand Down
92 changes: 52 additions & 40 deletions node/src/chain/fsm/outofsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
self.pool.insert(key, target_block);
self.remote_peer = peer_addr;

if let Some(last_request) = self.request_pool_missing_blocks().await {
self.last_request = last_request
}
self.request_pool_missing_blocks().await;

let (from, to) = &self.range;
info!(event = "entering out-of-sync", from, to, ?peer_addr);
Expand Down Expand Up @@ -227,13 +225,13 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
blk: &Block,
metadata: Option<Metadata>,
) -> anyhow::Result<bool> {
let mut acc = self.acc.write().await;
let acceptor = self.acc.clone();
let mut acc = acceptor.write().await;
let block_height = blk.header().height;

if self.attempts == 0 && self.is_timeout_expired() {
acc.restart_consensus().await;
// Timeout-ed sync-up
// Transit back to InSync mode
// We check the timeout here to prevent the peer from keeping us in
// outofsync by flooding our node with blocks
if self.on_sync_timeout().await {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On_sync_timeout try to acquire a write lock from the acceptor (but you already acquired it, so probably it get stuck)

Try this before merge

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

return Ok(true);
}

Expand All @@ -242,6 +240,14 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
return Ok(false);
}

// If we receive a block higher than the current target, we "extend" the
// range of the syncing process
//
// NOTE: the block could be coming from a different peer than the one we
// are currently syncing with. This means the current peer could
// actually not have blocks up to the new target. We might want to
// handle this update differently in the future, e.g. by also updating
// the sync peer.
if block_height > self.range.1 {
debug!(
event = "update sync target",
Expand Down Expand Up @@ -331,7 +337,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
return Ok(false);
}

let block_height = blk.header().height;
let pool_len = self.pool.len();

if self.pool.contains_key(&block_height) {
Expand All @@ -346,10 +351,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>

// If we almost dequeued all requested blocks (2/3)
if self.last_request < current_height + (MAX_BLOCKS_TO_REQUEST / 3) {
if let Some(last_request) = self.request_pool_missing_blocks().await
{
self.last_request = last_request
}
self.request_pool_missing_blocks().await;
}

// if the pool is full, check if the new block has higher priority
Expand Down Expand Up @@ -396,31 +398,35 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>

pub async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
if self.is_timeout_expired() {
if self.attempts == 0 {
debug!(
event = "out_of_sync timer expired",
attempts = self.attempts,
mode = "out_of_sync"
);
// sync-up has timed out, recover consensus task
self.acc.write().await.restart_consensus().await;
return Ok(self.on_sync_timeout().await);
}

// sync-up timed out for N attempts
// Transit back to InSync mode as a fail-over
return Ok(true);
}
Ok(false)
}

// Request missing from local_pool blocks
if let Some(last_request) = self.request_pool_missing_blocks().await
{
self.last_request = last_request
}
async fn on_sync_timeout(&mut self) -> bool {
debug!(
event = "out_of_sync timer expired",
attempts = self.attempts,
mode = "out_of_sync"
);

if self.attempts == 0 {
// sync-up has timed out, recover consensus task
self.acc.write().await.restart_consensus().await;

self.start_time = SystemTime::now();
self.attempts -= 1;
// sync-up timed out for N attempts
// Transit back to InSync mode as a fail-over
return true;
}

Ok(false)
// Request missing from local_pool blocks
self.request_pool_missing_blocks().await;

self.start_time = SystemTime::now();
self.attempts -= 1;

false
}

async fn request_missing_block(&self, height: u64) {
Expand Down Expand Up @@ -449,18 +455,23 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
/// present in the pool and requests them from the `remote_peer`.
///
/// Returns the height of the last block requested, if any.
async fn request_pool_missing_blocks(&self) -> Option<u64> {
let mut last_request = None;
async fn request_pool_missing_blocks(&mut self) {
let mut inv = Inv::new(0);

let mut inv_count = 0;

let mut first_request = None;
let mut last_request = None;

for height in self.range.0..=self.range.1 {
if self.pool.contains_key(&height) {
// already received
continue;
}
inv.add_block_from_height(height);
inv_count += 1;
if first_request.is_none() {
first_request = Some(height);
}
last_request = Some(height);
if inv_count >= MAX_BLOCKS_TO_REQUEST {
break;
Expand All @@ -469,8 +480,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>

if !inv.inv_list.is_empty() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since we are at it, can we simply return early if inv.inv_list.is_empty() rather than nesting logic in the opposite case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally in favor!

debug!(
event = "request blocks",
target = last_request.unwrap_or_default(),
event = "request missing blocks",
first_request = first_request.unwrap_or_default(),
last_request = last_request.unwrap_or_default(),
mode = "out_of_sync",
);

Expand All @@ -490,9 +502,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
mode = "out_of_sync",
);
warn!("Unable to request missing blocks {e}");
return None;
} else {
self.last_request = last_request.unwrap();
}
}
last_request
}
}