From d0ac24da5f6b49b35e8d0507fa339bf571ed8150 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 8 Oct 2025 12:36:57 +0800 Subject: [PATCH 1/7] attestation committee index --- .../src/attestation_service.rs | 148 ++++++++++-------- 1 file changed, 85 insertions(+), 63 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index da6e8f35886..458a410561d 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -180,8 +180,9 @@ impl AttestationService Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self @@ -189,6 +190,15 @@ impl AttestationService AttestationService> = self + let aggregate_duties_by_committee_index: HashMap> = self .duties_service .attesters(slot) .into_iter() @@ -208,23 +218,21 @@ impl AttestationService AttestationService, - aggregate_production_instant: Instant, ) -> Result<(), ()> { let attestations_timer = validator_metrics::start_timer_vec( &validator_metrics::ATTESTATION_SERVICE_TIMES, &[validator_metrics::ATTESTATIONS], ); - // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have - // any validators for the given `slot` and `committee_index`. + // There's not need to produce `Attestation` if we do not have + // any validators for the given `slot`. if validator_duties.is_empty() { return Ok(()); } - // Step 1. - // - // Download, sign and publish an `Attestation` for each validator. - let attestation_opt = self - .produce_and_publish_attestations(slot, committee_index, &validator_duties) + // Download, sign and publish an `Attestation` for all validators at once + self.produce_and_publish_attestations(slot, &validator_duties) .await .map_err(move |e| { crit!( error = format!("{:?}", e), - committee_index, slot = slot.as_u64(), "Error during attestation routine" ) })?; drop(attestations_timer); + Ok(()) + } - // Step 2. - // - // If an attestation was produced, make an aggregate. - if let Some(attestation_data) = attestation_opt { - // First, wait until the `aggregation_production_instant` (2/3rds - // of the way though the slot). As verified in the - // `delay_triggers_when_in_the_past` test, this code will still run - // even if the instant has already elapsed. - sleep_until(aggregate_production_instant).await; - - // Start the metrics timer *after* we've done the delay. - let _aggregates_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES], - ); - - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committee_index`. - self.produce_and_publish_aggregates( - &attestation_data, - committee_index, - &validator_duties, - ) + /// Produce and publish aggregated attestations for validators + async fn publish_aggregates( + self, + slot: Slot, + committee_index: CommitteeIndex, + validator_duties: Vec, + aggregate_production_instant: Instant, + ) -> Result<(), ()> { + // There's not need to produce `SignedAggregateAndProof` if we do not have + // any validators for the given `slot` and `committee_index`. + if validator_duties.is_empty() { + return Ok(()); + } + + // Wait until the `aggregation_production_instant` (2/3rds + // of the way though the slot). As verified in the + // `delay_triggers_when_in_the_past` test, this code will still run + // even if the instant has already elapsed. + sleep_until(aggregate_production_instant).await; + + // Start the metrics timer *after* we've done the delay. + let _aggregates_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES], + ); + + let attestation_data = self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| { + format!( + "Failed to produce attestation data for aggregation: {:?}", + e + ) + }) + .map(|result| result.data) + }) + .await + .map_err(|e| { + error!( + error = %e, + slot = slot.as_u64(), + "Failed to produce attestation data for aggregation" + ); + })?; + + // Download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committee_index`. + self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties) .await .map_err(move |e| { crit!( error = format!("{:?}", e), committee_index, slot = slot.as_u64(), - "Error during attestation routine" + "Error during aggregate attestation routine" ) })?; - } Ok(()) } @@ -331,7 +356,6 @@ impl AttestationService Result, String> { if validator_duties.is_empty() { @@ -352,7 +376,8 @@ impl AttestationService AttestationService AttestationService AttestationService Date: Thu, 16 Oct 2025 10:00:25 +0800 Subject: [PATCH 2/7] refactor to reduce calling the bn --- .../src/attestation_service.rs | 150 +++++++++--------- 1 file changed, 71 insertions(+), 79 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 458a410561d..d75fa20baba 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -192,10 +192,41 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); + let attestation_data_cache = Arc::new(tokio::sync::RwLock::new(None)); + let attestation_data_cache_clone = attestation_data_cache.clone(); + + let attestation_service = self.clone(); self.inner.executor.spawn_ignoring_error( - self.clone().publish_attestations(slot, attestation_duties), + async move { + let attestation_data = attestation_service + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) + .await; + + match attestation_data { + Ok(attestation_data) => { + *attestation_data_cache_clone.write().await = + Some(attestation_data.clone()); + + attestation_service + .publish_attestations(slot, &attestation_duties, attestation_data) + .await + .ok(); + } + Err(e) => { + error!(error = %e, slot = slot.as_u64(), "Failed to get attestation data from beacon nodes"); + } + } + Ok(()) + }, "attestation publish", ); @@ -221,13 +252,15 @@ impl AttestationService AttestationService, - ) -> Result<(), ()> { - let attestations_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS], - ); - - // There's not need to produce `Attestation` if we do not have - // any validators for the given `slot`. - if validator_duties.is_empty() { - return Ok(()); - } - - // Download, sign and publish an `Attestation` for all validators at once - self.produce_and_publish_attestations(slot, &validator_duties) - .await - .map_err(move |e| { - crit!( - error = format!("{:?}", e), - slot = slot.as_u64(), - "Error during attestation routine" - ) - })?; - - drop(attestations_timer); - Ok(()) - } - /// Produce and publish aggregated attestations for validators - async fn publish_aggregates( + async fn handle_aggregates( self, slot: Slot, committee_index: CommitteeIndex, validator_duties: Vec, aggregate_production_instant: Instant, + attestation_data: Arc>>, ) -> Result<(), ()> { // There's not need to produce `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. @@ -301,28 +302,29 @@ impl AttestationService AttestationService Result, String> { + attestation_data: AttestationData, + ) -> Result<(), String> { + let attestations_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS], + ); + if validator_duties.is_empty() { - return Ok(None); + return Ok(()); } let current_epoch = self @@ -368,23 +376,6 @@ impl AttestationService AttestationService AttestationService Date: Thu, 16 Oct 2025 11:37:54 +0800 Subject: [PATCH 3/7] revise --- validator_client/validator_services/src/attestation_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index d75fa20baba..ba11760558f 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -275,7 +275,7 @@ impl AttestationService Date: Fri, 17 Oct 2025 12:29:34 +0800 Subject: [PATCH 4/7] revise --- .../src/attestation_service.rs | 123 +++++++++--------- 1 file changed, 59 insertions(+), 64 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index ba11760558f..47fd59287b7 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -193,12 +193,9 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); - - let attestation_data_cache = Arc::new(tokio::sync::RwLock::new(None)); - let attestation_data_cache_clone = attestation_data_cache.clone(); - let attestation_service = self.clone(); - self.inner.executor.spawn_ignoring_error( + + let attestation_handle = self.inner.executor.spawn_handle( async move { let attestation_data = attestation_service .beacon_nodes @@ -209,23 +206,21 @@ impl AttestationService { - *attestation_data_cache_clone.write().await = - Some(attestation_data.clone()); + .await + .map_err(|e| e.to_string())?; - attestation_service - .publish_attestations(slot, &attestation_duties, attestation_data) - .await - .ok(); - } - Err(e) => { - error!(error = %e, slot = slot.as_u64(), "Failed to get attestation data from beacon nodes"); - } - } - Ok(()) + attestation_service + .publish_attestations(slot, &attestation_duties, attestation_data.clone()) + .await + .map_err(|e| { + crit!( + error = format!("{:?}", e), + slot = slot.as_u64(), + "Error during attestation routine" + ); + e + })?; + Ok::(attestation_data) }, "attestation publish", ); @@ -248,23 +243,47 @@ impl AttestationService AttestationService, aggregate_production_instant: Instant, - attestation_data: Arc>>, + attestation_data: AttestationData, ) -> Result<(), ()> { // There's not need to produce `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. @@ -302,30 +321,6 @@ impl AttestationService AttestationService Result<(), String> { - let attestations_timer = validator_metrics::start_timer_vec( + let _attestations_timer = validator_metrics::start_timer_vec( &validator_metrics::ATTESTATION_SERVICE_TIMES, &[validator_metrics::ATTESTATIONS], ); @@ -388,6 +383,7 @@ impl AttestationService AttestationService Date: Fri, 17 Oct 2025 13:51:05 +0800 Subject: [PATCH 5/7] revise --- .../src/attestation_service.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 47fd59287b7..1d41214ba39 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -191,15 +191,19 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); - let attestation_service = self.clone(); + let attestation_service: AttestationService = self.clone(); - let attestation_handle = self.inner.executor.spawn_handle( + let attestation_data_handle = self.inner.executor.spawn_handle( async move { let attestation_data = attestation_service .beacon_nodes .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); beacon_node .get_validator_attestation_data(slot, 0) .await @@ -243,13 +247,13 @@ impl AttestationService AttestationService AttestationService Date: Fri, 17 Oct 2025 16:43:01 +0800 Subject: [PATCH 6/7] simplify --- validator_client/validator_services/src/attestation_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 1d41214ba39..ed069ceb4a6 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -193,7 +193,7 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); - let attestation_service: AttestationService = self.clone(); + let attestation_service = self.clone(); let attestation_data_handle = self.inner.executor.spawn_handle( async move { From 5f9ff2dcb238148615c668d9757a7a11fe1684d4 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Mon, 20 Oct 2025 07:02:24 +0800 Subject: [PATCH 7/7] simplify --- .../src/attestation_service.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index ed069ceb4a6..d3828606961 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -270,17 +270,13 @@ impl AttestationService