Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 110 additions & 103 deletions validator_client/validator_services/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,63 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
/// Spawn only one new task for attestation post-Electra
/// For each required aggregates, spawn a new task that downloads, signs and uploads the
/// aggregates to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;

// Create and publish an `Attestation` for all validators only once
// as the committee_index is not included in AttestationData post-Electra
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();
let attestation_service = self.clone();

let attestation_data_handle = self.inner.executor.spawn_handle(
async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

attestation_service
.publish_attestations(slot, &attestation_duties, attestation_data.clone())
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;
Ok::<AttestationData, String>(attestation_data)
},
"attestation publish",
);

// If a validator needs to publish an aggregate attestation, they must do so at 2/3
// through the slot. This delay triggers at this time
let aggregate_production_instant = Instant::now()
+ duration_to_next_slot
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));

let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
Expand All @@ -207,24 +247,43 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
map
});

// For each committee index for this slot:
//
// - Create and publish an `Attestation` for all required validators.
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
duties_by_committee_index
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
),
"attestation publish",
);
});
// Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
let attestation_service_clone = self.clone();
let executor = self.inner.executor.clone();
self.inner.executor.spawn(
async move {
// Log an error if the handle fails and return, skipping aggregates
let Some(handle) = attestation_data_handle else {
error!(slot = slot.as_u64(), "Failed to spawn attestation task");
return;
};

let Ok(Some(Ok(attestation_data))) = handle.await else {
error!(slot = slot.as_u64(), "Failed to spawn attestation task");
return;
};

// For each committee index for this slot:
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_service = attestation_service_clone.clone();
let attestation_data = attestation_data.clone();
executor.spawn_ignoring_error(
attestation_service.handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"aggregate publish",
);
},
)
},
"attestation and aggregate publish",
);

// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
Expand All @@ -234,84 +293,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
async fn publish_attestations_and_aggregates(
async fn handle_aggregates(
self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
attestation_data: AttestationData,
) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// There's not need to produce `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() {
return Ok(());
}

// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
// Wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

drop(attestations_timer);

// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

// Download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
"Error during aggregate attestation routine"
)
})?;
}

Ok(())
}
Expand All @@ -328,14 +348,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
///
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
async fn produce_and_publish_attestations(
async fn publish_attestations(
&self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> {
attestation_data: AttestationData,
) -> Result<(), String> {
let _attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

if validator_duties.is_empty() {
return Ok(None);
return Ok(());
}

let current_epoch = self
Expand All @@ -344,22 +369,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch());

let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

// Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
Expand Down Expand Up @@ -418,7 +427,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
Expand All @@ -428,7 +436,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
crit!(
error = ?e,
validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(),
"Failed to sign attestation"
);
Expand All @@ -446,7 +453,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,

if attestations.is_empty() {
warn!("No attestations were published");
return Ok(None);
return Ok(());
}
let fork_name = self
.chain_spec
Expand Down Expand Up @@ -507,7 +514,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
),
}

Ok(Some(attestation_data))
Ok(())
}

/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
Expand Down
Loading